-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite. #23606
Changes from all commits
8634911
aca75ae
8993f19
cc5fdac
d692754
84d01ab
c47575e
d67ad46
0e42cc2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -365,16 +365,17 @@ case class Join( | |
} | ||
|
||
/** | ||
* Append data to an existing table. | ||
* Base trait for DataSourceV2 write commands | ||
*/ | ||
case class AppendData( | ||
table: NamedRelation, | ||
query: LogicalPlan, | ||
isByName: Boolean) extends LogicalPlan { | ||
trait V2WriteCommand extends Command { | ||
def table: NamedRelation | ||
def query: LogicalPlan | ||
|
||
override def children: Seq[LogicalPlan] = Seq(query) | ||
override def output: Seq[Attribute] = Seq.empty | ||
|
||
override lazy val resolved: Boolean = { | ||
override lazy val resolved: Boolean = outputResolved | ||
|
||
def outputResolved: Boolean = { | ||
table.resolved && query.resolved && query.output.size == table.output.size && | ||
query.output.zip(table.output).forall { | ||
case (inAttr, outAttr) => | ||
|
@@ -386,16 +387,66 @@ case class AppendData( | |
} | ||
} | ||
|
||
/** | ||
* Append data to an existing table. | ||
*/ | ||
case class AppendData( | ||
table: NamedRelation, | ||
query: LogicalPlan, | ||
isByName: Boolean) extends V2WriteCommand | ||
|
||
object AppendData { | ||
def byName(table: NamedRelation, df: LogicalPlan): AppendData = { | ||
new AppendData(table, df, true) | ||
new AppendData(table, df, isByName = true) | ||
} | ||
|
||
def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = { | ||
new AppendData(table, query, false) | ||
new AppendData(table, query, isByName = false) | ||
} | ||
} | ||
|
||
/** | ||
* Overwrite data matching a filter in an existing table. | ||
*/ | ||
case class OverwriteByExpression( | ||
table: NamedRelation, | ||
deleteExpr: Expression, | ||
query: LogicalPlan, | ||
isByName: Boolean) extends V2WriteCommand { | ||
override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved | ||
} | ||
|
||
object OverwriteByExpression { | ||
def byName( | ||
table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { | ||
OverwriteByExpression(table, deleteExpr, df, isByName = true) | ||
} | ||
|
||
def byPosition( | ||
table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { | ||
OverwriteByExpression(table, deleteExpr, query, isByName = false) | ||
} | ||
} | ||
|
||
/** | ||
* Dynamically overwrite partitions in an existing table. | ||
*/ | ||
case class OverwritePartitionsDynamic( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Users are unable to generate this plan via DF writer APIs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, remember that the DataFrameWriter API is ambiguous, partly because it uses SaveMode and partly because it isn't clear what "save" or "saveAsTable" mean. A better plan is to introduce an API that has obvious behavior, like the one proposed in the logical plans SPIP. In the short term, this plan will be available from SQL because it implements one of the modes for |
||
table: NamedRelation, | ||
query: LogicalPlan, | ||
isByName: Boolean) extends V2WriteCommand | ||
|
||
object OverwritePartitionsDynamic { | ||
def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly here. Should the parameter names be consistent? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comment. This signals that this corresponds to the behavior of DataFrames. |
||
OverwritePartitionsDynamic(table, df, isByName = true) | ||
} | ||
|
||
def byPosition(table: NamedRelation, query: LogicalPlan): OverwritePartitionsDynamic = { | ||
OverwritePartitionsDynamic(table, query, isByName = false) | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the | ||
* concrete implementations during analysis. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1452,7 +1452,7 @@ object SQLConf { | |
" register class names for which data source V2 write paths are disabled. Writes from these" + | ||
" sources will fall back to the V1 sources.") | ||
.stringConf | ||
.createWithDefault("") | ||
.createWithDefault("orc") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gengliangwang @cloud-fan Please discuss it with @rdblue and see whether we can keep using data source v2 API even if the table catalog is not ready. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is required. We cannot move sources to v2 by default until we are confident that they will behave the same way. We can't do that until v2 is actually implemented. |
||
|
||
val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") | ||
.doc("A comma-separated list of fully qualified data source register class names for which" + | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason it's `df` here and `query` in `byPosition`?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. The DataFrame API uses `byName`, while the SQL path uses `byPosition`.