[SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC #23601
Conversation
Force-pushed from 74c37f5 to 91689ac.
Test build #101471 has finished for PR 23601 at commit
Due to #21381, the write path is much easier to implement.
Test build #101484 has finished for PR 23601 at commit
Test build #101486 has finished for PR 23601 at commit
Test build #101488 has finished for PR 23601 at commit
Hi, @gengliangwang. Please check the UT failure in your local environment first.
Hi @dongjoon-hyun,
I got it~ And thanks for the fix, @gengliangwang.
Test build #101547 has finished for PR 23601 at commit
Oh, SparkR seems to complain for some reason.
Test build #101575 has finished for PR 23601 at commit
retest this please.
Test build #101580 has finished for PR 23601 at commit
 * Returns whether this format supports the given [[DataType]] in write path.
 * By default all data types are supported.
 */
def supportDataType(dataType: DataType): Boolean = true
I will try to find a better solution for this. Marking this PR as WIP for now.
I think we can implement the supportDataType API in another PR. This PR is ready for review.
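For illustration, a minimal sketch of the kind of override a concrete format could provide once this hook lands: accept atomic types, and nested types only when all of their children are supported. The recursion shown here is an assumption for the example, not the merged ORC implementation.

```scala
import org.apache.spark.sql.types._

// Hypothetical body for a format's supportDataType override (written inside
// Spark's sql package, where the private[sql] AtomicType is visible).
// The supported-type set is illustrative only.
def supportDataType(dataType: DataType): Boolean = dataType match {
  case _: AtomicType => true
  case st: StructType => st.forall(field => supportDataType(field.dataType))
  case ArrayType(elementType, _) => supportDataType(elementType)
  case MapType(keyType, valueType, _) =>
    supportDataType(keyType) && supportDataType(valueType)
  case _ => false
}
```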
Force-pushed from 2ca90a7 to 5fda97e.
Ur, #23639 seems to have introduced conflicts. Could you resolve the conflicts?
Test build #101732 has finished for PR 23601 at commit
Force-pushed from 5fda97e to 9538a1b.
Test build #101761 has finished for PR 23601 at commit
Force-pushed from 9538a1b to 5358ad4.
Test build #101762 has finished for PR 23601 at commit
    committer: FileCommitProtocol)
  extends BatchWrite {
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
Shall we call FileFormatWriter.processStats here?
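Roughly like this, perhaps; a hedged sketch assuming the V1 helper `FileFormatWriter.processStats(statsTrackers, statsPerTask)` and a `description: WriteJobDescription` member, which may not match the final V2 code exactly:

```scala
override def commit(messages: Array[WriterCommitMessage]): Unit = {
  val results = messages.map(_.asInstanceOf[WriteTaskResult])
  committer.commitJob(job, results.map(_.commitMsg))
  // Feed per-task write statistics (rows/files written) to the job-level
  // stats trackers, as the V1 write path does after committing the job.
  FileFormatWriter.processStats(description.statsTrackers, results.map(_.summary.stats))
}
```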
Test build #101819 has finished for PR 23601 at commit
        f.fallBackFileFormat
      case _ => lookupCls
    }
    // SPARK-26673: In Data Source V2 project, partitioning is still under development.
Shall we remove this (SPARK-26673) since this is the current PR's JIRA?
    }
    // SPARK-26673: In Data Source V2 project, partitioning is still under development.
    // Here we fall back to V1 in the write path if output partitioning is required.
    // TODO: use V2 implementations when partitioning feature is supported.
Could you clearly mention which JIRA ID this TODO refers to?
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
  * E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
  * since there is no corresponding physical plan.
  * SPARK-23817: This is a temporary hack for making current data source V2 work. It should be
- * removed when write path of file data source v2 is finished.
+ * removed when Catalog of file data source v2 is finished.
`Catalog of file data source v2 is finished`? Does this mean `catalog support of file data source v2`?
Test build #101869 has finished for PR 23601 at commit
retest this please.
Test build #101883 has finished for PR 23601 at commit
      case _ => lookupCls
    }
    // In Data Source V2 project, partitioning is still under development.
    // Here we fall back to V1 in the write path if output partitioning is required.
Here we fallback to V1 if partitioning columns are specified
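A minimal sketch of that check, assuming `lookupCls` comes from `DataSource.lookupDataSource` and `partitioningColumns` is the `Option[Seq[String]]` held by `DataFrameWriter`; details are illustrative, not the exact merged code:

```scala
// Fall back to the V1 FileFormat class when partition columns are specified,
// since DSv2 partitioning is still under development.
val cls = lookupCls.newInstance() match {
  case f: FileDataSourceV2 if partitioningColumns.isDefined => f.fallBackFileFormat
  case _ => lookupCls
}
```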
    this
  }

  override def buildForBatch(): BatchWrite = {
This method is too long; it would be better to separate it into multiple methods.
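One possible decomposition, sketched under hypothetical helper names (none of these helpers exist in the PR as such):

```scala
// Hypothetical refactoring: each step becomes a private helper so that
// buildForBatch reads as a high-level outline.
override def buildForBatch(): BatchWrite = {
  val job = createJob()                  // set up the Hadoop Job and output path
  val committer = createCommitter(job)   // instantiate the FileCommitProtocol
  validateSchema(schema)                 // e.g. run supportDataType checks
  new FileBatchWrite(job, createJobDescription(job), committer)
}
```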
@@ -56,18 +56,25 @@ case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
   val writerFactory = batchWrite.createBatchWriterFactory()
   val useCommitCoordinator = batchWrite.useCommitCoordinator
   val rdd = query.execute()
   val messages = new Array[WriterCommitMessage](rdd.partitions.length)
   // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
   // partition rdd to make sure we at least set up one write task to write the metadata.
It's OK for now, but we should improve it later:
- use a config to do it; it seems only the file source needs it
- or do it in FileBatchWrite.commit: if the commit messages are empty, write a metadata file (see the sketch below)
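For the second option, a hedged sketch of how `FileBatchWrite.commit` could absorb the empty-write case, so the dummy one-partition RDD would no longer be needed:

```scala
override def commit(messages: Array[WriterCommitMessage]): Unit = {
  if (messages.isEmpty) {
    // No write task ran (zero-partition input): commit with no task results
    // so the committer still writes job-level metadata such as _SUCCESS.
    committer.commitJob(job, Nil)
  } else {
    committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
  }
}
```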
LGTM except a few minor comments.
Test build #101937 has finished for PR 23601 at commit
Test build #101939 has finished for PR 23601 at commit
thanks, merging to master!
…ast object in FileWriterFactory

## What changes were proposed in this pull request?

This is a followup PR to fix two issues in #23601:
1. The class `FileWriterFactory` contains `conf: SerializableConfiguration` as a member, which is duplicated with `WriteJobDescription.serializableHadoopConf`. By removing it we can reduce the broadcast task binary size by around 70KB.
2. The test suites `OrcV1QuerySuite`/`OrcV1PartitionDiscoverySuite` didn't change the configuration `SQLConf.USE_V1_SOURCE_WRITER_LIST` to `"orc"`. We should set the conf.

## How was this patch tested?

Unit test

Closes #23800 from gengliangwang/reduceWriteTaskSize.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…e ORC

## What changes were proposed in this pull request?

Create a framework for write path of File Source V2. Also, migrate write path of ORC to V2.

Supported:
* Write to file as Dataframe

Not Supported:
* Partitioning, which is still under development in the data source V2 project.
* Bucketing, which is still under development in the data source V2 project.
* Catalog.

## How was this patch tested?

Unit test

Closes apache#23601 from gengliangwang/orc_write.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
        null
      case SaveMode.Overwrite =>
        committer.deleteWithJob(fs, path, true)
What happens if the path does not exist? It is possible that the underlying committer's deleteWithJob might not handle this case.
if (fs.exists(path)) {
  committer.deleteWithJob(fs, path, recursive = true)
}
@gatorsmile I checked the source code. Actually, all the implementations (that I can see in the IDE) handle the case where the file path does not exist. But in InsertIntoHadoopFsRelationCommand, deleteWithJob is used as follows:
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
  throw new IOException(s"Unable to clear partition " +
    s"directory $path prior to writing to it")
}
Should we follow it?
yea let's follow it.
OK, created #23889 for this.
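For reference, a sketch of the agreed change (tracked in #23889), mirroring the V1 code quoted above; the exact error message wording is illustrative:

```scala
case SaveMode.Overwrite =>
  // Match the V1 behavior: only delete when the path exists, and fail
  // loudly if the committer refuses the delete.
  if (fs.exists(path) && !committer.deleteWithJob(fs, path, recursive = true)) {
    throw new IOException(s"Unable to clear output directory $path prior to writing to it")
  }
```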
…t path before delete it

## What changes were proposed in this pull request?

This is a followup PR to resolve comment: apache#23601 (review)

When Spark writes DataFrame with "overwrite" mode, it deletes the output path before actual writes. To safely handle the case that the output path doesn't exist, it is suggested to follow the V1 code by checking the existence.

## How was this patch tested?

Apply apache#23836 and run unit tests

Closes apache#23889 from gengliangwang/checkFileBeforeOverwrite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
What changes were proposed in this pull request?

Create a framework for the write path of File Source V2. Also, migrate the write path of ORC to V2.

Supported:
* Write to file as DataFrame

Not Supported:
* Partitioning, which is still under development in the data source V2 project.
* Bucketing, which is still under development in the data source V2 project.
* Catalog.

How was this patch tested?

Unit test
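For completeness, a hedged example of exercising the new V2 ORC write path; the config constant is the one named in the follow-up commit message above, and the app name and output path are just examples:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().appName("orc-v2-demo").master("local[*]").getOrCreate()
// Test suites toggle SQLConf.USE_V1_SOURCE_WRITER_LIST to choose between the
// V1 and V2 writers; an empty list means the V2 writer handles all sources.
spark.conf.set(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "")
spark.range(10).write.mode("overwrite").orc("/tmp/orc_v2_write_demo")
```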