[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite. #23606
Conversation
@mccheah, @felixcheung, and @cloud-fan, can you review this? It adds overwrite plans for DSv2.
I didn't see many concerns; overall this looks very good. Had a minor comment about a var name.
object OverwriteByExpression {
  def byName(
      table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
Is there a reason it's `df` here and `query` in `byPosition`?
Yes. The DataFrame API uses `byName`, while the SQL path uses `byPosition`.
    isByName: Boolean) extends V2WriteCommand
object OverwritePartitionsDynamic {
  def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = {
Similarly here. Should the parameter names be consistent?
See my other comment. This signals that this corresponds to the behavior of DataFrames.
Also needs to pass tests and style checks, of course.
 * Overwriting data by filter will delete any data that matches the filter and replace it with data
 * that is committed in the write.
 */
public interface SupportsOverwrite extends WriteBuilder {
This reminds me of the SQL MERGE command:
https://en.wikipedia.org/wiki/Merge_(SQL)
https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?view=sql-server-2017
SQL has 3 standard data-changing commands: INSERT, UPDATE, DELETE.
INSERT is Append in DSv2.
UPDATE and DELETE are special. They do not have a source table; they just change the data of the target table directly. I'd say they are not data writing, and we should exclude them from the DSv2 write API.
MERGE (also called UPSERT) is a more powerful version of INSERT. It uses the source table to update the target table: for each row in the target table, find the matching row in the source table w.r.t. a condition, and update the row in the target table according to the matched row.
This `SupportsOverwrite` here looks like just a DELETE + INSERT, and I don't know of any SQL command with the same semantics. Can you give some use cases for this overwrite?
This is needed for a couple of use cases:
- `INSERT OVERWRITE ... PARTITION ...` with static partitions in static overwrite mode, for compatibility with existing queries.
- Situations where users would currently call `INSERT OVERWRITE`, but should be explicit.

This operation is a combined delete and replace, ReplaceData in the SPIP. This is currently implemented by the existing `INSERT OVERWRITE` statement.

`INSERT OVERWRITE` (as defined by Hive) is dynamic, so what gets deleted is implicit. Implicit deletes lead to confusion, so it would be better to move users to patterns that explicitly replace data.

An example of such a pattern is an hourly job that is idempotent and produces a summary for a day. Each hour, it overwrites the summary from the last hour, until the day is finished and the last summary is left. We are moving users to structure this query to explicitly overwrite using an expression, which we can also use to warn if the user writes data that wouldn't be replaced by running the query again.

A second use for this is static overwrite mode, which is a Spark-only behavior. In static mode, Spark will drop any matching static partitions and then insert data. In that case, we can't use dynamic overwrite because it wouldn't delete all of the static partitions. Instead, we translate the static partitions to an overwrite expression (see the sketch below). Using this, we can implement all overwrite behaviors with the v2 API.
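To make that translation concrete, here is a small, self-contained sketch of turning static partition values into a delete expression. The `Expr` types and helper below are stand-ins invented for illustration, not Catalyst or DSv2 classes:

```scala
// Stand-in expression types for the sketch (not Catalyst's Expression hierarchy).
sealed trait Expr
case class EqualTo(column: String, value: Any) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case object TrueLiteral extends Expr

object StaticOverwriteSketch {
  // e.g. INSERT OVERWRITE ... PARTITION (region = 'us', day = '2019-01-01')
  // becomes And(EqualTo("region", "us"), EqualTo("day", "2019-01-01")).
  def staticPartitionsToDeleteExpr(staticPartitions: Map[String, Any]): Expr =
    staticPartitions
      .map { case (col, value) => EqualTo(col, value): Expr }
      .reduceOption((l, r) => And(l, r))
      .getOrElse(TrueLiteral) // no static partitions: overwrite (truncate) everything
}
```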
Ah I see, makes sense. So this `SupportsOverwrite` is mostly for partitioned tables, though it doesn't mention partitions at all. I think the semantics can also apply to non-partitioned tables, but that will be very hard to implement.
BTW, for a normal INSERT OVERWRITE, which needs to truncate the entire table, will the `filters` be a single true literal?
> I think the semantics can also apply to non-partitioned tables, but that will be very hard to implement.

Not necessarily. JDBC sources can implement this fairly easily, and those that support transactions can make it an atomic operation. There are also strategies that can work for unpartitioned tables, like deleting data files whose min/max ranges show all rows are matched by a filter.
> BTW, for a normal INSERT OVERWRITE, which needs to truncate the entire table, will the `filters` be a single true literal?
I think that truncate is slightly different. Because it is fairly easy to support truncate, but not overwrite by expression, I think that truncate should be a separate operation in the v2 API. I would make `SupportsOverwrite` implement `SupportsTruncate` with a default that calls overwrite with `true` like you suggest, but I think we will need to add a `true` filter.
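For illustration, a minimal, self-contained sketch of that proposed relationship between the interfaces; the real API is a set of Java interfaces in the DSv2 writer package, and the stub types below (`Filter`, `WriteBuilder`) are simplified stand-ins:

```scala
// Simplified stand-ins for the sketch; the real API uses Java interfaces.
sealed trait Filter
case class AlwaysTrue() extends Filter

trait WriteBuilder

trait SupportsTruncate extends WriteBuilder {
  // Configure the write to replace all existing data in the table.
  def truncate(): WriteBuilder
}

trait SupportsOverwrite extends SupportsTruncate {
  // Configure the write to replace only the rows matching the given filters.
  def overwrite(filters: Array[Filter]): WriteBuilder

  // Truncation is just overwrite with a filter that matches every row.
  override def truncate(): WriteBuilder = overwrite(Array(AlwaysTrue()))
}
```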
Also, what do you mean by "normal INSERT OVERWRITE"? What operation is that? Right now, `INSERT OVERWRITE` is effectively dynamic partition overwrite. Unpartitioned tables are truncated because they have just one "partition". Do you agree with that summary?
> Unpartitioned tables are truncated because they have just one "partition"

If we go with this definition, do you mean data sources that do not support partitions must implement `SupportsDynamicOverwrite` to support `INSERT OVERWRITE`?
@cloud-fan, I've added `SupportsTruncate` to address this. Now tables can support truncation to implement `INSERT OVERWRITE` that replaces all data in the table.

I also ran tests to find out how file system data sources behave with overwrite mode in `DataFrameWriter`. That showed that tables are always truncated and the value of `spark.sql.sources.partitionOverwriteMode` is always ignored.
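As a usage sketch of the behavior described above (the output path, session setup, and object name are illustrative, not from this PR):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object OverwriteModeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("overwrite-demo").getOrCreate()

    // Even with dynamic partition overwrite configured, the DataFrameWriter
    // overwrite path described above truncates the existing output first.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    val df = spark.range(10).withColumn("part", col("id") % 2)
    df.write.mode(SaveMode.Overwrite).partitionBy("part").parquet("/tmp/overwrite-demo")

    spark.stop()
  }
}
```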
Retest this please.
Test build #101603 has finished for PR 23606 at commit
@@ -2245,6 +2245,26 @@ class Analyzer(
        } else {
          append
        }

      case overwrite @ OverwriteByExpression(table, _, query, isByName)
          if table.resolved && query.resolved && !overwrite.resolved =>
nit: indentation.
How should this be indented? 4 spaces from the start of `case`?
        }

      case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
          if table.resolved && query.resolved && !overwrite.resolved =>
ditto.
    // fail if any filter cannot be converted. correctness depends on removing all matching data.
    val filters = splitConjunctivePredicates(deleteExpr).map {
      filter => DataSourceStrategy.translateFilter(filter).getOrElse(
        throw new SparkException(s"Cannot translate expression to source filter: $filter"))
Maybe `AnalysisException` is clearer than `SparkException`?
Conversion to physical plan happens after the analysis is finished, so I think the right exception to use is SparkException.
@rdblue, I know the logic, but we already use `AnalysisException` in this file. It's based on the characteristics of the exception instead of the location. Please see what you removed. For me, `SparkException` is just too broad.
@cloud-fan, what is the correct exception to use during conversion to a physical plan?
`AnalysisException` is not currently referenced in `DataSourceV2Strategy`. I also checked `DataSourceStrategy`, and that does not throw `AnalysisException` except in the `DataSourceAnalysis` rules that are run by the analyzer.

An alternative is to consider this a failure to produce a plan and return `Nil`. But that would result in an unhelpful assertion failure, which is worse than a specific `SparkException`.

I think this is doing the right thing.
Agree that `AnalysisException` is not proper here, as this is not the analysis phase. But `SparkException` is worse: `SparkException` should be used on the executor side, which indicates a Spark job was launched.

One thing we could do is perform this conversion in the analysis phase, i.e. the logical plan `OverwriteByExpression` would take `Array[Filter]` as well. However, the downside is that advanced users who touch catalyst rules directly would not be able to supply arbitrary expressions.

So I think `AnalysisException` is OK here, as this is something we should do at the analysis phase, but don't for other reasons.
I'll change this to `AnalysisException`.
  override protected def doExecute(): RDD[InternalRow] = {
    val batchWrite = newWriteBuilder() match {
      case builder: SupportsSaveMode => // TODO: Remove this
It seems that we have three TODOs in this PR at different places. In this case, shall we create a SPARK issue and use the ID here explicitly? I guess that all three TODOs will have the same SPARK issue ID.
Wenchen already opened an issue for this: SPARK-26356
Yes. That's what I requested. We had better have ID'ed TODOs.
- // TODO: Remove this
+ // TODO(SPARK-26356): Remove this
I'm removing the comments. If SupportsSaveMode is no longer defined, then this has to be removed anyway. The comment doesn't help.
        builder.mode(SaveMode.Overwrite).buildForBatch()

      case _ =>
        throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
`AnalysisException`?
Other physical plans throw SparkException. I thought AnalysisException was just for query analysis?
This is definitely a case where it is too late to throw an analysis exception. This will be checked by a new analysis rule when capabilities are added.
      case SaveMode.Overwrite =>
        // truncate the table
        runCommand(df.sparkSession, "save") {
          OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
@gatorsmile, I verified that file system sources truncate in all cases when mode is overwrite. I've updated this to always truncate as you requested.
-> `filter = Literal(true)`.
How shall we indicate a truncate? Using a true literal as the overwrite expression works; another option is to use `Nil`. No overwrite expressions could also mean truncating.
@cloud-fan, I think that `Literal(true)` is better than using `Nil`. This translates directly to an `AlwaysTrue` filter, which has obvious behavior: `DELETE FROM table WHERE true`.

Using `Nil` or no filters to signal that all rows should be deleted is not obvious and requires more documentation for the contract with implementations.
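For illustration, a small self-contained sketch (stand-in types, not the real `Filter` classes) of why an explicit always-true filter keeps the source-side contract obvious compared with an empty filter list:

```scala
// Stand-in filter types for the sketch.
sealed trait Filter
case class AlwaysTrue() extends Filter
case class EqualTo(column: String, value: Any) extends Filter

object OverwriteContractSketch {
  // With an explicit AlwaysTrue, "delete everything" is a visible, matchable case.
  def describeDelete(filters: Seq[Filter]): String = filters match {
    case Seq(AlwaysTrue()) => "DELETE FROM table WHERE true (truncate)"
    case fs if fs.nonEmpty => s"DELETE FROM table WHERE ${fs.mkString(" AND ")}"
    case _                 => "ambiguous: an empty filter list needs a documented convention"
  }
}
```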
Actually we already have this contract: e.g. `INSERT OVERWRITE tbl SELECT ...` without WHERE will truncate the table and write new data. If the contract is there for end users, I think it's OK for data source developers as well.

Anyway, I think this shouldn't block this PR and we can discuss it later.
@cloud-fan, I've added analysis tests for the new logical plans. Existing tests also validate that the new overwrite plan has the same behavior as using `WriteToDataSourceV2`. Further tests depend on introducing a catalog API, because dynamic and static overwrites other than truncate are only available through the SQL API, and we can't translate SQL into v2 plans until we can update the table resolution rule.
Test build #101735 has finished for PR 23606 at commit
Test build #101737 has finished for PR 23606 at commit
@cloud-fan, I've added the missing tests to this PR and tests are passing. I think it is ready for a final review. Thanks!
Test build #101848 has finished for PR 23606 at commit
@rxin, if you have time, could you review this?
/**
 * A filter that always evaluates to `true`.
 */
case class AlwaysTrue() extends Filter {
Why is it a case class instead of an object?
There's an object that extends it just below. I thought it was still a good idea to match the pattern of the other filters and make a class as well as an object, since this will be used by Java code. Checking `instanceof AlwaysTrue$` or `== AlwaysTrue$.MODULE$` leaks Scala details into Java.
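For reference, a minimal reproduction of the pattern being described; the simplified `Filter` base class below is a stand-in for the real one:

```scala
// Simplified Filter base class for the sketch.
abstract class Filter {
  def references: Array[String]
}

// A case class so Java code can write `filter instanceof AlwaysTrue`, plus a
// companion object extending it so Scala code can still use a singleton value.
case class AlwaysTrue() extends Filter {
  override def references: Array[String] = Array.empty
}

object AlwaysTrue extends AlwaysTrue
```

From Java, a check then reads as `filter instanceof AlwaysTrue` rather than leaking `AlwaysTrue$.MODULE$` into the calling code.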
        builder.mode(SaveMode.Overwrite).buildForBatch()

      case _ =>
        throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
we should fail earlier, at analysis time, or at least planning time.
Can we do this as a follow-up? I think this check should use capabilities and I don't want to add that API in this commit. Capabilities will be my next PR.
sure, SGTM
Opened SPARK-26811 to track earlier analysis exceptions.
Test build #102317 has finished for PR 23606 at commit
Test build #102326 has finished for PR 23606 at commit
This ensures that the query and table columns for a write are resolved before expressions are resolved so that expressions are resolved using field names from the table, not the query. This also reverts moving ResolveOutputRelation because rule order no longer matters.
Test build #102360 has finished for PR 23606 at commit
LGTM except one comment, thanks for working on it!
BTW, it would be good to add end-to-end tests, but since we don't have any implementation yet, I think it's fine to delay that until we fully migrate the file source to the v2 API.
Test build #102399 has finished for PR 23606 at commit
LGTM
Thanks, merging to master!
…rite.

## What changes were proposed in this pull request?

This adds two logical plans that implement the ReplaceData operation from the [logical plans SPIP](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d). These two plans will be used to implement Spark's `INSERT OVERWRITE` behavior for v2.

Specific changes:
* Add `SupportsTruncate`, `SupportsOverwrite`, and `SupportsDynamicOverwrite` to DSv2 write API
* Add `OverwriteByExpression` and `OverwritePartitionsDynamic` plans (logical and physical)
* Add new plans to DSv2 write validation rule `ResolveOutputRelation`
* Refactor `WriteToDataSourceV2Exec` into trait used by all DSv2 write exec nodes

## How was this patch tested?

* The v2 analysis suite has been updated to validate the new overwrite plans
* The analysis suite for `OverwriteByExpression` checks that the delete expression is resolved using the table's columns
* Existing tests validate that overwrite exec plan works
* Updated existing v2 test because schema is used to validate overwrite

Closes apache#23606 from rdblue/SPARK-26666-add-overwrite.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      case SaveMode.Overwrite =>
        // truncate the table
        runCommand(df.sparkSession, "save") {
          OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
Sorry if I missed some discussion here, but why did we end up reading the schema here again, for instance via:
at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:85)
at org.apache.spark.sql.execution.datasources.v2.orc.OrcTable.inferSchema(OrcTable.scala:38)
at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$1(FileTable.scala:45)
at scala.Option.orElse(Option.scala:306)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:45)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:44)
at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:53)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:100)
in the write code path? I thought we agreed upon reading it only in append mode. Did I miss something?
Two tests are failing because of this (4dce45a and this PR were merged independently in a similar time range, so the tests didn't catch it; that's fine, it happens from time to time):
org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc
org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc
ScalaTestFailureLocation: org.apache.spark.sql.FileBasedDataSourceSuite at (FileBasedDataSourceSuite.scala:349)
org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
at org.scalatest.Assertions.intercept(Assertions.scala:812)
at org.scalatest.Assertions.intercept$(Assertions.scala:802)
at org.scalatest.FunSuite.intercept(FunSuite.scala:1560)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$69(FileBasedDataSourceSuite.scala:349)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$68(FileBasedDataSourceSuite.scala:348)
at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:47)
at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:31)
at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(FileBasedDataSourceSuite.scala:37)
at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:230)
at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:228)
at org.apache.spark.sql.FileBasedDataSourceSuite.withSQLConf(FileBasedDataSourceSuite.scala:37)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67(FileBasedDataSourceSuite.scala:346)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67$adapted(FileBasedDataSourceSuite.scala:332)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66(FileBasedDataSourceSuite.scala:332)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66$adapted(FileBasedDataSourceSuite.scala:330)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:75)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:74)
at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:116)
at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(FileBasedDataSourceSuite.scala:37)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:74)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:73)
at org.apache.spark.sql.FileBasedDataSourceSuite.withTempDir(FileBasedDataSourceSuite.scala:37)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$65(FileBasedDataSourceSuite.scala:330)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
at org.apache.spark.sql.FileBasedDataSourceSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FileBasedDataSourceSuite.scala:37)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
at org.apache.spark.sql.FileBasedDataSourceSuite.runTest(FileBasedDataSourceSuite.scala:37)
at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite.run(Suite.scala:1147)
at org.scalatest.Suite.run$(Suite.scala:1129)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Caused by: org.apache.spark.SparkException: Could not read footer for file: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv
at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:75)
at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.$anonfun$readSchema$2(OrcUtils.scala:85)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.TraversableOnce.collectFirst(TraversableOnce.scala:148)
at scala.collection.TraversableOnce.collectFirst$(TraversableOnce.scala:135)
at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1429)
at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:85)
at org.apache.spark.sql.execution.datasources.v2.orc.OrcTable.inferSchema(OrcTable.scala:38)
at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$1(FileTable.scala:45)
at scala.Option.orElse(Option.scala:306)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:45)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:44)
at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:53)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:100)
at org.apache.spark.sql.DataFrameWriter.relation$lzycompute$1(DataFrameWriter.scala:268)
at org.apache.spark.sql.DataFrameWriter.relation$1(DataFrameWriter.scala:268)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:278)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$70(FileBasedDataSourceSuite.scala:350)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.Assertions.intercept(Assertions.scala:805)
... 75 more
Caused by: org.apache.orc.FileFormatException: Not a valid ORC file file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv (maxFileLength= 9223372036854775807)
at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:546)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:370)
at org.apache.orc.OrcFile.createReader(OrcFile.java:342)
at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:62)
... 95 more
That's weird; ORC v2 is disabled in this PR. Let me try these 2 tests locally, and revert this PR if needed.
Ah, thanks. I just opened a PR to disable the tests themselves. I will close mine if there's another fix soon.
Ah, it looks like the test explicitly sets `spark.sql.sources.write.useV1SourceList`.
…ts for FileDataSourceV2 (partially revert )

## What changes were proposed in this pull request?

This PR partially reverts SPARK-26744. 60caa92 and 4dce45a were merged independently at a similar time range, so the test failures were not caught.
- 60caa92 happened to add schema reading logic in the write path for overwrite mode as well.
- 4dce45a added some tests with overwrite modes using the migrated ORC v2, and those tests started to fail.

I guess the discussion won't be short (see #23606 (comment)), so this PR proposes to disable the tests added at 4dce45a to unblock other PRs for now.

## How was this patch tested?

Existing tests.

Closes #23828 from HyukjinKwon/SPARK-26744.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

Added the `sealed` keyword to the `Filter` class.

### Why are the changes needed?

To not miss handling of new filters in a datasource in the future. For example, `AlwaysTrue` and `AlwaysFalse` were added recently by #23606.

### Does this PR introduce any user-facing change?

Should not.

### How was this patch tested?

By existing tests.

Closes #26950 from MaxGekk/sealed-filter.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?

This adds two logical plans that implement the ReplaceData operation from the logical plans SPIP. These two plans will be used to implement Spark's `INSERT OVERWRITE` behavior for v2.

Specific changes:
* Add `SupportsTruncate`, `SupportsOverwrite`, and `SupportsDynamicOverwrite` to DSv2 write API
* Add `OverwriteByExpression` and `OverwritePartitionsDynamic` plans (logical and physical)
* Add new plans to DSv2 write validation rule `ResolveOutputRelation`
* Refactor `WriteToDataSourceV2Exec` into trait used by all DSv2 write exec nodes

## How was this patch tested?

* The v2 analysis suite has been updated to validate the new overwrite plans
* The analysis suite for `OverwriteByExpression` checks that the delete expression is resolved using the table's columns
* Existing tests validate that overwrite exec plan works
* Updated existing v2 test because schema is used to validate overwrite