[SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing #43463
Conversation
val jsonRelativeHarPath = new Path("/test.json")
val parquetRelativeHarPath = new Path("/test.parquet")
val orcRelativeHarPath = new Path("/test.orc")
val globeRelativeHarPath = new Path("/test.*")
Do we really need to test all the file formats?
They're needed to test the glob path. I decided to reuse the har archive from the `DataFrameReaderWriterSuite` tests instead of creating a new one.
@tigrulya-exe Please re-trigger GA tests.
Please fix the conflicts.
@beliefer Hi! I fixed the conflicts and rebased on master.
cc @cloud-fan
@cloud-fan Hi! Could you take a look please?
@cloud-fan Hi! I've rebased on master and fixed conflicts. Could you please take a look?
The fix is straightforward but the test is convoluted. How do you test
@cloud-fan we construct absolute file paths with We don't need to create or test the
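The globbing being exercised here can be illustrated without Hadoop. Below is a minimal, hypothetical Scala sketch — NOT Hadoop's actual `GlobFilter` implementation, and the `GlobSketch` name is invented — of how a pattern like the test's `/test.*` matches archive entries during path globbing:

```scala
// Hypothetical sketch of glob matching, for illustration only -- this is
// NOT Hadoop's GlobFilter; real globbing also handles {a,b}, [ranges], etc.
object GlobSketch {
  // Translate a simple glob (supporting * and ?) into a regular expression.
  def globToRegex(glob: String): String =
    glob.flatMap {
      case '*' => "[^/]*"                                 // * matches within one path segment
      case '?' => "[^/]"                                  // ? matches a single character
      case c if ".[]{}()+-^$|\\".contains(c) => "\\" + c  // escape regex metacharacters
      case c => c.toString
    }

  def matches(glob: String, path: String): Boolean =
    path.matches(globToRegex(glob))
}

object GlobSketchDemo {
  def main(args: Array[String]): Unit = {
    // A pattern like the test's "/test.*" matches all the archived files:
    println(GlobSketch.matches("/test.*", "/test.json"))    // true
    println(GlobSketch.matches("/test.*", "/test.parquet")) // true
    println(GlobSketch.matches("/test.*", "/other.csv"))    // false
  }
}
```

This is only meant to show why a single `/test.*` pattern covers all the per-format paths declared above.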
test("SPARK-39910: read files from Hadoop archives") {
  val fileSchema = new StructType().add("str", StringType)
  val harPath = testFile("test-data/test-archive.har")
    .replaceFirst("file:/", "har:/")
So Spark works with `har:/` paths out of the box? BTW, I think this test is good enough, we don't need to add more tests in `DataSourceSuite`.
Yes, the `HarFileSystem` support is included in the HDFS client by default. Ok, removed the tests from `DataSourceSuite`, left only the `MockFileSystem#getUri` method to correctly qualify paths with the `mockFs://` scheme.
val harPath = testFile("test-data/test-archive.har")
  .replaceFirst("file:/", "har:/")

testRead(spark.read.textFile(s"$harPath/test.txt").toDF(), data, textSchema)
Since we only want to test path globbing, I think testing one file format is sufficient.
Ok, removed file formats other than `csv`.
a005f65
to
8161581
Compare
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
  override def globStatus(pathPattern: Path): Array[FileStatus] = {
    mockGlobResults.getOrElse(pathPattern, Array())
  }

  override def getUri: URI = URI.create("mockFs://mockFs/")
is this change needed?
Yes, if we don't override this method, then the path check inside `fs.makeQualified(path)` will fail, because it expects a path with the `file://` scheme (`MockFileSystem` inherits from `RawLocalFileSystem`).
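To make the scheme mismatch concrete, here is a self-contained Scala sketch of what qualification conceptually does: resolve a possibly-relative path against the filesystem's default URI (scheme + authority) and working directory. It uses only `java.net.URI`, is NOT Hadoop's actual `FileSystem#makeQualified` (which also normalizes and validates paths), and the names `QualifySketch`, `qualify`, `defaultFsUri`, and `workingDir` are invented for this illustration:

```scala
import java.net.URI

// Conceptual sketch of path qualification -- NOT Hadoop's actual
// FileSystem#makeQualified; names here are hypothetical.
object QualifySketch {
  // Resolve `path` against the filesystem's default URI (scheme + authority)
  // and working directory, producing a fully qualified URI.
  def qualify(path: String, defaultFsUri: URI, workingDir: String): URI = {
    val absolute = if (path.startsWith("/")) path else s"$workingDir/$path"
    new URI(defaultFsUri.getScheme, defaultFsUri.getAuthority, absolute, null, null)
  }

  def main(args: Array[String]): Unit = {
    // A filesystem reporting this default URI (like the overridden getUri
    // above) yields mockFs://-qualified paths rather than file:// ones.
    val mockFsUri = URI.create("mockFs://mockFs/")
    println(qualify("/test.csv", mockFsUri, "/user/spark"))
    println(qualify("data/test.csv", mockFsUri, "/user/spark"))
  }
}
```

This shows why the overridden `getUri` matters: the scheme and authority of the qualified result come from the filesystem's reported URI, so without the override the qualification check would be performed against `file://`.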
thanks, merging to master/3.5!
Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.
Authored-by: Tigran Manasyan &lt;t.manasyan@arenadata.io&gt;
Signed-off-by: Wenchen Fan &lt;wenchen@databricks.com&gt;
(cherry picked from commit b7edc5f)
Signed-off-by: Wenchen Fan &lt;wenchen@databricks.com&gt;
What changes were proposed in this pull request?
In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies paths via `Path#makeQualified`, while `PartitioningAwareFileIndex` qualifies them via `FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate to `Path#makeQualified`, but others, like `HarFileSystem`, contain fs-specific logic that can produce a different result. Such inconsistencies can lead to a situation where Spark can't find the partitions of a source file, because the qualified paths built by `Path` and `FileSystem` differ. Therefore, for uniformity, the `FileSystem` path qualification should be used in `DataSource#checkAndGlobPathIfNecessary`.
Why are the changes needed?
Allow users to read files from Hadoop archives (.har) using the DataFrameReader API.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`.
Was this patch authored or co-authored using generative AI tooling?
No