From 8d2f648d101044e2ede85a324aefb8a04531f575 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 18 Jun 2024 16:03:31 -0700 Subject: [PATCH] build: Drop Spark 3.2 support (#581) * build: Drop Spark 3.2 support * remove un-used import * fix BloomFilterMightContain * revert the changes for TimestampNTZType and PartitionIdPassthrough * address comments and remove more 3.2 related code * remove un-used import * put back newDataSourceRDD * remove un-used import and put back lazy val partitions * address comments * Trigger Build * remove the missed 3.2 pipeline * address comments --- .github/workflows/pr_build.yml | 15 +--- .../comet/parquet/CometParquetUtils.scala | 3 +- .../parquet/CometParquetReadSupport.scala | 13 ++- .../CometSparkToParquetSchemaConverter.scala | 5 +- .../apache/comet/shims/ShimBatchReader.scala | 2 +- .../apache/comet/shims/ShimFileFormat.scala | 7 +- .../shims/ShimResolveDefaultColumns.scala | 2 +- .../comet/shims/ShimCometParquetUtils.scala | 81 ------------------- .../comet/shims/ShimCometParquetUtils.scala | 38 --------- core/src/errors.rs | 2 +- .../adding_a_new_expression.md | 9 +-- docs/source/user-guide/installation.md | 2 +- docs/source/user-guide/overview.md | 2 +- pom.xml | 14 ---- .../comet/CometSparkSessionExtensions.scala | 12 +-- .../apache/comet/parquet/ParquetFilters.scala | 4 +- .../apache/comet/serde/QueryPlanSerde.scala | 18 ++--- .../spark/sql/comet/CometBatchScanExec.scala | 10 +-- .../spark/sql/comet/CometScanExec.scala | 6 +- .../shuffle/CometShuffleExchangeExec.scala | 4 +- .../plans/AliasAwareOutputExpression.scala | 2 +- .../PartitioningPreservingUnaryExecNode.scala | 2 +- .../apache/comet/shims/CometExprShim.scala | 37 --------- .../comet/shims/ShimCometBatchScanExec.scala | 14 +--- .../ShimCometBroadcastHashJoinExec.scala | 4 +- .../shims/ShimCometShuffleExchangeExec.scala | 2 +- .../ShimCometSparkSessionExtensions.scala | 13 +-- .../ShimCometTakeOrderedAndProjectExec.scala | 2 +- .../comet/shims/ShimQueryPlanSerde.scala | 9 +-- .../org/apache/comet/shims/ShimSQLConf.scala | 2 +- .../ShimCometBroadcastExchangeExec.scala | 2 +- .../sql/comet/shims/ShimCometScanExec.scala | 47 +++-------- .../comet/shims/ShimCometBatchScanExec.scala | 7 +- .../sql/comet/shims/ShimCometScanExec.scala | 12 --- .../org/apache/comet/CometCastSuite.scala | 20 +---- .../apache/comet/CometExpressionSuite.scala | 26 ++---- .../comet/exec/CometAggregateSuite.scala | 2 +- .../comet/parquet/ParquetReadSuite.scala | 15 +--- .../spark/sql/CometTPCHQuerySuite.scala | 39 +-------- .../sql/benchmark/CometReadBenchmark.scala | 4 +- .../sql/comet/CometPlanStabilitySuite.scala | 2 +- .../spark/comet/shims/ShimTestUtils.scala | 43 ---------- 42 files changed, 87 insertions(+), 468 deletions(-) delete mode 100644 common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala delete mode 100644 common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala delete mode 100644 spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala delete mode 100644 spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index 2bf023357..981905ec7 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -109,15 +109,8 @@ jobs: os: [ubuntu-latest] java_version: [8, 11, 17] test-target: [java] - spark-version: ['3.2', '3.3'] + spark-version: ['3.3'] scala-version: ['2.12', '2.13'] - exclude: - - java_version: 17 - 
spark-version: '3.2' - - java_version: 11 - spark-version: '3.2' - - spark-version: '3.2' - scala-version: '2.13' fail-fast: false name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }} runs-on: ${{ matrix.os }} @@ -254,15 +247,11 @@ jobs: matrix: java_version: [8, 17] test-target: [java] - spark-version: ['3.2', '3.3'] + spark-version: ['3.3'] scala-version: ['2.12', '2.13'] exclude: - - java_version: 17 - spark-version: '3.2' - java_version: 8 spark-version: '3.3' - - spark-version: '3.2' - scala-version: '2.13' fail-fast: false name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }} runs-on: macos-14 diff --git a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala index d03252d06..a37ec7e66 100644 --- a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala +++ b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala @@ -20,10 +20,9 @@ package org.apache.comet.parquet import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.comet.shims.ShimCometParquetUtils import org.apache.spark.sql.internal.SQLConf -object CometParquetUtils extends ShimCometParquetUtils { +object CometParquetUtils { private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled" private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled" private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing" diff --git a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala index 0e8a190c2..4523a057b 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala @@ -27,10 +27,9 @@ import org.apache.parquet.schema._ import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation import org.apache.parquet.schema.Type.Repetition import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.types._ -import org.apache.comet.parquet.CometParquetUtils - /** * This class is copied & slightly modified from [[ParquetReadSupport]] in Spark. Changes: * - This doesn't extend from Parquet's `ReadSupport` class since that is used for row-based @@ -53,7 +52,7 @@ object CometParquetReadSupport { ignoreMissingIds: Boolean): MessageType = { if (!ignoreMissingIds && !containsFieldIds(parquetSchema) && - CometParquetUtils.hasFieldIds(catalystSchema)) { + ParquetUtils.hasFieldIds(catalystSchema)) { throw new RuntimeException( "Spark read schema expects field Ids, " + "but Parquet file schema doesn't contain any field Ids.\n" + @@ -334,14 +333,14 @@ object CometParquetReadSupport { } def matchIdField(f: StructField): Type = { - val fieldId = CometParquetUtils.getFieldId(f) + val fieldId = ParquetUtils.getFieldId(f) idToParquetFieldMap .get(fieldId) .map { parquetTypes => if (parquetTypes.size > 1) { // Need to fail if there is ambiguity, i.e. 
more than one field is matched val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]") - throw CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError( + throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( fieldId, parquetTypesString) } else { @@ -355,9 +354,9 @@ object CometParquetReadSupport { } } - val shouldMatchById = useFieldId && CometParquetUtils.hasFieldIds(structType) + val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType) structType.map { f => - if (shouldMatchById && CometParquetUtils.hasFieldId(f)) { + if (shouldMatchById && ParquetUtils.hasFieldId(f)) { matchIdField(f) } else if (caseSensitive) { matchCaseSensitiveField(f) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala index 2c8187e18..56adc4607 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala @@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter( */ def convertField(field: StructField): Type = { val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED) - if (useFieldId && CometParquetUtils.hasFieldId(field)) { - converted.withId(CometParquetUtils.getFieldId(field)) + if (useFieldId && ParquetUtils.hasFieldId(field)) { + converted.withId(ParquetUtils.getFieldId(field)) } else { converted } diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala index ece4cfbe5..18f91acc3 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile object ShimBatchReader { - // TODO: remove after dropping Spark 3.2 & 3.3 support and directly call PartitionedFile + // TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile = classOf[PartitionedFile].getDeclaredConstructors .map(c => diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala index 5ab7eaf4f..685e8f566 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala @@ -21,15 +21,12 @@ package org.apache.comet.shims object ShimFileFormat { - // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use FileFormat.ROW_INDEX + // TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX val ROW_INDEX = "row_index" // A name for a temporary column that holds row indexes computed by the file format reader // until they can be placed in the 
_metadata struct. - // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use + // TODO: remove after dropping Spark 3.3 support and directly use // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX" - - // TODO: remove after dropping Spark 3.2 support and use FileFormat.OPTION_RETURNING_BATCH - val OPTION_RETURNING_BATCH = "returning_batch" } diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala index 8a30c8e00..4f7d49831 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala @@ -24,7 +24,7 @@ import scala.util.Try import org.apache.spark.sql.types.{StructField, StructType} object ShimResolveDefaultColumns { - // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use ResolveDefaultColumns + // TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns def getExistenceDefaultValue(field: StructField): Any = Try { // scalastyle:off classforname diff --git a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala b/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala deleted file mode 100644 index f22ac4060..000000000 --- a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.comet.shims - -import org.apache.spark.sql.types._ - -trait ShimCometParquetUtils { - // The following is copied from QueryExecutionErrors - // TODO: remove after dropping Spark 3.2.0 support and directly use - // QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError - def foundDuplicateFieldInFieldIdLookupModeError( - requiredId: Int, - matchedFields: String): Throwable = { - new RuntimeException(s""" - |Found duplicate field(s) "$requiredId": $matchedFields - |in id mapping mode - """.stripMargin.replaceAll("\n", " ")) - } - - // The followings are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils - // TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils - /** - * A StructField metadata key used to set the field id of a column in the Parquet schema. - */ - val FIELD_ID_METADATA_KEY = "parquet.field.id" - - /** - * Whether there exists a field in the schema, whether inner or leaf, has the parquet field ID - * metadata. 
- */ - def hasFieldIds(schema: StructType): Boolean = { - def recursiveCheck(schema: DataType): Boolean = { - schema match { - case st: StructType => - st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType)) - - case at: ArrayType => recursiveCheck(at.elementType) - - case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType) - - case _ => - // No need to really check primitive types, just to terminate the recursion - false - } - } - if (schema.isEmpty) false else recursiveCheck(schema) - } - - def hasFieldId(field: StructField): Boolean = - field.metadata.contains(FIELD_ID_METADATA_KEY) - - def getFieldId(field: StructField): Int = { - require( - hasFieldId(field), - s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field) - try { - Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY)) - } catch { - case _: ArithmeticException | _: ClassCastException => - throw new IllegalArgumentException( - s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer") - } - } -} diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala b/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala deleted file mode 100644 index d402cd786..000000000 --- a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.comet.shims - -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils -import org.apache.spark.sql.types._ - -trait ShimCometParquetUtils { - def foundDuplicateFieldInFieldIdLookupModeError( - requiredId: Int, - matchedFields: String): Throwable = { - QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(requiredId, matchedFields) - } - - def hasFieldIds(schema: StructType): Boolean = ParquetUtils.hasFieldIds(schema) - - def hasFieldId(field: StructField): Boolean = ParquetUtils.hasFieldId(field) - - def getFieldId(field: StructField): Int = ParquetUtils.getFieldId (field) -} diff --git a/core/src/errors.rs b/core/src/errors.rs index 493880c3e..b38c5e90b 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -61,7 +61,7 @@ pub enum CometError { Internal(String), // Note that this message format is based on Spark 3.4 and is more detailed than the message - // returned by Spark 3.2 or 3.3 + // returned by Spark 3.3 #[error("[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \ because it is malformed. Correct the value as per the syntax, or change its target type. \ Use `try_cast` to tolerate malformed input and return NULL instead. 
If necessary \ diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md index 6cf10c758..6d906c662 100644 --- a/docs/source/contributor-guide/adding_a_new_expression.md +++ b/docs/source/contributor-guide/adding_a_new_expression.md @@ -46,7 +46,7 @@ The `QueryPlanSerde` object has a method `exprToProto`, which is responsible for For example, the `unhex` function looks like this: ```scala -case e: Unhex if !isSpark32 => +case e: Unhex => val unHex = unhexSerde(e) val childExpr = exprToProtoInternal(unHex._1, inputs) @@ -59,7 +59,6 @@ case e: Unhex if !isSpark32 => A few things to note here: -* The `isSpark32` check is used to fall back to Spark's implementation of `unhex` in Spark 3.2. This is somewhat context specific, because in this case, due to a bug in Spark 3.2 for `unhex`, we want to use the Spark implementation and not a Comet implementation that would behave differently if correct. * The function is recursively called on child expressions, so you'll need to make sure that the child expressions are also converted to protobuf. * `scalarExprToProtoWithReturnType` is for scalar functions that need return type information. Your expression may use a different method depending on the type of expression. @@ -71,8 +70,6 @@ For example, this is the test case for the `unhex` expression: ```scala test("unhex") { - assume(!isSpark32, "unhex function has incorrect behavior in 3.2") // used to skip the test in Spark 3.2 - val table = "unhex_table" withTable(table) { sql(s"create table $table(col string) using parquet") @@ -172,11 +169,11 @@ pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result - - spark-3.2 - - 2.12.15 - 3.2.2 - 3.2 - 1.12.0 - - not-needed-yet - not-needed-yet - spark-3.2 - - - spark-3.3 diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 0136b62a4..e939b43a1 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -96,7 +96,7 @@ class CometSparkSessionExtensions isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) && isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) && // Comet does not support pushedAggregate - getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isEmpty => + scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty => val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan]) logInfo("Comet extension enabled for Scan") CometBatchScanExec( @@ -116,7 +116,7 @@ class CometSparkSessionExtensions s"Partition schema $readPartitionSchema is not supported") // Comet does not support pushedAggregate val info3 = createMessage( - getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined, + scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined, "Comet does not support pushed aggregate") withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet) scanExec @@ -992,8 +992,7 @@ object CometSparkSessionExtensions extends Logging { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | StringType | _: DecimalType | DateType | TimestampType => true - // `TimestampNTZType` is private in Spark 3.2. 
- case t: DataType if t.typeName == "timestamp_ntz" && !isSpark32 => true + case t: DataType if t.typeName == "timestamp_ntz" => true case dt => logInfo(s"Comet extension is disabled because data type $dt is not supported") false @@ -1015,11 +1014,6 @@ object CometSparkSessionExtensions extends Logging { } } - /** Used for operations that weren't available in Spark 3.2 */ - def isSpark32: Boolean = { - org.apache.spark.SPARK_VERSION.matches("3\\.2\\..*") - } - def isSpark33Plus: Boolean = { org.apache.spark.SPARK_VERSION >= "3.3" } diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala index 58c2aeb41..17844aba8 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala @@ -45,8 +45,8 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus import org.apache.comet.shims.ShimSQLConf /** - * Copied from Spark 3.2 & 3.4, in order to fix Parquet shading issue. TODO: find a way to remove - * this duplication + * Copied from Spark 3.4, in order to fix Parquet shading issue. TODO: find a way to remove this + * duplication * * Some utility function to convert Spark data source filters to Parquet filters. */ diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 7c4f5f251..67ecfe52d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo} +import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark34Plus, withInfo} import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, Unsupported} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} @@ -63,7 +63,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType | _: DateType | _: BooleanType | _: NullType => true - // `TimestampNTZType` is private in Spark 3.2. case dt if dt.typeName == "timestamp_ntz" => true case dt => emitWarning(s"unsupported Spark data type: $dt") @@ -1413,7 +1412,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case UnaryExpression(child) if expr.prettyName == "promote_precision" => - // `UnaryExpression` includes `PromotePrecision` for Spark 3.2 & 3.3 + // `UnaryExpression` includes `PromotePrecision` for Spark 3.3 // `PromotePrecision` is just a wrapper, don't need to serialize it. 
exprToProtoInternal(child, inputs) @@ -1518,7 +1517,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim optExprWithInfo(optExpr, expr, child) - case e: Unhex if !isSpark32 => + case e: Unhex => val unHex = unhexSerde(e) val childExpr = exprToProtoInternal(unHex._1, inputs) @@ -1585,9 +1584,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val optExpr = scalarExprToProto("pow", leftExpr, rightExpr) optExprWithInfo(optExpr, expr, left, right) - // round function for Spark 3.2 does not allow negative round target scale. In addition, - // it has different result precision/scale for decimals. Supporting only 3.3 and above. - case r: Round if !isSpark32 => + case r: Round => // _scale s a constant, copied from Spark's RoundBase because it is a protected val val scaleV: Any = r.scale.eval(EmptyRow) val _scale: Int = scaleV.asInstanceOf[Int] @@ -2066,7 +2063,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim childExpr) optExprWithInfo(optExpr, expr, child) - case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) => + case b @ BloomFilterMightContain(_, _) => val bloomFilter = b.left val value = b.right val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs) @@ -2244,7 +2241,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType => true - // `TimestampNTZType` is private in Spark 3.2/3.3. + // `TimestampNTZType` is private in Spark 3.3. case dt if dt.typeName == "timestamp_ntz" => true case _ => false } @@ -2322,12 +2319,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim if (childOp.nonEmpty && globalLimitExec.limit >= 0) { val limitBuilder = OperatorOuterClass.Limit.newBuilder() - // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions - // support it. Before we upgrade to Spark 3.3, just set it zero. // TODO: Spark 3.3 might have negative limit (-1) for Offset usage. // When we upgrade to Spark 3.3., we need to address it here. 
limitBuilder.setLimit(globalLimitExec.limit) - limitBuilder.setOffset(0) Some(result.setLimit(limitBuilder).build()) } else { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala index d6c3c87a3..82ebed95d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala @@ -44,6 +44,10 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres wrapped.logicalLink.foreach(setLogicalLink) + def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning + + def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions + override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD override def doExecuteColumnar(): RDD[ColumnarBatch] = { @@ -144,11 +148,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres } } - // Intentionally omitting the return type as it is different depending on Spark version - // Spark 3.2.x Seq[InputPartition] - // Spark 3.3.x Seq[Seq[InputPartition]] - // TODO: add back the return type after dropping Spark 3.2.0 support - @transient override lazy val partitions = wrappedScan.partitions + @transient override lazy val partitions: Seq[Seq[InputPartition]] = wrappedScan.partitions override def supportsColumnar: Boolean = true } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 9a5b55d65..ca99e36b8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.shims.ShimCometScanExec import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -44,7 +45,6 @@ import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} -import org.apache.comet.shims.ShimFileFormat /** * Comet physical scan node for DataSource V1. 
Most of the code here follow Spark's @@ -150,7 +150,7 @@ case class CometScanExec( lazy val inputRDD: RDD[InternalRow] = { val options = relation.options + - (ShimFileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString) + (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString) val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -402,7 +402,7 @@ case class CometScanExec( new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf), metrics) - newDataSourceRDD( + new DataSourceRDD( fsRelation.sparkSession.sparkContext, partitions.map(Seq(_)), partitionReaderFactory, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 3f4d7bfd3..aabe3c350 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -583,14 +583,14 @@ class CometShuffleWriteProcessor( } /** - * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.2. + * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3. */ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner { override def getPartition(key: Any): Int = key.asInstanceOf[Int] } /** - * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.2. + * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3. */ private[spark] class ConstantPartitioner extends Partitioner { override def numPartitions: Int = 1 diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala index 996526e55..f28ca2bba 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala @@ -88,7 +88,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper { } } - // Copied from Spark 3.4+ to make it available in Spark 3.2+. + // Copied from Spark 3.4+ to make it available in Spark 3.3+. def multiTransformDown(expr: Expression)( rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] = { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala index 8c6f0af18..d584e8609 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode * satisfies distribution requirements. * * This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available - * in Spark 3.4+. This is a workaround to make it available in Spark 3.2+. + * in Spark 3.4+. This is a workaround to make it available in Spark 3.3+. 
*/ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { diff --git a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala deleted file mode 100644 index 2c6f6ccf4..000000000 --- a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.comet.shims - -import org.apache.comet.expressions.CometEvalMode -import org.apache.spark.sql.catalyst.expressions._ - -/** - * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions. - */ -trait CometExprShim { - /** - * Returns a tuple of expressions for the `unhex` function. - */ - protected def unhexSerde(unhex: Unhex): (Expression, Expression) = { - (unhex.child, Literal(false)) - } - - protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalMode.fromBoolean(c.ansiEnabled) -} - diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala index 9e7cc3ba5..49e931110 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala @@ -19,24 +19,12 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.execution.datasources.v2.BatchScanExec trait ShimCometBatchScanExec { def wrapped: BatchScanExec - // Only for Spark 3.3+ - def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.getClass.getDeclaredMethods - .filter(_.getName == "keyGroupedPartitioning") - .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[Expression]]]) - .headOption - - // Only for Spark 3.3+ - def inputPartitions: Seq[InputPartition] = wrapped.getClass.getDeclaredMethods - .filter(_.getName == "inputPartitions") - .flatMap(_.invoke(wrapped).asInstanceOf[Seq[InputPartition]]) - // Only for Spark 3.4+ def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods .filter(_.getName == "ordering") diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala index eef0ee9d5..442bb9e58 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala +++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala @@ -27,9 +27,9 @@ trait ShimCometBroadcastHashJoinExec { /** * Returns the expressions that are used for hash partitioning including `HashPartitioning` and * `CoalescedHashPartitioning`. They shares same trait `HashPartitioningLike` since Spark 3.4, - * but Spark 3.2/3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`. + * but Spark 3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`. * - * TODO: remove after dropping Spark 3.2 and 3.3 support. + * TODO: remove after dropping Spark 3.3 support. */ def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] = { partitioning.getClass.getDeclaredMethods diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala index 350aeb9f0..965b6851e 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.types.{StructField, StructType} trait ShimCometShuffleExchangeExec { - // TODO: remove after dropping Spark 3.2 and 3.3 support + // TODO: remove after dropping Spark 3.3 support def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = { val advisoryPartitionSize = s.getClass.getDeclaredMethods .filter(_.getName == "advisoryPartitionSize") diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala index 377485335..c8aeacf2a 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala @@ -19,22 +19,11 @@ package org.apache.comet.shims -import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan} -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan trait ShimCometSparkSessionExtensions { /** - * TODO: delete after dropping Spark 3.2.0 support and directly call scan.pushedAggregate - */ - def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.getClass.getDeclaredFields - .filter(_.getName == "pushedAggregate") - .map { a => a.setAccessible(true); a } - .flatMap(_.get(scan).asInstanceOf[Option[Aggregation]]) - .headOption - - /** - * TODO: delete after dropping Spark 3.2 and 3.3 support + * TODO: delete after dropping Spark 3.3 support */ def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0) diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala index 2e1c681c7..983e099f8 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.TakeOrderedAndProjectExec trait ShimCometTakeOrderedAndProjectExec { /** - * TODO: delete after dropping Spark 3.2 and 3.3 support + * TODO: delete after dropping Spark 3.3 
support */ protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = { plan.getClass.getDeclaredFields diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala index b92d3fc6a..1b0996d9d 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala @@ -19,7 +19,7 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression} +import org.apache.spark.sql.catalyst.expressions.BinaryArithmetic import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate trait ShimQueryPlanSerde { @@ -45,7 +45,7 @@ trait ShimQueryPlanSerde { } } - // TODO: delete after drop Spark 3.2/3.3 support + // TODO: delete after drop Spark 3.3 support // This method is used to check if the aggregate function is in legacy mode. // EvalMode is an enum object in Spark 3.4. def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = { @@ -62,9 +62,4 @@ trait ShimQueryPlanSerde { "legacy".equalsIgnoreCase(evalMode.head.toString) } } - - // TODO: delete after drop Spark 3.2 support - def isBloomFilterMightContain(binary: BinaryExpression): Boolean = { - binary.getClass.getName == "org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain" - } } diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala index c3d0c56e5..579db51c3 100644 --- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala @@ -28,7 +28,7 @@ trait ShimSQLConf { * Spark 3.4 renamed parquetFilterPushDownStringStartWith to * parquetFilterPushDownStringPredicate * - * TODO: delete after dropping Spark 3.2 & 3.3 support and simply use + * TODO: delete after dropping Spark 3.3 support and simply use * parquetFilterPushDownStringPredicate */ protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean = diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala index aede47951..afcf653b4 100644 --- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala +++ b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast trait ShimCometBroadcastExchangeExec { - // TODO: remove after dropping Spark 3.2 and 3.3 support + // TODO: remove after dropping Spark 3.3 support protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] = { // Spark 3.4 has new API `broadcastInternal` to broadcast the relation without caching the // unserialized object. 
diff --git a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 02b97f9fb..65fb59a38 100644 --- a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -21,32 +21,27 @@ package org.apache.spark.sql.comet.shims import org.apache.comet.shims.ShimFileFormat -import scala.language.implicitConversions - import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.SparkException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile} -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{LongType, StructField, StructType} trait ShimCometScanExec { def wrapped: FileSourceScanExec - // TODO: remove after dropping Spark 3.2 support and directly call wrapped.metadataColumns + // TODO: remove after dropping Spark 3.3 support lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods .filter(_.getName == "metadataColumns") .map { a => a.setAccessible(true); a } .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]]) - // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call + // TODO: remove after dropping Spark 3.3 support and directly call // wrapped.fileConstantMetadataColumns lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods @@ -54,18 +49,7 @@ trait ShimCometScanExec { .map { a => a.setAccessible(true); a } .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]]) - // TODO: remove after dropping Spark 3.2 support and directly call new DataSourceRDD - protected def newDataSourceRDD( - sc: SparkContext, - inputPartitions: Seq[Seq[InputPartition]], - partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean, - customMetrics: Map[String, SQLMetric]): DataSourceRDD = { - implicit def flattenSeq(p: Seq[Seq[InputPartition]]): Seq[InputPartition] = p.flatten - new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, customMetrics) - } - - // TODO: remove after dropping Spark 3.2 support and directly call new FileScanRDD + // TODO: remove after dropping Spark 3.3 and 3.4 support and directly call new FileScanRDD protected def newFileScanRDD( fsRelation: HadoopFsRelation, readFunction: PartitionedFile => Iterator[InternalRow], @@ -74,10 +58,9 @@ trait ShimCometScanExec { options: ParquetOptions): FileScanRDD = classOf[FileScanRDD].getDeclaredConstructors // Prevent to pick up incorrect constructors from any custom Spark forks. 
- .filter(c => List(3, 5, 6).contains(c.getParameterCount()) ) + .filter(c => List(5, 6).contains(c.getParameterCount())) .map { c => c.getParameterCount match { - case 3 => c.newInstance(fsRelation.sparkSession, readFunction, filePartitions) case 5 => c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns) case 6 => @@ -93,19 +76,15 @@ trait ShimCometScanExec { .last .asInstanceOf[FileScanRDD] - // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call + // TODO: remove after dropping Spark 3.3 support and directly call // QueryExecutionErrors.SparkException protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = { - if (sparkVersion >= "3.3") { - val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path) - classOf[SparkException].getDeclaredConstructors - .filter(_.getParameterCount == 3) - .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null)) - .last - .asInstanceOf[SparkException] - } else { // Spark 3.2 - new IllegalStateException(s"Invalid bucket file ${path}") - } + val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path) + classOf[SparkException].getDeclaredConstructors + .filter(_.getParameterCount == 3) + .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null)) + .last + .asInstanceOf[SparkException] } // Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in SPARK-39634) diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala index 167b539f8..d41502f6e 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala @@ -19,16 +19,11 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.execution.datasources.v2.BatchScanExec trait ShimCometBatchScanExec { def wrapped: BatchScanExec - def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning - - def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions - def ordering: Option[Seq[SortOrder]] = wrapped.ordering } diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 543116c10..48b9c8086 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -24,15 +24,11 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} import org.apache.spark.sql.types.StructType 
-import org.apache.spark.SparkContext trait ShimCometScanExec { def wrapped: FileSourceScanExec @@ -40,14 +36,6 @@ trait ShimCometScanExec { lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.fileConstantMetadataColumns - protected def newDataSourceRDD( - sc: SparkContext, - inputPartitions: Seq[Seq[InputPartition]], - partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean, - customMetrics: Map[String, SQLMetric]): DataSourceRDD = - new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, customMetrics) - protected def newFileScanRDD( fsRelation: HadoopFsRelation, readFunction: PartitionedFile => Iterator[InternalRow], diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 31d718d44..885d63858 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -571,9 +571,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast StringType to DateType") { - // error message for invalid dates in Spark 3.2 not supported by Comet see below issue. - // https://github.com/apache/datafusion-comet/issues/440 - assume(CometSparkSessionExtensions.isSpark33Plus) val validDates = Seq( "262142-01-01", "262142-01-01 ", @@ -956,7 +953,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { private def castTest(input: DataFrame, toType: DataType): Unit = { - // we now support the TryCast expression in Spark 3.2 and 3.3 + // we now support the TryCast expression in Spark 3.3 withTempPath { dir => val data = roundtripParquet(input, dir).coalesce(1) data.createOrReplaceTempView("t") @@ -1002,7 +999,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } else if (CometSparkSessionExtensions.isSpark34Plus) { // for Spark 3.4 we expect to reproduce the error message exactly assert(cometMessage == sparkMessage) - } else if (CometSparkSessionExtensions.isSpark33Plus) { + } else { // for Spark 3.3 we just need to strip the prefix from the Comet message // before comparing val cometMessageModified = cometMessage @@ -1015,19 +1012,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } else { assert(cometMessageModified == sparkMessage) } - } else { - // for Spark 3.2 we just make sure we are seeing a similar type of error - if (sparkMessage.contains("causes overflow")) { - assert(cometMessage.contains("due to an overflow")) - } else if (sparkMessage.contains("cannot be represented as")) { - assert(cometMessage.contains("cannot be represented as")) - } else { - // assume that this is an invalid input message in the form: - // `invalid input syntax for type numeric: -9223372036854775809` - // we just check that the Comet message contains the same literal value - val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2) - assert(cometMessage.contains(sparkInvalidValue)) - } } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 1c06e1dac..8dbfb71b3 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types.{Decimal, DecimalType} 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark32, isSpark33Plus, isSpark34Plus} +import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus} class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -51,7 +51,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("decimals divide by zero") { - // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation + // TODO: enable Spark 3.3 tests after supporting decimal divide operation assume(isSpark34Plus) Seq(true, false).foreach { dictionary => @@ -293,7 +293,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast timestamp and timestamp_ntz to string") { - // TODO: make the test pass for Spark 3.2 & 3.3 + // TODO: make the test pass for Spark 3.3 assume(isSpark34Plus) withSQLConf( @@ -318,7 +318,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast timestamp and timestamp_ntz to long, date") { - // TODO: make the test pass for Spark 3.2 & 3.3 + // TODO: make the test pass for Spark 3.3 assume(isSpark34Plus) withSQLConf( @@ -411,7 +411,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with timestamp_ntz") { - assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2") withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => @@ -619,8 +618,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("contains") { - assume(!isSpark32) - val table = "names" withTable(table) { sql(s"create table $table(id int, name varchar(20)) using parquet") @@ -637,8 +634,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("startswith") { - assume(!isSpark32) - val table = "names" withTable(table) { sql(s"create table $table(id int, name varchar(20)) using parquet") @@ -655,8 +650,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("endswith") { - assume(!isSpark32) - val table = "names" withTable(table) { sql(s"create table $table(id int, name varchar(20)) using parquet") @@ -693,7 +686,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("decimals arithmetic and comparison") { - // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation + // TODO: enable Spark 3.3 tests after supporting decimal reminder operation assume(isSpark34Plus) def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { @@ -958,10 +951,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("round") { - assume( - !isSpark32, - "round function for Spark 3.2 does not allow negative target scale and has different result precision/scale for decimals") - Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -1177,11 +1166,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("unhex") { - // When running against Spark 3.2, we include a bug fix for https://issues.apache.org/jira/browse/SPARK-40924 that - // was added in Spark 3.3, so although Comet's behavior is more correct when running against Spark 3.2, it is not - // the same (and this only applies to edge 
cases with hex inputs with lengths that are not divisible by 2) - assume(!isSpark32, "unhex function has incorrect behavior in 3.2") - val table = "unhex_table" withTable(table) { sql(s"create table $table(col string) using parquet") diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index ca7bc7df0..93be11110 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -800,7 +800,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("final decimal avg") { - // TODO: enable decimal average for Spark 3.2 & 3.3 + // TODO: enable decimal average for Spark 3.3 assume(isSpark34Plus) withSQLConf( diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 0b37f5ccf..bc18d8f10 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -184,7 +184,6 @@ abstract class ParquetReadSuite extends CometTestBase { withTempDir { dir => val path = new Path(dir.toURI.toString, "part-r-0.parquet") val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - val useLocalDateTime = spark.version >= "3.3" readParquetFile(path.toString) { df => checkAnswer( df.select($"_0", $"_1", $"_2", $"_3", $"_4", $"_5"), @@ -192,17 +191,11 @@ abstract class ParquetReadSuite extends CometTestBase { case None => Row(null, null, null, null, null, null) case Some(i) => - // use `LocalDateTime` for `TimestampNTZType` with Spark 3.3 and above. At the moment, - // Spark reads Parquet timestamp values into `Timestamp` (with local timezone) - // regardless of whether `isAdjustedToUTC` is true or false. See SPARK-36182. 
- // TODO: make `LocalDateTime` default after dropping Spark 3.2.0 support val ts = new java.sql.Timestamp(i) - val ldt = if (useLocalDateTime) { - ts.toLocalDateTime - .atZone(ZoneId.systemDefault()) - .withZoneSameInstant(ZoneOffset.UTC) - .toLocalDateTime - } else ts + val ldt = ts.toLocalDateTime + .atZone(ZoneId.systemDefault()) + .withZoneSameInstant(ZoneOffset.UTC) + .toLocalDateTime Row(ts, ts, ts, ldt, ts, ldt) }) } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index ec87f19e9..2638a0e6a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} +import org.apache.spark.sql.test.TestSparkSession import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus @@ -50,7 +50,7 @@ import org.apache.comet.shims.ShimCometTPCHQuerySuite * ./mvnw -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test * }}} */ -class CometTPCHQuerySuite extends QueryTest with CometTPCBase with ShimCometTPCHQuerySuite { +class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuerySuite { private val tpchDataPath = sys.env.get("SPARK_TPCH_DATA") @@ -273,40 +273,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with ShimCometTPCH ignore("skipped because env `SPARK_TPCH_DATA` is not set") {} } - // TODO: remove once Spark 3.2 & 3.3 is no longer supported + // TODO: remove once Spark 3.3 is no longer supported private def shouldRegenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1" } - -/** - * `TPCBase` doesn't exist in Spark 3.2. 
TODO: remove once Spark 3.2 is no longer supported - */ -trait CometTPCBase extends SharedSparkSession { - protected def injectStats: Boolean = false - - override protected def sparkConf: SparkConf = { - if (injectStats) { - super.sparkConf - .set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue) - .set(SQLConf.CBO_ENABLED, true) - .set(SQLConf.PLAN_STATS_ENABLED, true) - .set(SQLConf.JOIN_REORDER_ENABLED, true) - } else { - super.sparkConf.set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue) - } - } - - override def beforeAll(): Unit = { - super.beforeAll() - createTables() - } - - override def afterAll(): Unit = { - dropTables() - super.afterAll() - } - - protected def createTables(): Unit - - protected def dropTables(): Unit -} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index fc4549445..b47de19ba 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -24,8 +24,8 @@ import java.io.File import scala.collection.JavaConverters._ import scala.util.Random +import org.apache.spark.TestUtils import org.apache.spark.benchmark.Benchmark -import org.apache.spark.comet.shims.ShimTestUtils import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector @@ -124,7 +124,7 @@ object CometReadBenchmark extends CometBenchmarkBase { (col: ColumnVector, i: Int) => longSum += col.getUTF8String(i).toLongExact } - val files = ShimTestUtils.listDirectory(new File(dir, "parquetV1")) + val files = TestUtils.listDirectory(new File(dir, "parquetV1")) sqlBenchmark.addCase("ParquetReader Spark") { _ => files.map(_.asInstanceOf[String]).foreach { p => diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 691d2cd63..5e9864f08 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -292,7 +292,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa new TestSparkSession(new SparkContext("local[1]", this.getClass.getCanonicalName, conf)) } - // TODO: remove once Spark 3.2 & 3.3 is no longer supported + // TODO: remove once Spark 3.3 is no longer supported private val shouldRegenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1" } diff --git a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala b/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala deleted file mode 100644 index fcb543f9b..000000000 --- a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.comet.shims - -import java.io.File -import scala.collection.mutable.ArrayBuffer - -object ShimTestUtils { - - /** - * Spark 3.3.0 moved {{{SpecificParquetRecordReaderBase.listDirectory}}} to - * {{{org.apache.spark.TestUtils.listDirectory}}}. Copying it here to bridge the difference - * between Spark 3.2.0 and 3.3.0 TODO: remove after dropping Spark 3.2.0 support and use - * {{{org.apache.spark.TestUtils.listDirectory}}} - */ - def listDirectory(path: File): Array[String] = { - val result = ArrayBuffer.empty[String] - if (path.isDirectory) { - path.listFiles.foreach(f => result.appendAll(listDirectory(f))) - } else { - val c = path.getName.charAt(0) - if (c != '.' && c != '_') result.append(path.getAbsolutePath) - } - result.toArray - } -}
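
Note on the file deleted above: with Spark 3.2 gone, every Spark version Comet still builds against (3.3+) provides `org.apache.spark.TestUtils.listDirectory`, which the removed `ShimTestUtils` merely copied; call sites such as `CometReadBenchmark` now invoke the Spark utility directly, as the hunk above shows. A minimal usage sketch follows; the object name and the `target/parquetV1` path are invented for illustration, and the result type is assumed to match the deleted copy (absolute file paths as strings).

    import java.io.File

    import org.apache.spark.TestUtils

    object ListParquetFilesExample {
      def main(args: Array[String]): Unit = {
        // Recursively collects data-file paths, skipping names that start with '.' or '_',
        // just like the ShimTestUtils.listDirectory copy removed above.
        val files = TestUtils.listDirectory(new File("target/parquetV1"))
        files.foreach(println)
      }
    }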
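
The ParquetReadSuite change above makes the `LocalDateTime` expectation unconditional for TimestampNTZ columns: the local `java.sql.Timestamp` is shifted to UTC and read back as a `LocalDateTime`, with no Spark 3.2 fallback. A self-contained sketch of that conversion, with a hypothetical epoch-millis value standing in for the suite's generated data:

    import java.time.{ZoneId, ZoneOffset}

    object TimestampNtzExpectedValue {
      def main(args: Array[String]): Unit = {
        val millis = 1700000000000L // hypothetical value; the suite derives this from its test rows
        val ts = new java.sql.Timestamp(millis)
        // Interpret the timestamp in the JVM's default zone, view the same instant in UTC,
        // then drop the zone again -- the expected TimestampNTZ value built by the test.
        val ldt = ts.toLocalDateTime
          .atZone(ZoneId.systemDefault())
          .withZoneSameInstant(ZoneOffset.UTC)
          .toLocalDateTime
        println(s"$ts -> $ldt")
      }
    }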
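
Similarly, the CometTPCHQuerySuite hunk drops the copied `CometTPCBase` trait because `org.apache.spark.sql.TPCBase` exists in every remaining supported Spark version; a suite only has to supply `createTables()` and `dropTables()`, which `TPCBase` invokes from `beforeAll()`/`afterAll()` exactly as the deleted copy did. A rough sketch under that assumption; the suite name and table DDL are invented for illustration and require Spark's sql test-jar on the classpath.

    import org.apache.spark.sql.{QueryTest, TPCBase}

    class ExampleTpcSuite extends QueryTest with TPCBase {
      // Called by TPCBase.beforeAll(); the DDL is illustrative, not the real TPC-H schema.
      override protected def createTables(): Unit = {
        spark.sql("CREATE TABLE IF NOT EXISTS lineitem (l_orderkey LONG) USING parquet")
      }

      // Called by TPCBase.afterAll().
      override protected def dropTables(): Unit = {
        spark.sql("DROP TABLE IF EXISTS lineitem")
      }
    }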