diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 99f7f7f69..fe030561b 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -303,10 +303,10 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = plan.doExecuteBroadcast[T]() - override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor] + override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastExchangeAdaptor] override def equals(other: Any): Boolean = other match { - case that: ColumnarShuffleExchangeAdaptor => + case that: ColumnarBroadcastExchangeAdaptor => (that canEqual this) && super.equals(that) case _ => false } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 189ca28b7..a98d63582 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -319,7 +319,9 @@ case class ColumnarShuffleExchangeAdaptor( attr.name + ":" + attr.dataType }.toString() } - override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = + + // For spark3.2. + protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = copy(child = newChild) } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala index 1b57f8381..ea074c4c9 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution +import com.intel.oap.sql.shims.SparkShimLoader + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -87,7 +88,11 @@ class ShuffledColumnarBatchRDD( coalescedPartitionSpec.endReducerIndex).flatMap { reducerIndex => tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) } - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) tracker.getMapLocation(dependency, startMapIndex, endMapIndex) case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => @@ -133,7 +138,13 @@ class ShuffledColumnarBatchRDD( context, sqlMetricsReporter) - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) + val numReducers = + SparkShimLoader.getSparkShims.getNumReducersOfCoalescedMapperPartitionSpec(spec) SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, startMapIndex, diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 62420a6f8..61bd49b57 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -113,4 +113,12 @@ trait SparkShims { * REPARTITION is changed to REPARTITION_BY_COL from spark 3.2. */ def isRepartition(shuffleOrigin: ShuffleOrigin): Boolean + + def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean + + def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index e637877e9..7535c4eeb 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -179,4 +178,32 @@ class Spark311Shims extends SparkShims { } } + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return false for spark3.1. + */ + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + false + } + + /** + * This method cannot be invoked in spark3.1. + */ + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + throw new RuntimeException("This method should not be invoked in spark 3.1.") + } + + /** + * This method cannot be invoked in spark3.1. + */ + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + throw new RuntimeException("This method should not be invoked in spark 3.1.") + } + + /** + * This method cannot be invoked in spark3.1. + */ + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + throw new RuntimeException("This method should not be invoked in spark 3.1.") + } + } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index ce8d2d33e..8512e1b2b 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{CoalescedMapperPartitionSpec, ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -230,4 +229,32 @@ class Spark321Shims extends SparkShims { } } + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + spec match { + case _: CoalescedMapperPartitionSpec => true + case _ => false + } + } + + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.startMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.endMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.numReducers + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + } \ No newline at end of file