From 57eb11106de1eb45f2c54e2ea9a5609d7b6594c1 Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 16 Mar 2022 22:34:09 +0800 Subject: [PATCH 1/3] Use correct type for equals in ColumnarBroadcastExchangeAdaptor --- .../spark/sql/execution/ColumnarBroadcastExchangeExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 99f7f7f69..fe030561b 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -303,10 +303,10 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = plan.doExecuteBroadcast[T]() - override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor] + override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastExchangeAdaptor] override def equals(other: Any): Boolean = other match { - case that: ColumnarShuffleExchangeAdaptor => + case that: ColumnarBroadcastExchangeAdaptor => (that canEqual this) && super.equals(that) case _ => false } From f59f047fad04bf52a85e62372375c6190b4daa77 Mon Sep 17 00:00:00 2001 From: philo Date: Thu, 17 Mar 2022 00:08:45 +0800 Subject: [PATCH 2/3] Fix compatibility issues to make the changes work on spark 3.1 also --- .../ColumnarShuffleExchangeExec.scala | 4 ++- .../execution/ShuffledColumnarBatchRDD.scala | 17 ++++++++-- .../com/intel/oap/sql/shims/SparkShims.scala | 8 +++++ .../sql/shims/spark311/Spark311Shims.scala | 31 +++++++++++++++++-- .../sql/shims/spark321/Spark321Shims.scala | 31 +++++++++++++++++-- 5 files changed, 83 insertions(+), 8 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 189ca28b7..a98d63582 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -319,7 +319,9 @@ case class ColumnarShuffleExchangeAdaptor( attr.name + ":" + attr.dataType }.toString() } - override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = + + // For spark3.2. + protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = copy(child = newChild) } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala index 1b57f8381..ea074c4c9 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution +import com.intel.oap.sql.shims.SparkShimLoader + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -87,7 +88,11 @@ class ShuffledColumnarBatchRDD( coalescedPartitionSpec.endReducerIndex).flatMap { reducerIndex => tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) } - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) tracker.getMapLocation(dependency, startMapIndex, endMapIndex) case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => @@ -133,7 +138,13 @@ class ShuffledColumnarBatchRDD( context, sqlMetricsReporter) - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) + val numReducers = + SparkShimLoader.getSparkShims.getNumReducersOfCoalescedMapperPartitionSpec(spec) SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, startMapIndex, diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 62420a6f8..61bd49b57 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -113,4 +113,12 @@ trait SparkShims { * REPARTITION is changed to REPARTITION_BY_COL from spark 3.2. */ def isRepartition(shuffleOrigin: ShuffleOrigin): Boolean + + def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean + + def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index e637877e9..790b4cf33 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -179,4 +178,32 @@ class Spark311Shims extends SparkShims { } } + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return false for spark3.1. + */ + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + false + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + */ + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + */ + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + */ + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index ce8d2d33e..8512e1b2b 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{CoalescedMapperPartitionSpec, ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -230,4 +229,32 @@ class Spark321Shims extends SparkShims { } } + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + spec match { + case _: CoalescedMapperPartitionSpec => true + case _ => false + } + } + + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.startMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.endMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.numReducers + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + } \ No newline at end of file From 5d92bde53a438558ea11f3835ca8ef8918e3c6cb Mon Sep 17 00:00:00 2001 From: philo Date: Thu, 17 Mar 2022 00:27:33 +0800 Subject: [PATCH 3/3] Refine the code --- .../intel/oap/sql/shims/spark311/Spark311Shims.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 790b4cf33..7535c4eeb 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -186,24 +186,24 @@ class Spark311Shims extends SparkShims { } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } } \ No newline at end of file