[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite. #23606

Closed · wants to merge 9 commits
@@ -978,6 +978,11 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))

+      case o: OverwriteByExpression if !o.outputResolved =>
+        // Do not resolve expression attributes until the query attributes are resolved against the
+        // table by ResolveOutputRelation. That rule will alias the attributes to the table's names.
+        o
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
@@ -2246,14 +2251,34 @@ class Analyzer(
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case append @ AppendData(table, query, isByName)
-          if table.resolved && query.resolved && !append.resolved =>
+          if table.resolved && query.resolved && !append.outputResolved =>
         val projection = resolveOutputColumns(table.name, table.output, query, isByName)

         if (projection != query) {
           append.copy(query = projection)
         } else {
           append
         }
+
+      case overwrite @ OverwriteByExpression(table, _, query, isByName)
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
+        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
+
+        if (projection != query) {
+          overwrite.copy(query = projection)
+        } else {
+          overwrite
+        }
+
+      case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
+        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
+
+        if (projection != query) {
+          overwrite.copy(query = projection)
+        } else {
+          overwrite
+        }
     }

def resolveOutputColumns(
@@ -365,16 +365,17 @@ case class Join(
}

 /**
- * Append data to an existing table.
+ * Base trait for DataSourceV2 write commands
  */
-case class AppendData(
-    table: NamedRelation,
-    query: LogicalPlan,
-    isByName: Boolean) extends LogicalPlan {
+trait V2WriteCommand extends Command {
+  def table: NamedRelation
+  def query: LogicalPlan

   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Seq.empty

-  override lazy val resolved: Boolean = {
+  override lazy val resolved: Boolean = outputResolved
+
+  def outputResolved: Boolean = {
     table.resolved && query.resolved && query.output.size == table.output.size &&
       query.output.zip(table.output).forall {
         case (inAttr, outAttr) =>
@@ -386,16 +387,66 @@
}
}
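The column compatibility check collapsed behind the hunk above can be modeled in plain Scala. This is a hypothetical, self-contained sketch, not Spark's implementation: the name, type, and nullability rules shown are assumptions based on the visible zip-and-forall structure of `outputResolved`.

```scala
// Hypothetical stand-in for Spark's attribute type.
case class Attr(name: String, dataType: String, nullable: Boolean)

// Sketch of the outputResolved shape: same column count, then a pairwise
// check of each query column against the corresponding table column.
// The exact rules (name equality, type equality, and "a nullable table
// column may accept non-nullable input, not vice versa") are assumptions.
def outputResolved(tableOut: Seq[Attr], queryOut: Seq[Attr]): Boolean =
  queryOut.size == tableOut.size &&
    queryOut.zip(tableOut).forall { case (inAttr, outAttr) =>
      inAttr.name == outAttr.name &&
        inAttr.dataType == outAttr.dataType &&
        (outAttr.nullable || !inAttr.nullable)
    }
```

Under these assumptions, a non-nullable table column fed by a nullable query column fails the check, while the reverse is allowed.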

+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+    table: NamedRelation,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand

 object AppendData {
   def byName(table: NamedRelation, df: LogicalPlan): AppendData = {
-    new AppendData(table, df, true)
+    new AppendData(table, df, isByName = true)
   }

   def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = {
-    new AppendData(table, query, false)
+    new AppendData(table, query, isByName = false)
   }
 }
```
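The byName/byPosition pair above decides how query columns are matched to table columns. A minimal sketch of the two strategies, with hypothetical types (not Spark's `resolveOutputColumns`):

```scala
// Hypothetical column type for illustration only.
case class Col(name: String, value: Any)

// By position: the i-th query column writes to the i-th table column,
// regardless of its name (the SQL INSERT path).
def resolveByPosition(tableCols: Seq[String], queryCols: Seq[Col]): Seq[(String, Any)] =
  tableCols.zip(queryCols.map(_.value))

// By name: each table column is matched to the query column with the same
// name, so the query's column order does not matter (the DataFrame path).
def resolveByName(tableCols: Seq[String], queryCols: Seq[Col]): Seq[(String, Any)] = {
  val byName = queryCols.map(c => c.name -> c.value).toMap
  tableCols.map(n => n -> byName(n))
}
```

With query columns `(b, a)` and table columns `(a, b)`, by-name resolution reorders them while by-position would write them swapped.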

+/**
+ * Overwrite data matching a filter in an existing table.
+ */
+case class OverwriteByExpression(
+    table: NamedRelation,
+    deleteExpr: Expression,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand {
+  override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved
+}
+
+object OverwriteByExpression {
+  def byName(
+      table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
[Review comment, Contributor] Is there a reason it's df here and query in byPosition?

[Reply, Author] Yes. The DataFrame API uses byName, while the SQL path uses byPosition.

+    OverwriteByExpression(table, deleteExpr, df, isByName = true)
+  }
+
+  def byPosition(
+      table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = {
+    OverwriteByExpression(table, deleteExpr, query, isByName = false)
+  }
+}
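Semantically, an overwrite-by-expression can be thought of as deleting the rows that match deleteExpr, then appending the new rows. A toy model over in-memory rows, illustrating the intended semantics rather than how a v2 source must implement it:

```scala
// Rows matching the delete predicate are removed, then the incoming
// rows are appended; everything else is left untouched.
def overwriteByExpr[A](existing: Seq[A], deleteExpr: A => Boolean, incoming: Seq[A]): Seq[A] =
  existing.filterNot(deleteExpr) ++ incoming
```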

+/**
+ * Dynamically overwrite partitions in an existing table.
+ */
+case class OverwritePartitionsDynamic(
[Review comment, Member] Users are unable to generate this plan via DF writer APIs?

[Reply, Author] Yes, remember that the DataFrameWriter API is ambiguous, partly because it uses SaveMode and partly because it isn't clear what "save" or "saveAsTable" mean. A better plan is to introduce an API that has obvious behavior, like the one proposed in the logical plans SPIP.

In the short term, this plan will be available from SQL because it implements one of the modes for INSERT OVERWRITE.

+    table: NamedRelation,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand

+object OverwritePartitionsDynamic {
+  def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = {
[Review comment, Contributor] Similarly here. Should the parameter names be consistent?

[Reply, Author] See my other comment. This signals that this corresponds to the behavior of DataFrames.

+    OverwritePartitionsDynamic(table, df, isByName = true)
+  }
+
+  def byPosition(table: NamedRelation, query: LogicalPlan): OverwritePartitionsDynamic = {
+    OverwritePartitionsDynamic(table, query, isByName = false)
+  }
+}
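Dynamic partition overwrite replaces only the partitions that receive new rows and leaves the rest in place. Modeled over a map from partition key to rows (a semantic sketch, not an implementation):

```scala
// Incoming partitions replace existing ones wholesale; partitions with no
// incoming rows survive untouched. Map ++ gives exactly this merge.
def dynamicOverwrite[K, V](existing: Map[K, Seq[V]], incoming: Map[K, Seq[V]]): Map[K, Seq[V]] =
  existing ++ incoming
```

This contrasts with a static INSERT OVERWRITE, which would drop all matching partitions up front regardless of whether they appear in the new data.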


/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
* concrete implementations during analysis.
@@ -1452,7 +1452,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
       .stringConf
-      .createWithDefault("")
+      .createWithDefault("orc")
[Review comment, Member] @gengliangwang @cloud-fan Please discuss it with @rdblue and see whether we can keep using data source v2 API even if the table catalog is not ready.

[Reply, Author] This is required. We cannot move sources to v2 by default until we are confident that they will behave the same way. We can't do that until v2 is actually implemented.
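The conf is a comma-separated list of source register class names; a source on the list falls back to the V1 write path. A sketch of how such a list might gate the decision (the helper and its matching rules are assumptions for illustration, not Spark's code):

```scala
// Returns true when the v2 write path may be used for this source:
// i.e. the source does not appear in the comma-separated disabled list.
def useV2WritePath(source: String, disabledWriters: String): Boolean =
  !disabledWriters.split(",").map(_.trim).filter(_.nonEmpty).contains(source)
```

With the new `"orc"` default above, ORC writes would take the V1 path until v2 behavior is verified.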


val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.doc("A comma-separated list of fully qualified data source register class names for which" +