Fix compatibility issues #1

Closed · wants to merge 3 commits
@@ -303,10 +303,10 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan)
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] =
     plan.doExecuteBroadcast[T]()
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor]
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastExchangeAdaptor]
 
   override def equals(other: Any): Boolean = other match {
-    case that: ColumnarShuffleExchangeAdaptor =>
+    case that: ColumnarBroadcastExchangeAdaptor =>
       (that canEqual this) && super.equals(that)
     case _ => false
   }

@@ -319,7 +319,9 @@ case class ColumnarShuffleExchangeAdaptor(
       attr.name + ":" + attr.dataType
     }.toString()
   }
-  override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor =
+
+  // For Spark 3.2; `override` is omitted so this also compiles on Spark 3.1.
+  protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor =
     copy(child = newChild)
 }
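Why dropping `override` is enough: `withNewChildInternal` is an abstract member that Spark 3.2 adds to the plan-node hierarchy, and Scala accepts an implementation of an abstract member without the `override` keyword; against Spark 3.1, where the parent declares no such member, the very same definition is just an ordinary method. A minimal self-contained sketch with toy types (not the real Spark classes):

    // Stand-in for Spark 3.2's parent, which declares the member abstractly.
    trait UnaryLike32 {
      protected def withNewChildInternal(newChild: Int): Int
    }

    class Adaptor extends UnaryLike32 {
      // No `override` required when implementing an abstract member; against a
      // 3.1-style parent (no such member), this same line is a plain method.
      protected def withNewChildInternal(newChild: Int): Int = newChild
    }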

@@ -17,10 +17,11 @@
 package org.apache.spark.sql.execution
 
+import com.intel.oap.sql.shims.SparkShimLoader
+
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -87,7 +88,11 @@ class ShuffledColumnarBatchRDD(
           coalescedPartitionSpec.endReducerIndex).flatMap { reducerIndex =>
           tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
         }
-      case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) =>
+      case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) =>
+        val startMapIndex =
+          SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec)
+        val endMapIndex =
+          SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec)
         tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
 
       case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>

@@ -133,7 +138,13 @@
         context,
         sqlMetricsReporter)
 
-      case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
+      case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) =>
+        val startMapIndex =
+          SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec)
+        val endMapIndex =
+          SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec)
+        val numReducers =
+          SparkShimLoader.getSparkShims.getNumReducersOfCoalescedMapperPartitionSpec(spec)
         SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
           startMapIndex,

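Both rewrites above follow the same recipe: the old constructor pattern `case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) =>` needs the class at compile time, but the class only exists in Spark 3.2, so common code matching on it cannot be built against Spark 3.1. Replacing it with a pattern guard pushes the type test and the field accesses behind the shim, leaving this file with no direct reference to the class. A sketch of the shape with hypothetical stand-in types:

    sealed trait Spec
    // Pretend this case class exists only in the newer Spark version.
    case class MapperSpec(startMapIndex: Int, endMapIndex: Int) extends Spec

    trait Shims {
      def isMapperSpec(spec: Spec): Boolean
      def startMapIndex(spec: Spec): Int
    }

    // Common code: only the shim is referenced, never MapperSpec itself.
    def preferredStart(shims: Shims, spec: Spec): Int = spec match {
      case s if shims.isMapperSpec(s) => shims.startMapIndex(s)
      case _ => 0
    }
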
@@ -113,4 +113,12 @@ trait SparkShims {
    * REPARTITION is changed to REPARTITION_BY_COL from spark 3.2.
    */
   def isRepartition(shuffleOrigin: ShuffleOrigin): Boolean
+
+  def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean
+
+  def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
+
+  def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
+
+  def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
 }

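Callers reach these new members through `SparkShimLoader.getSparkShims`, which returns the implementation matching the Spark version on the classpath. The loader itself is outside this diff; a hypothetical sketch of how such a selection can be done (the class names, package paths, and version test below are assumptions, not the project's actual code):

    object ShimLoaderSketch {
      // Resolved once; org.apache.spark.SPARK_VERSION is e.g. "3.1.1" or "3.2.1".
      lazy val getSparkShims: SparkShims = {
        val className =
          if (org.apache.spark.SPARK_VERSION.startsWith("3.2"))
            "com.intel.oap.sql.shims.spark321.Spark321Shims" // assumed path
          else
            "com.intel.oap.sql.shims.spark311.Spark311Shims" // assumed path
        Class.forName(className)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[SparkShims]
      }
    }
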
@@ -36,8 +36,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.execution.ShufflePartitionSpec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

@@ -179,4 +178,32 @@ class Spark311Shims extends SparkShims {
     }
   }
 
+  /**
+   * CoalescedMapperPartitionSpec is introduced in Spark 3.2, so this always returns false on Spark 3.1.
+   */
+  override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = {
+    false
+  }
+
+  /**
+   * This method should never be invoked on Spark 3.1.
+   */
+  override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    throw new RuntimeException("This method should not be invoked in Spark 3.1.")
+  }
+
+  /**
+   * This method should never be invoked on Spark 3.1.
+   */
+  override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    throw new RuntimeException("This method should not be invoked in Spark 3.1.")
+  }
+
+  /**
+   * This method should never be invoked on Spark 3.1.
+   */
+  override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    throw new RuntimeException("This method should not be invoked in Spark 3.1.")
+  }
+
 }

@@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.execution.ShufflePartitionSpec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{CoalescedMapperPartitionSpec, ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

@@ -230,4 +229,32 @@ class Spark321Shims extends SparkShims {
     }
   }
 
+  override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = {
+    spec match {
+      case _: CoalescedMapperPartitionSpec => true
+      case _ => false
+    }
+  }
+
+  override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    spec match {
+      case c: CoalescedMapperPartitionSpec => c.startMapIndex
+      case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!")
+    }
+  }
+
+  override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    spec match {
+      case c: CoalescedMapperPartitionSpec => c.endMapIndex
+      case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!")
+    }
+  }
+
+  override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = {
+    spec match {
+      case c: CoalescedMapperPartitionSpec => c.numReducers
+      case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!")
+    }
+  }
+
 }
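
For reference, a quick illustration of the Spark 3.2 shim in use; this only compiles and runs on a Spark 3.2 classpath, where `CoalescedMapperPartitionSpec` exists:

    import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec

    val shims = new Spark321Shims()
    // Map outputs 0 until 4, coalesced and fanned out to 2 reducers.
    val spec = CoalescedMapperPartitionSpec(startMapIndex = 0, endMapIndex = 4, numReducers = 2)
    assert(shims.isCoalescedMapperPartitionSpec(spec))
    assert(shims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) == 0)
    assert(shims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) == 4)
    assert(shims.getNumReducersOfCoalescedMapperPartitionSpec(spec) == 2)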