build: Drop Spark 3.2 support #581

Merged · 12 commits · Jun 18, 2024
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

// TODO: remove after dropping Spark 3.2 & 3.3 support and directly call PartitionedFile
// TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
classOf[PartitionedFile].getDeclaredConstructors
.map(c =>
@@ -21,15 +21,12 @@ package org.apache.comet.shims

object ShimFileFormat {

// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use FileFormat.ROW_INDEX
// TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
val ROW_INDEX = "row_index"

// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use
// TODO: remove after dropping Spark 3.3 support and directly use
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"

// TODO: remove after dropping Spark 3.2 support and use FileFormat.OPTION_RETURNING_BATCH
val OPTION_RETURNING_BATCH = "returning_batch"
}
@@ -24,7 +24,7 @@ import scala.util.Try
import org.apache.spark.sql.types.{StructField, StructType}

object ShimResolveDefaultColumns {
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use ResolveDefaultColumns
// TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
def getExistenceDefaultValue(field: StructField): Any =
Try {
// scalastyle:off classforname
2 changes: 1 addition & 1 deletion core/src/errors.rs
@@ -61,7 +61,7 @@ pub enum CometError {
Internal(String),

// Note that this message format is based on Spark 3.4 and is more detailed than the message
// returned by Spark 3.2 or 3.3
// returned by Spark 3.3
#[error("[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
because it is malformed. Correct the value as per the syntax, or change its target type. \
Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
7 changes: 2 additions & 5 deletions docs/source/contributor-guide/adding_a_new_expression.md
@@ -46,7 +46,7 @@ The `QueryPlanSerde` object has a method `exprToProto`, which is responsible for
For example, the `unhex` function looks like this:

```scala
case e: Unhex if !isSpark32 =>
case e: Unhex =>
val unHex = unhexSerde(e)

val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -59,7 +59,6 @@ case e: Unhex if !isSpark32 =>

A few things to note here:

* The `isSpark32` check is used to fall back to Spark's implementation of `unhex` in Spark 3.2. This is somewhat context specific, because in this case, due to a bug in Spark 3.2 for `unhex`, we want to use the Spark implementation and not a Comet implementation that would behave differently if correct.
* The function is recursively called on child expressions, so you'll need to make sure that the child expressions are also converted to protobuf.
* `scalarExprToProtoWithReturnType` is for scalar functions that need return type information. Your expression may use a different method depending on the type of expression.

@@ -71,8 +70,6 @@ For example, this is the test case for the `unhex` expression:

```scala
test("unhex") {
assume(!isSpark32, "unhex function has incorrect behavior in 3.2") // used to skip the test in Spark 3.2

val table = "unhex_table"
withTable(table) {
sql(s"create table $table(col string) using parquet")
@@ -172,7 +169,7 @@ pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataF
If the expression you're adding has different behavior across different Spark versions, you'll need to account for that in your implementation. There are two tools at your disposal to help with this:

1. Shims that exist in `spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala` for each Spark version. These shims are used to provide compatibility between different Spark versions.
2. Variables that correspond to the Spark version, such as `isSpark32`, which can be used to conditionally execute code based on the Spark version.
2. Variables that correspond to the Spark version, such as `isSpark33Plus`, which can be used to conditionally execute code based on the Spark version.
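
   For instance, such a variable can be used as a guard in the `QueryPlanSerde` pattern match. This is only a minimal sketch: `MyExpr` and the `"my_expr"` function name are hypothetical, and the helper calls mirror the ones shown in the `unhex` example above.

   ```scala
   // Hypothetical: only serialize MyExpr on Spark 3.4+; on older versions Comet
   // falls back to Spark's own implementation of the expression.
   case e: MyExpr if isSpark34Plus =>
     val childExpr = exprToProtoInternal(e.child, inputs)
     val optExpr = scalarExprToProto("my_expr", childExpr)
     optExprWithInfo(optExpr, expr, e.child)
   ```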

## Shimming to Support Different Spark Versions

14 changes: 0 additions & 14 deletions pom.xml
@@ -517,20 +517,6 @@ under the License.
</properties>
</profile>

<profile>
<id>spark-3.2</id>
<properties>
<scala.version>2.12.15</scala.version>
<spark.version>3.2.2</spark.version>
<spark.version.short>3.2</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<!-- we don't add special test suits for spark-3.2, so a not existed dir is specified-->
<additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
<shims.minorVerSrc>spark-3.2</shims.minorVerSrc>
</properties>
</profile>

<profile>
<id>spark-3.3</id>
<properties>
@@ -96,7 +96,7 @@ class CometSparkSessionExtensions
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) &&
// Comet does not support pushedAggregate
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isEmpty =>
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
logInfo("Comet extension enabled for Scan")
CometBatchScanExec(
@@ -116,7 +116,7 @@ class CometSparkSessionExtensions
s"Partition schema $readPartitionSchema is not supported")
// Comet does not support pushedAggregate
val info3 = createMessage(
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
"Comet does not support pushed aggregate")
withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
scanExec
@@ -992,8 +992,7 @@ object CometSparkSessionExtensions extends Logging {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
// `TimestampNTZType` is private in Spark 3.2.
case t: DataType if t.typeName == "timestamp_ntz" && !isSpark32 => true
case t: DataType if t.typeName == "timestamp_ntz" => true
case dt =>
logInfo(s"Comet extension is disabled because data type $dt is not supported")
false
@@ -1015,11 +1014,6 @@
}
}

/** Used for operations that weren't available in Spark 3.2 */
def isSpark32: Boolean = {
org.apache.spark.SPARK_VERSION.matches("3\\.2\\..*")
}

def isSpark33Plus: Boolean = {
org.apache.spark.SPARK_VERSION >= "3.3"
}
@@ -45,8 +45,8 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.shims.ShimSQLConf

/**
* Copied from Spark 3.2 & 3.4, in order to fix Parquet shading issue. TODO: find a way to remove
* this duplication
* Copied from Spark 3.4, in order to fix Parquet shading issue. TODO: find a way to remove this
* duplication
*
* Some utility function to convert Spark data source filters to Parquet filters.
*/
18 changes: 6 additions & 12 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo}
import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark34Plus, withInfo}
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, Unsupported}
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -63,7 +63,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
_: DateType | _: BooleanType | _: NullType =>
true
// `TimestampNTZType` is private in Spark 3.2.
case dt if dt.typeName == "timestamp_ntz" => true
case dt =>
emitWarning(s"unsupported Spark data type: $dt")
@@ -1413,7 +1412,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}

case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
// `UnaryExpression` includes `PromotePrecision` for Spark 3.2 & 3.3
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3
// `PromotePrecision` is just a wrapper, don't need to serialize it.
exprToProtoInternal(child, inputs)

@@ -1518,7 +1517,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

optExprWithInfo(optExpr, expr, child)

case e: Unhex if !isSpark32 =>
case e: Unhex =>
val unHex = unhexSerde(e)

val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -1585,9 +1584,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
val optExpr = scalarExprToProto("pow", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)

// round function for Spark 3.2 does not allow negative round target scale. In addition,
// it has different result precision/scale for decimals. Supporting only 3.3 and above.
case r: Round if !isSpark32 =>
case r: Round =>
// _scale is a constant, copied from Spark's RoundBase because it is a protected val
val scaleV: Any = r.scale.eval(EmptyRow)
val _scale: Int = scaleV.asInstanceOf[Int]
@@ -2066,7 +2063,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
childExpr)
optExprWithInfo(optExpr, expr, child)

case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) =>
case b @ BloomFilterMightContain(_, _) =>
val bloomFilter = b.left
val value = b.right
val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs)
@@ -2252,7 +2249,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType =>
true
// `TimestampNTZType` is private in Spark 3.2/3.3.
// `TimestampNTZType` is private in Spark 3.3.
case dt if dt.typeName == "timestamp_ntz" => true
case _ => false
}
@@ -2330,12 +2327,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
val limitBuilder = OperatorOuterClass.Limit.newBuilder()

// Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions
// support it. Before we upgrade to Spark 3.3, just set it zero.
// TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
// When we upgrade to Spark 3.3., we need to address it here.
limitBuilder.setLimit(globalLimitExec.limit)
limitBuilder.setOffset(0)

Some(result.setLimit(limitBuilder).build())
} else {
@@ -144,11 +144,5 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
}
}

// Intentionally omitting the return type as it is different depending on Spark version
// Spark 3.2.x Seq[InputPartition]
// Spark 3.3.x Seq[Seq[InputPartition]]
// TODO: add back the return type after dropping Spark 3.2.0 support
@transient override lazy val partitions = wrappedScan.partitions

override def supportsColumnar: Boolean = true
}
@@ -44,7 +44,6 @@ import org.apache.spark.util.collection._

import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
import org.apache.comet.shims.ShimFileFormat

/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
@@ -150,7 +149,7 @@ case class CometScanExec(

lazy val inputRDD: RDD[InternalRow] = {
val options = relation.options +
(ShimFileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
(FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -88,7 +88,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
}
}

// Copied from Spark 3.4+ to make it available in Spark 3.2+.
// Copied from Spark 3.4+ to make it available in Spark 3.3+.
def multiTransformDown(expr: Expression)(
rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] = {

@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
* satisfies distribution requirements.
*
* This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available
* in Spark 3.4+. This is a workaround to make it available in Spark 3.2+.
* in Spark 3.4+. This is a workaround to make it available in Spark 3.3+.
*/
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
@@ -27,9 +27,9 @@ trait ShimCometBroadcastHashJoinExec {
/**
* Returns the expressions that are used for hash partitioning including `HashPartitioning` and
* `CoalescedHashPartitioning`. They shares same trait `HashPartitioningLike` since Spark 3.4,
* but Spark 3.2/3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
* but Spark 3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
*
* TODO: remove after dropping Spark 3.2 and 3.3 support.
* TODO: remove after dropping Spark 3.3 support.
*/
def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] = {
partitioning.getClass.getDeclaredMethods
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.types.{StructField, StructType}

trait ShimCometShuffleExchangeExec {
// TODO: remove after dropping Spark 3.2 and 3.3 support
// TODO: remove after dropping Spark 3.3 support
def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
val advisoryPartitionSize = s.getClass.getDeclaredMethods
.filter(_.getName == "advisoryPartitionSize")
@@ -19,22 +19,11 @@

package org.apache.comet.shims

import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

trait ShimCometSparkSessionExtensions {
/**
* TODO: delete after dropping Spark 3.2.0 support and directly call scan.pushedAggregate
*/
def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.getClass.getDeclaredFields
.filter(_.getName == "pushedAggregate")
.map { a => a.setAccessible(true); a }
.flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
.headOption

/**
* TODO: delete after dropping Spark 3.2 and 3.3 support
* TODO: delete after dropping Spark 3.3 support
*/
def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)

@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
trait ShimCometTakeOrderedAndProjectExec {

/**
* TODO: delete after dropping Spark 3.2 and 3.3 support
* TODO: delete after dropping Spark 3.3 support
*/
protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = {
plan.getClass.getDeclaredFields
@@ -19,7 +19,7 @@

package org.apache.comet.shims

import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression}
import org.apache.spark.sql.catalyst.expressions.BinaryArithmetic
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate

trait ShimQueryPlanSerde {
@@ -45,7 +45,7 @@ trait ShimQueryPlanSerde {
}
}

// TODO: delete after drop Spark 3.2/3.3 support
// TODO: delete after drop Spark 3.3 support
// This method is used to check if the aggregate function is in legacy mode.
// EvalMode is an enum object in Spark 3.4.
def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = {
@@ -62,9 +62,4 @@ trait ShimQueryPlanSerde {
"legacy".equalsIgnoreCase(evalMode.head.toString)
}
}

// TODO: delete after drop Spark 3.2 support
def isBloomFilterMightContain(binary: BinaryExpression): Boolean = {
binary.getClass.getName == "org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain"
}
}
@@ -28,7 +28,7 @@ trait ShimSQLConf {
* Spark 3.4 renamed parquetFilterPushDownStringStartWith to
* parquetFilterPushDownStringPredicate
*
* TODO: delete after dropping Spark 3.2 & 3.3 support and simply use
* TODO: delete after dropping Spark 3.3 support and simply use
* parquetFilterPushDownStringPredicate
*/
protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =