
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite. #23606

Closed
wants to merge 9 commits into apache:master from rdblue:SPARK-26666-add-overwrite

Conversation


@rdblue rdblue commented Jan 21, 2019

What changes were proposed in this pull request?

This adds two logical plans that implement the ReplaceData operation from the [logical plans SPIP](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d). These two plans will be used to implement Spark's INSERT OVERWRITE behavior for v2.

Specific changes:

  • Add SupportsTruncate, SupportsOverwrite, and SupportsDynamicOverwrite to DSv2 write API (see the sketch below)
  • Add OverwriteByExpression and OverwritePartitionsDynamic plans (logical and physical)
  • Add new plans to DSv2 write validation rule ResolveOutputRelation
  • Refactor WriteToDataSourceV2Exec into trait used by all DSv2 write exec nodes
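
A rough sketch of the new write-builder mixins, written here as Scala traits for brevity (the real DSv2 interfaces are Java, and the WriteBuilder placeholder stands in for the real builder type, so treat this as the shape of the API rather than its final form):

```scala
import org.apache.spark.sql.sources.Filter

// Placeholder for the DSv2 WriteBuilder that these mixins extend.
trait WriteBuilder

// A write that can replace all existing rows in the table.
trait SupportsTruncate extends WriteBuilder {
  def truncate(): WriteBuilder
}

// A write that can replace exactly the rows matched by a set of filters.
trait SupportsOverwrite extends WriteBuilder {
  def overwrite(filters: Array[Filter]): WriteBuilder
}

// A write that can replace only the partitions that receive new data.
trait SupportsDynamicOverwrite extends WriteBuilder {
  def overwriteDynamicPartitions(): WriteBuilder
}
```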

How was this patch tested?

  • The v2 analysis suite has been updated to validate the new overwrite plans
  • The analysis suite for OverwriteByExpression checks that the delete expression is resolved using the table's columns
  • Existing tests validate that overwrite exec plan works
  • Updated existing v2 test because schema is used to validate overwrite


rdblue commented Jan 21, 2019

@mccheah, @felixcheung, and @cloud-fan, can you review this? It adds overwrite plans for DSv2.


@mccheah mccheah left a comment

I didn't see many concerns; overall this looks very good. Had a minor comment about a variable name.


object OverwriteByExpression {
  def byName(
      table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
Contributor:

Is there a reason it's df here and query in byPosition?

Contributor Author (rdblue):

Yes. The DataFrame API uses byName, while the SQL path uses byPosition.

    isByName: Boolean) extends V2WriteCommand

object OverwritePartitionsDynamic {
  def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = {
Contributor:

Similarly here. Should the parameter names be consistent?

Contributor Author (rdblue):

See my other comment. This signals that this corresponds to the behavior of DataFrames.
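
To summarize the two threads above, a simplified sketch of how both factory methods map onto the same plan node (the NamedRelation placeholder and the stripped-down case class are here only to keep the sketch self-contained; the real class extends V2WriteCommand and carries more logic):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Placeholder for catalyst's NamedRelation, used only to keep the sketch self-contained.
trait NamedRelation

// Simplified shape of the plan node.
case class OverwriteByExpression(
    table: NamedRelation,
    deleteExpr: Expression,
    query: LogicalPlan,
    isByName: Boolean)

object OverwriteByExpression {
  // The DataFrame API resolves incoming columns by name, so the input parameter is `df`.
  def byName(table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression =
    OverwriteByExpression(table, deleteExpr, df, isByName = true)

  // The SQL path resolves incoming columns by position, so the input parameter is `query`.
  def byPosition(table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression =
    OverwriteByExpression(table, deleteExpr, query, isByName = false)
}
```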


mccheah commented Jan 22, 2019

Also needs to pass tests and style checks, of course.

 * Overwriting data by filter will delete any data that matches the filter and replace it with data
 * that is committed in the write.
 */
public interface SupportsOverwrite extends WriteBuilder {
Contributor:

This reminds me of the SQL MERGE command:
https://en.wikipedia.org/wiki/Merge_(SQL)
https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?view=sql-server-2017

SQL has 3 standard data-changing commands: INSERT, UPDATE, DELETE.
INSERT is Append in DSv2.
UPDATE and DELETE are special. They do not have a source table; they just change the data of the target table directly. I'd say they are not data writing, and we should exclude them from the DSv2 write API.

MERGE (also called UPSERT) is a more powerful version of INSERT. It uses a source table to update the target table: for each row in the target table, find the matching row in the source table w.r.t. a condition, and update the target row according to the matched row.

This SupportsOverwrite looks like just a DELETE + INSERT, and I don't know of any SQL command with the same semantics. Can you give some use cases for this overwrite?

Contributor Author (rdblue):

This is needed for a couple use cases:

  1. INSERT OVERWRITE ... PARTITION ... with static partitions in static overwrite mode, for compatibility with existing queries.
  2. Situations where users currently call INSERT OVERWRITE, but where the data being replaced should be explicit.

This operation is a combined delete and replace, ReplaceData in the SPIP. This is currently implemented by the existing INSERT OVERWRITE statement.

INSERT OVERWRITE (as defined by Hive) is dynamic, so what gets deleted is implicit. Implicit deletes lead to confusion, so it would be better to move users to patterns that explicitly replace data.

An example of such a pattern is an hourly job that is idempotent and produces a summary for a day. Each hour, it overwrites the summary from the last hour, until the day is finished and the last summary is left. We are moving users to structure this query to explicitly overwrite using an expression, which we can also use to warn if the user writes data that wouldn't be replaced by running the query again.

A second use for this is static overwrite mode, which is a Spark-only behavior. In static mode, Spark will drop any matching static partitions and then insert data. In that case, we can't use dynamic overwrite because it wouldn't delete all of the static partitions. Instead, we translate the static partitions to an overwrite expression. So using this, we can implement all overwrite behaviors using the v2 API.
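
To make the static-overwrite translation concrete, here is a hedged sketch of the idea (the helper below is hypothetical, not code from this PR): each static partition value becomes an equality predicate on the corresponding partition column, ANDed together, and that expression is what OverwriteByExpression deletes before writing.

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, Literal}

object StaticOverwriteSketch {
  // Hypothetical helper: translate static partition values into the delete expression.
  // e.g. INSERT OVERWRITE t PARTITION (ds = '2019-01-21', country = 'US') SELECT ...
  // in static mode deletes all rows where ds = '2019-01-21' AND country = 'US', then writes.
  def staticPartitionsToDeleteExpr(
      staticPartitions: Map[String, String],
      partitionCols: Map[String, AttributeReference]): Expression = {
    val predicates = staticPartitions.toSeq.map { case (name, value) =>
      EqualTo(partitionCols(name), Literal(value)): Expression
    }
    // No static partitions means the whole table is replaced.
    if (predicates.isEmpty) Literal(true) else predicates.reduce[Expression](And(_, _))
  }
}
```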

Contributor:

Ah I see, makes sense. So this SupportsOverwrite is mostly for partitioned tables, though it doesn't mention partitions at all. I think the semantics can also apply to non-partitioned tables, but that would be very hard to implement.

Contributor:

BTW, for a normal INSERT OVERWRITE, which needs to truncate the entire table, will the filters be a single true literal?

Contributor Author (rdblue):

> I think the semantics can also apply to non-partitioned tables, but that would be very hard to implement.

Not necessarily. JDBC sources can implement this fairly easily, and those that support transactions can make it an atomic operation. There are also strategies that can work for unpartitioned tables, like deleting data files whose min/max ranges show that all of their rows are matched by a filter.

> BTW, for a normal INSERT OVERWRITE, which needs to truncate the entire table, will the filters be a single true literal?

I think that truncate is slightly different. Because it is fairly easy to support truncate but not overwrite by expression, I think that truncate should be a separate operation in the v2 API. I would make SupportsOverwrite implement SupportsTruncate with a default that calls overwrite with true, like you suggest, but I think we will need to add a true filter.

Also, what do you mean by "normal INSERT OVERWRITE"? What operation is that? Right now, INSERT OVERWRITE is effectively dynamic partition overwrite. Unpartitioned tables are truncated because they have just one "partition". Do you agree with that summary?
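
A sketch of the relationship being proposed here, again written as Scala traits with placeholder types for brevity (the real interfaces are Java; this illustrates the proposal, not the final API): truncation becomes the degenerate overwrite, signalled by an explicit always-true filter.

```scala
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

// Placeholders standing in for the DSv2 write-builder types, to keep the sketch self-contained.
trait WriteBuilder
trait SupportsTruncate extends WriteBuilder {
  def truncate(): WriteBuilder
}

trait SupportsOverwrite extends WriteBuilder with SupportsTruncate {
  def overwrite(filters: Array[Filter]): WriteBuilder

  // Truncation is the degenerate overwrite: delete every row matched by an explicit true filter.
  override def truncate(): WriteBuilder = overwrite(Array[Filter](AlwaysTrue()))
}
```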

Contributor:

> Unpartitioned tables are truncated because they have just one "partition"

If we go with this definition, do you mean that data sources that do not support partitions must implement SupportsDynamicOverwrite to support INSERT OVERWRITE?

Contributor Author (rdblue):

@cloud-fan, I've added SupportsTruncate to address this. Now tables can support truncation to implement INSERT OVERWRITE that replaces all data in the table.

I also ran tests to find out how file system data sources behave with overwrite mode in DataFrameWriter. That showed that tables are always truncated and the value of spark.sql.sources.partitionOverwriteMode is always ignored.
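
For reference, a spark-shell style sketch of the kind of write that was checked (the output path and column values are placeholders); per the result above, the existing output is truncated regardless of spark.sql.sources.partitionOverwriteMode:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("overwrite-check").getOrCreate()
val df = spark.range(10).withColumn("ds", lit("2019-01-21"))

// Path-based overwrite through DataFrameWriter: the existing output directory is
// truncated and replaced; partitionOverwriteMode is not consulted on this code path.
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("ds")
  .parquet("/tmp/overwrite-check")
```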


rdblue commented Jan 23, 2019

Retest this please.


SparkQA commented Jan 23, 2019

Test build #101603 has finished for PR 23606 at commit 233d5c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AppendData(
  • case class OverwriteByExpression(
  • case class OverwritePartitionsDynamic(
  • implicit class TableHelper(table: Table)
  • implicit class OptionsHelper(options: Map[String, String])
  • case class AppendDataExec(
  • case class OverwriteByExpressionExec(
  • case class OverwritePartitionsDynamicExec(
  • case class WriteToDataSourceV2Exec(

@@ -2245,6 +2245,26 @@ class Analyzer(
} else {
append
}

case overwrite @ OverwriteByExpression(table, _, query, isByName)
if table.resolved && query.resolved && !overwrite.resolved =>
Member:

nit: indentation.

Contributor Author (rdblue):

How should this be indented? 4 spaces from the start of case?

}

case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
if table.resolved && query.resolved && !overwrite.resolved =>
Member:

ditto.

// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
  filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
    throw new SparkException(s"Cannot translate expression to source filter: $filter"))
Member:

Maybe AnalysisException is clearer than SparkException?

Contributor Author (rdblue):

Conversion to physical plan happens after the analysis is finished, so I think the right exception to use is SparkException.

@dongjoon-hyun dongjoon-hyun Jan 28, 2019

@rdblue, I know the logic, but we already use AnalysisException in this file. It's based on the characteristics of the exception rather than the location. Please see what you removed. For me, SparkException is just too broad.

Contributor Author (rdblue):

@cloud-fan, what is the correct exception to use during conversion to a physical plan?

Contributor Author (rdblue):

AnalysisException is not currently referenced in DataSourceV2Strategy. I also checked DataSourceStrategy and that does not throw AnalysisException except in the DataSourceAnalysis rules that are run by the analyzer.

An alternative is to consider this a failure to produce a plan and return Nil. But that would result in an unhelpful assertion failure, which is worse than a specific SparkException.

I think this is doing the right thing.

Contributor:

Agree that AnalysisException is not proper here, as this is not the analysis phase. But SparkException is worse: it should be used on the executor side, and it indicates that a Spark job was launched.

One thing we can do is perform this conversion in the analysis phase, i.e. the logical plan OverwriteByExpression would take Array[Filter] as well. However, the downside is that advanced users who touch catalyst rules directly would not be able to use arbitrary expressions.

So I think AnalysisException is OK here, as this is something we should do in the analysis phase, but don't for other reasons.

Contributor Author (rdblue):

I'll change this to AnalysisException.


override protected def doExecute(): RDD[InternalRow] = {
  val batchWrite = newWriteBuilder() match {
    case builder: SupportsSaveMode => // TODO: Remove this
@dongjoon-hyun dongjoon-hyun Jan 25, 2019

It seems that we have three TODOs in this PR in different places. In this case, shall we create a SPARK issue and use the ID here explicitly? I guess all three TODOs will have the same SPARK issue ID.

Contributor Author (rdblue):

Wenchen already opened an issue for this: SPARK-26356

@dongjoon-hyun dongjoon-hyun Jan 28, 2019

Yes. That's what I requested. We had better have IDed TODOs.

- // TODO: Remove this
+ // TODO(SPARK-26356): Remove this

Contributor Author (rdblue):

I'm removing the comments. If SupportsSaveMode is no longer defined, then this has to be removed anyway. The comment doesn't help.

      builder.mode(SaveMode.Overwrite).buildForBatch()

    case _ =>
      throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
Member:

AnalysisException?

Contributor Author (rdblue):

Other physical plans throw SparkException. I thought AnalysisException was just for query analysis?


Contributor Author (rdblue):

This is definitely a case where it is too late to throw an analysis exception. This will be checked by a new analysis rule when capabilities are added.

case SaveMode.Overwrite =>
  // truncate the table
  runCommand(df.sparkSession, "save") {
    OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
Contributor Author (rdblue):

@gatorsmile, I verified that file system sources truncate in all cases when mode is overwrite. I've updated this to always truncate as you requested.

Member:

-> filter = Literal(true).

Contributor:

How shall we indicate a truncate? Using a true literal as the overwrite expression works; another option is to use Nil. No overwrite expressions could also mean truncating.

Contributor Author (rdblue):

@cloud-fan, I think that Literal(true) is better than using Nil. This translates directly to an AlwaysTrue filter, which has obvious behavior: DELETE FROM table WHERE true.

Using Nil or no filters to signal that all rows should be deleted is not obvious and requires more documentation for the contract with implementations.
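
As an illustration of why the explicit filter keeps the contract simple, a hypothetical source might handle the filters it receives like this (sketch only; the sink and its planning methods are made up for the example):

```scala
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

// Hypothetical sink, used only for this sketch.
class ExampleOverwriteSink {
  def configureOverwrite(filters: Array[Filter]): Unit = filters match {
    case Array(AlwaysTrue()) =>
      // Reads as DELETE FROM table WHERE true: replace everything. No separate
      // "empty filter list means truncate" convention needs to be documented.
      planTruncate()
    case _ =>
      // Delete only the rows matching all of the filters, then write the new data.
      planDeleteWhere(filters)
  }

  private def planTruncate(): Unit =
    println("truncate the table, then write the new data")

  private def planDeleteWhere(filters: Array[Filter]): Unit =
    println(s"delete rows matching ${filters.mkString(" AND ")}, then write the new data")
}
```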

Contributor:

Actually, we already have this contract, e.g. INSERT OVERWRITE tbl SELECT ... without a WHERE clause will truncate the table and write new data. If the contract is there for end users, I think it's OK for data source developers as well.

Anyway, I think this shouldn't block this PR; we can discuss it later.


rdblue commented Jan 27, 2019

@cloud-fan, I've added analysis tests for the new logical plans. Existing tests also validate that the new overwrite plan has the same behavior as using WriteToDataSourceV2. Further tests depend on introducing a catalog API because dynamic and static overwrites other than truncate are only available through the SQL API. And we can't translate SQL into v2 plans until we can update the table resolution rule.


SparkQA commented Jan 28, 2019

Test build #101735 has finished for PR 23606 at commit 013513b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AlwaysTrue() extends Filter
  • case class AlwaysFalse() extends Filter


SparkQA commented Jan 28, 2019

Test build #101737 has finished for PR 23606 at commit 6047f65.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait V2WriteCommand extends Command
  • trait BatchWriteHelper
  • trait V2TableWriteExec extends UnaryExecNode


rdblue commented Jan 28, 2019

@cloud-fan, I've added the missing tests to this PR and tests are passing. I think it is ready for a final review. Thanks!


SparkQA commented Jan 30, 2019

Test build #101848 has finished for PR 23606 at commit ab9ed01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait V2WriteCommand extends Command
  • trait BatchWriteHelper
  • trait V2TableWriteExec extends UnaryExecNode


rdblue commented Jan 31, 2019

@rxin, if you have time, could you review this?

/**
 * A filter that always evaluates to `true`.
 */
case class AlwaysTrue() extends Filter {
Contributor:

Why is it a case class instead of an object?

@rdblue rdblue Feb 1, 2019

There's an object that extends it just below. I thought it was still a good idea to match the pattern of the other filters and make a class as well as an object, since this will be used by Java code. Checking instanceof AlwaysTrue$ or == AlwaysTrue$.MODULE$ leaks Scala details into Java.
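
For reference, the pattern described here is roughly the following sketch, which matches the style of the existing filters (note that Filter was later sealed, per the commit referenced at the end of this thread, so in current Spark these definitions can only live alongside Filter itself):

```scala
import org.apache.spark.sql.sources.Filter

/** A filter that always evaluates to `true`. */
case class AlwaysTrue() extends Filter {
  override def references: Array[String] = Array.empty
}

// Companion object extending the case class: Scala code can simply write `AlwaysTrue`,
// while Java code can still use `new AlwaysTrue()` and `instanceof AlwaysTrue`
// instead of touching `AlwaysTrue$.MODULE$`.
object AlwaysTrue extends AlwaysTrue
```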

      builder.mode(SaveMode.Overwrite).buildForBatch()

    case _ =>
      throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
Contributor:

we should fail earlier, at analysis time, or at least planning time.

Contributor Author (rdblue):

Can we do this as a follow-up? I think this check should use capabilities and I don't want to add that API in this commit. Capabilities will be my next PR.

Contributor:

sure, SGTM

Contributor Author (rdblue):

Opened SPARK-26811 to track earlier analysis exceptions.


SparkQA commented Feb 14, 2019

Test build #102317 has finished for PR 23606 at commit bd743d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 14, 2019

Test build #102326 has finished for PR 23606 at commit fa1fb72.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 14, 2019

Test build #102360 has finished for PR 23606 at commit d67ad46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


cloud-fan commented Feb 15, 2019

LGTM except one comment, thanks for working on it!

@cloud-fan

BTW, it would be good to add end-to-end tests, but since we don't have any implementation yet, I think it's fine to delay that until we fully migrate the file source to the v2 API.


SparkQA commented Feb 15, 2019

Test build #102399 has finished for PR 23606 at commit 0e42cc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang gengliangwang left a comment

LGTM

@cloud-fan

thanks, merging to master!

@cloud-fan cloud-fan closed this in 60caa92 Feb 18, 2019
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite.

Closes apache#23606 from rdblue/SPARK-26666-add-overwrite.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
case SaveMode.Overwrite =>
  // truncate the table
  runCommand(df.sparkSession, "save") {
    OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
@HyukjinKwon HyukjinKwon Feb 18, 2019

Sorry if I missed some discussions here, but why did we end up reading the schema here again, for instance via:

	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:85)
	at org.apache.spark.sql.execution.datasources.v2.orc.OrcTable.inferSchema(OrcTable.scala:38)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$1(FileTable.scala:45)
	at scala.Option.orElse(Option.scala:306)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:53)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:100)

in the write code path? I thought we agreed on reading it only in append mode. Did I miss something?

Member:

Two tests are failing because of this (4dce45a and this PR were merged independently at around the same time, so the tests didn't catch it; that's fine, it happens from time to time):

org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc
org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc
ScalaTestFailureLocation: org.apache.spark.sql.FileBasedDataSourceSuite at (FileBasedDataSourceSuite.scala:349)
org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions.intercept(Assertions.scala:812)
	at org.scalatest.Assertions.intercept$(Assertions.scala:802)
	at org.scalatest.FunSuite.intercept(FunSuite.scala:1560)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$69(FileBasedDataSourceSuite.scala:349)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$68(FileBasedDataSourceSuite.scala:348)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:47)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:31)
	at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(FileBasedDataSourceSuite.scala:37)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:230)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:228)
	at org.apache.spark.sql.FileBasedDataSourceSuite.withSQLConf(FileBasedDataSourceSuite.scala:37)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67(FileBasedDataSourceSuite.scala:346)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67$adapted(FileBasedDataSourceSuite.scala:332)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66(FileBasedDataSourceSuite.scala:332)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66$adapted(FileBasedDataSourceSuite.scala:330)
	at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:74)
	at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:116)
	at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(FileBasedDataSourceSuite.scala:37)
	at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:74)
	at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:73)
	at org.apache.spark.sql.FileBasedDataSourceSuite.withTempDir(FileBasedDataSourceSuite.scala:37)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$65(FileBasedDataSourceSuite.scala:330)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.sql.FileBasedDataSourceSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FileBasedDataSourceSuite.scala:37)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.sql.FileBasedDataSourceSuite.runTest(FileBasedDataSourceSuite.scala:37)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1147)
	at org.scalatest.Suite.run$(Suite.scala:1129)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Caused by: org.apache.spark.SparkException: Could not read footer for file: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:75)
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.$anonfun$readSchema$2(OrcUtils.scala:85)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.TraversableOnce.collectFirst(TraversableOnce.scala:148)
	at scala.collection.TraversableOnce.collectFirst$(TraversableOnce.scala:135)
	at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1429)
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:85)
	at org.apache.spark.sql.execution.datasources.v2.orc.OrcTable.inferSchema(OrcTable.scala:38)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$1(FileTable.scala:45)
	at scala.Option.orElse(Option.scala:306)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:53)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:100)
	at org.apache.spark.sql.DataFrameWriter.relation$lzycompute$1(DataFrameWriter.scala:268)
	at org.apache.spark.sql.DataFrameWriter.relation$1(DataFrameWriter.scala:268)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:278)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
	at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$70(FileBasedDataSourceSuite.scala:350)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.Assertions.intercept(Assertions.scala:805)
	... 75 more
Caused by: org.apache.orc.FileFormatException: Not a valid ORC file file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv (maxFileLength= 9223372036854775807)
	at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:546)
	at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:370)
	at org.apache.orc.OrcFile.createReader(OrcFile.java:342)
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:62)
	... 95 more

Contributor:

That's weird, ORC v2 is disabled in this PR.

Let me try these 2 tests locally, and revert this PR if needed.

Member:

Ah, thanks. I just opened a PR to disable the tests themselves. I will close mine if there's another fix soon.

Member:

Ah, it looks like the test explicitly sets spark.sql.sources.write.useV1SourceList.

cloud-fan pushed a commit that referenced this pull request Feb 18, 2019
…ts for FileDataSourceV2 (partially revert )

## What changes were proposed in this pull request?

This PR partially reverts SPARK-26744.

60caa92 and 4dce45a were merged independently at around the same time, so the test failures were not caught.

- 60caa92 happened to add schema reading logic in the write path for overwrite mode as well.

- 4dce45a added some tests for overwrite modes with the migrated ORC v2.

And the tests started to fail.

I guess the discussion won't be short (see #23606 (comment)), so this PR proposes to disable the tests added in 4dce45a to unblock other PRs for now.

## How was this patch tested?

Existing tests.

Closes #23828 from HyukjinKwon/SPARK-26744.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite.

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
…ts for FileDataSourceV2 (partially revert )

dongjoon-hyun pushed a commit that referenced this pull request Dec 19, 2019
### What changes were proposed in this pull request?
Added the `sealed` keyword to the `Filter` class

### Why are the changes needed?
To avoid missing the handling of new filters in a data source in the future. For example, `AlwaysTrue` and `AlwaysFalse` were added recently by #23606

### Does this PR introduce any user-facing change?
Should not.

### How was this patch tested?
By existing tests.

Closes #26950 from MaxGekk/sealed-filter.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>