-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: CometShuffleExchangeExec logical link should be correct #324
Changes from 4 commits
96b9826
d67071e
fbccbf5
f17f9e3
f6e9dc3
87f50dc
cf7b382
f5e3437
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,8 @@ import org.apache.spark.util.MutablePair | |
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} | ||
import org.apache.spark.util.random.XORShiftRandom | ||
|
||
import com.google.common.base.Objects | ||
|
||
import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde} | ||
import org.apache.comet.serde.OperatorOuterClass.Operator | ||
import org.apache.comet.serde.QueryPlanSerde.serializeDataType | ||
|
@@ -61,6 +63,7 @@ import org.apache.comet.shims.ShimCometShuffleExchangeExec | |
case class CometShuffleExchangeExec( | ||
override val outputPartitioning: Partitioning, | ||
child: SparkPlan, | ||
originalPlan: ShuffleExchangeLike, | ||
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS, | ||
shuffleType: ShuffleType = CometNativeShuffle, | ||
advisoryPartitionSize: Option[Long] = None) | ||
|
@@ -192,6 +195,24 @@ case class CometShuffleExchangeExec( | |
|
||
override protected def withNewChildInternal(newChild: SparkPlan): CometShuffleExchangeExec = | ||
copy(child = newChild) | ||
|
||
override def equals(obj: Any): Boolean = { | ||
obj match { | ||
case other: CometShuffleExchangeExec => | ||
this.outputPartitioning == other.outputPartitioning && | ||
this.shuffleOrigin == other.shuffleOrigin && this.child == other.child && | ||
this.shuffleType == other.shuffleType && | ||
this.advisoryPartitionSize == other.advisoryPartitionSize | ||
case _ => | ||
false | ||
} | ||
} | ||
|
||
override def hashCode(): Int = | ||
Objects.hashCode(outputPartitioning, shuffleOrigin, shuffleType, advisoryPartitionSize, child) | ||
|
||
override def stringArgs: Iterator[Any] = | ||
Iterator(outputPartitioning, shuffleOrigin, shuffleType, child) ++ Iterator(s"[plan_id=$id]") | ||
Comment on lines
+214
to
+215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Follow Spark |
||
} | ||
|
||
object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode | |
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec} | ||
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} | ||
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec} | ||
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec | ||
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} | ||
import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec} | ||
import org.apache.spark.sql.execution.window.WindowExec | ||
import org.apache.spark.sql.expressions.Window | ||
|
@@ -62,6 +62,37 @@ class CometExecSuite extends CometTestBase { | |
} | ||
} | ||
|
||
test("CometShuffleExchangeExec logical link should be correct") { | ||
withTempView("v") { | ||
spark.sparkContext | ||
.parallelize((1 to 4).map(i => TestData(i, i.toString)), 2) | ||
.toDF("c1", "c2") | ||
.createOrReplaceTempView("v") | ||
|
||
withSQLConf( | ||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", | ||
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { | ||
val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2") | ||
val shuffle = find(df.queryExecution.executedPlan) { | ||
case _: CometShuffleExchangeExec => true | ||
case _ => false | ||
}.get.asInstanceOf[CometShuffleExchangeExec] | ||
assert(shuffle.logicalLink.isEmpty) | ||
} | ||
|
||
withSQLConf( | ||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", | ||
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { | ||
val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2") | ||
val shuffle = find(df.queryExecution.executedPlan) { | ||
case _: ShuffleExchangeExec => true | ||
case _ => false | ||
}.get.asInstanceOf[ShuffleExchangeExec] | ||
assert(shuffle.logicalLink.isEmpty) | ||
viirya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
||
test("Ensure that the correct outputPartitioning of CometSort") { | ||
withTable("test_data") { | ||
val tableDF = spark.sparkContext | ||
|
@@ -302,7 +333,8 @@ class CometExecSuite extends CometTestBase { | |
withSQLConf( | ||
CometConf.COMET_EXEC_ENABLED.key -> "true", | ||
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", | ||
"spark.sql.autoBroadcastJoinThreshold" -> "0", | ||
"spark.sql.adaptive.autoBroadcastJoinThreshold" -> "-1", | ||
"spark.sql.autoBroadcastJoinThreshold" -> "-1", | ||
Comment on lines
+328
to
+329
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Query plan is changed due to the logical link fix. In order to have CometSortMergeJoin, we need to disable broadcast join and AQE broadcast join. |
||
"spark.sql.join.preferSortMergeJoin" -> "true") { | ||
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl1") { | ||
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl2") { | ||
|
@@ -373,6 +405,7 @@ class CometExecSuite extends CometTestBase { | |
withSQLConf( | ||
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true", | ||
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", | ||
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar reason here. The query plan is changed and AQE interferes with a broadcast join. |
||
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", | ||
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { | ||
withTable(tableName, dim) { | ||
|
@@ -1306,3 +1339,5 @@ case class BucketedTableTestSpec( | |
expectedShuffle: Boolean = true, | ||
expectedSort: Boolean = true, | ||
expectedNumOutputPartitions: Option[Int] = None) | ||
|
||
case class TestData(key: Int, value: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because
CometShuffleExchangeExec
adds an originalPlan
parameter which is not covered by canonicalization in Spark, we need to exclude it when comparing two CometShuffleExchangeExec
instances, to make sure Spark's reuse-shuffle rule works.