diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 04735b5d1..d64a3a3ae 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -537,23 +537,18 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { // Test all combinations of different aggregation & group-by types - (1 to 4).foreach { col => - (1 to 14).foreach { gCol => - withView("v") { - sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col") - checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") - } - checkSparkAnswer(s"SELECT _g$gCol, SUM(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, SUM(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, COUNT(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, COUNT(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, MIN(_$col), MAX(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, AVG(_$col) FROM tbl GROUP BY _g$gCol") + (1 to 14).foreach { gCol => + withView("v") { + sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " + + "FROM tbl ORDER BY _1, _2, _3, _4") + checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " + + s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol") } + checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " + + s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer( + s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") } } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala similarity index 54% rename from spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala rename to spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 0d7c73d42..fec6197d6 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -24,28 +24,23 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.{Partitioner, SparkConf} -import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus - -abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPlanHelper { +abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { protected val adaptiveExecutionEnabled: Boolean - - protected val fastMergeEnabled: Boolean = true - protected val numElementsForceSpillThreshold: Int = 10 override 
protected def sparkConf: SparkConf = { val conf = super.sparkConf conf .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString) - .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString) } protected val asyncShuffleEnable: Boolean @@ -55,7 +50,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla super.test(testName, testTags: _*) { withSQLConf( CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString, - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString) { + CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, + CometConf.COMET_EXEC_ENABLED.key -> "false", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } } @@ -63,33 +61,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla import testImplicits._ - test("Native shuffle with dictionary of binary") { - Seq("true", "false").foreach { dictionaryEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable( - (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())), - "tbl", - dictionaryEnabled.toBoolean) { - val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2") - - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) - } - } - } - } - test("columnar shuffle on nested struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => (i, Seq((i + 1, i.toString), null, (i + 3, (i + 3).toString)), i + 1)), @@ -99,8 +74,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2", $"_3") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -110,11 +84,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { val data: Seq[(Int, (Int, String))] = Seq((1, (0, "1")), (2, (3, "3")), (3, null)) withParquetTable(data, "tbl") { @@ -123,8 +93,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -137,8 +106,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("1.0", "10.0").foreach { ratio => withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - 
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable((0 until 50).map(i => (Map(Seq(i, i + 1) -> i), i + 1)), "tbl") { val df = sql("SELECT * FROM tbl") @@ -146,9 +113,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Array map key array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map(i -> Seq(i, i + 1)), i + 1)), "tbl") { @@ -157,9 +123,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Array map value array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map((i, i.toString) -> i), i + 1)), "tbl") { @@ -168,9 +133,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Struct map key array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map(i -> (i, i.toString)), i + 1)), "tbl") { @@ -179,9 +143,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Struct map value array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -195,8 +158,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("1.0", "10.0").foreach { ratio => withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ((Seq(Map(1 -> i)), Map(2 -> i), Map(3 -> i)), i + 1)), @@ -206,9 +167,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Map array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -220,9 +180,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla withSQLConf( // AQE has `ShuffleStage` which is a leaf node which blocks // collecting `CometShuffleExchangeExec` node. 
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") val shuffled = df @@ -280,11 +238,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { // Boolean key withParquetTable(genTuples(50, Seq(true, false)), "tbl") { val df = sql("SELECT * FROM tbl") @@ -305,8 +259,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Byte key @@ -329,8 +282,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Short key @@ -353,8 +305,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Int key @@ -377,8 +328,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Long key @@ -401,8 +351,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Float key @@ -425,8 +374,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Double key @@ -449,8 +397,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Date key @@ -475,8 +422,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Timestamp key @@ -503,8 +449,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Decimal key @@ -531,8 +476,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // String key @@ -555,8 +499,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + 
checkShuffleAnswer(df, 1) } // Binary key @@ -581,8 +524,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -592,11 +534,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on array") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ( @@ -617,8 +555,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2", $"_3", $"_4", $"_5") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -629,11 +566,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("false", "true").foreach { execEnabled => Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => (Seq(Seq(i + 1), Seq(i + 2), Seq(i + 3)), i + 1)), "tbl") { @@ -642,9 +575,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) // Nested array fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -655,11 +587,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on nested struct") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ((i, 2.toString, (i + 1).toLong, (3.toString, i + 1, (i + 2).toLong)), i + 1)), @@ -669,44 +597,20 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } } } - test("fix: Dictionary arrays imported from native should not be overridden") { - Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_BATCH_SIZE.key -> "10", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 
1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" === 1.toString) - .repartition(numPartitions, $"_1", $"_2") - .sortWithinPartitions($"_1") - checkSparkAnswerAndOperator(df) - } - } - } - } - test("fix: closing sliced dictionary Comet vector should not close dictionary array") { (0 to 10).foreach { _ => withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_BATCH_SIZE.key -> "10", - CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") { val table1 = (0 until 1000) .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong)) .toDF("a", "b", "c", "d") @@ -718,7 +622,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla val df = sql( "select a, b, count(distinct h) from tbl_a, tbl_b " + "where c = e and b = '2222222' and a not like '2' group by a, b") - checkSparkAnswer(df) + checkShuffleAnswer(df, 4) } } } @@ -727,11 +631,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: Dictionary field should have distinct dict_id") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") { withParquetTable( (0 until 10000).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { @@ -740,7 +640,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -748,18 +648,14 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("dictionary shuffle") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") { withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").select($"_1").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -767,33 +663,24 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("dictionary shuffle: fallback to string") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0") { withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), 
"tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").select($"_1").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } } test("fix: inMemSorter should be reset after spilling") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } + withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { + assert( + sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") + .count()) } } @@ -820,14 +707,9 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_18", $"_19", $"_20").foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - readParquetFile(path.toString) { df => - val shuffled = df.select(col).repartition(numPartitions, col) - checkSparkAnswer(shuffled) - } + readParquetFile(path.toString) { df => + val shuffled = df.select(col).repartition(numPartitions, col) + checkShuffleAnswer(shuffled, 1) } } } @@ -836,17 +718,14 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: StreamReader should always set useDecimal128 as true") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(12, 2), false) data.write.parquet(dir.getCanonicalPath) readParquetFile(dir.getCanonicalPath) { df => { val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -856,18 +735,13 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: Native Unsafe decimal accessors return incorrect results") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(22, 2), false) data.write.parquet(dir.getCanonicalPath) readParquetFile(dir.getCanonicalPath) { df => - { - val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) - } + val shuffled = df.repartition(numPartitions, $"dec") + checkShuffleAnswer(shuffled, 1) } } } @@ -876,10 +750,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("Comet shuffle reader should respect spark.comet.batchSize") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { 
assert(
          sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql(
@@ -889,14 +760,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("Arrow shuffle should work with BatchScan") {
+  test("columnar shuffle should work with BatchScan") {
     withSQLConf(
       SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE
-      CometConf.COMET_SCAN_ENABLED.key -> "false", // Disable CometScan to use Spark BatchScan
-      CometConf.COMET_EXEC_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
         val shuffled = df
@@ -913,131 +781,69 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
   test("Columnar shuffle for large shuffle partition number") {
     Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-          val shuffled = df.repartitionByRange(numPartitions, $"_2")
+        val shuffled = df.repartitionByRange(numPartitions, $"_2")
 
-          val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-          // `CometSerializedShuffleHandle` is used for large shuffle partition number,
-          // i.e., sort-based shuffle writer
-          cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
-            .contains("CometSerializedShuffleHandle")
+        val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+        // `CometSerializedShuffleHandle` is used for large shuffle partition number,
+        // i.e., sort-based shuffle writer
+        assert(cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+          .contains("CometSerializedShuffleHandle"))
 
-          checkSparkAnswer(shuffled)
-        }
+        checkSparkAnswer(shuffled)
       }
     }
   }
 
-  test("grouped aggregate: Comet shuffle") {
-    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
-        val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
-        checkCometExchange(df, 1, true)
-        checkSparkAnswerAndOperator(df)
-      }
-    }
-  }
+  test("hash-based columnar shuffle") {
+    Seq(10, 200, 201).foreach { numPartitions =>
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-  test("hash shuffle: Comet shuffle") {
-    // Disable CometExec to explicitly test the Comet Arrow shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
-      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-        val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
-        val shuffled1 = df.repartition(10, $"_1")
+        val shuffled1 =
+          df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
 
-        // If Comet execution is disabled, `Sort` operator is Spark operator
-        // and jvm arrow shuffle is 
applied.
-        checkCometExchange(shuffled1, 1, execEnabled)
-        checkSparkAnswer(shuffled1)
+        // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
+        checkShuffleAnswer(shuffled1, 3)
 
-        val shuffled2 = df.repartition(10, $"_1", $"_2")
+        val shuffled2 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_1", $"_2")
 
-        checkCometExchange(shuffled2, 1, execEnabled)
-        checkSparkAnswer(shuffled2)
+        checkShuffleAnswer(shuffled2, 3)
 
-        val shuffled3 = df.repartition(10, $"_2", $"_1")
+        val shuffled3 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_2", $"_1")
 
-        checkCometExchange(shuffled3, 1, execEnabled)
-        checkSparkAnswer(shuffled3)
-      }
+        checkShuffleAnswer(shuffled3, 3)
       }
     }
   }
 
-  test("Comet shuffle: different data type") {
-    // Disable CometExec to explicitly test the Comet native shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-          val all_types = if (isSpark34Plus) {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_14",
-              $"_15",
-              $"_16",
-              $"_17",
-              $"_18",
-              $"_19",
-              $"_20")
-          } else {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_15",
-              $"_16",
-              $"_18",
-              $"_19",
-              $"_20")
-          }
-          all_types.foreach { col =>
-            withSQLConf(
-              CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-              CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-              "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
-              readParquetFile(path.toString) { df =>
-                val shuffled = df
-                  .select($"_1")
-                  .repartition(10, col)
-                checkCometExchange(shuffled, 1, true)
-                if (execEnabled) {
-                  checkSparkAnswerAndOperator(shuffled)
-                } else {
-                  checkSparkAnswer(shuffled)
-                }
+  test("columnar shuffle: different data type") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+
+        Seq(10, 201).foreach { numPartitions =>
+          (1 to 20).map(i => s"_$i").foreach { c =>
+            readParquetFile(path.toString) { df =>
+              val shuffled = df
+                .select($"_1")
+                .repartition(numPartitions, col(c))
+              val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+              if (numPartitions > 200) {
+                // For sort-based shuffle writer
+                assert(cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+                  .contains("CometSerializedShuffleHandle"))
              }
+              checkSparkAnswer(shuffled)
            }
          }
        }
@@ -1045,275 +851,94 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("hash shuffle: Comet columnar shuffle") {
-    Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
-
-          val shuffled1 =
-            df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
-
-          // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
-          checkCometExchange(shuffled1, 3, false)
-          
checkSparkAnswer(shuffled1) + test("native operator after columnar shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") - val shuffled2 = df - .repartitionByRange(numPartitions, $"_2") - .limit(2) - .repartition(numPartitions, $"_1", $"_2") + val shuffled1 = df + .repartition(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) - checkCometExchange(shuffled2, 3, false) - checkSparkAnswer(shuffled2) - - val shuffled3 = df - .repartitionByRange(numPartitions, $"_2") - .limit(2) - .repartition(numPartitions, $"_2", $"_1") - - checkCometExchange(shuffled3, 3, false) - checkSparkAnswer(shuffled3) - } - } - } - } + // 2 Comet shuffle exchanges are expected + checkShuffleAnswer(shuffled1, 2) - test("Comet columnar shuffle shuffle: different data type") { - Seq(10, 200, 201).foreach { numPartitions => - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - - Seq( - $"_1", - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_13", - $"_14", - $"_15", - $"_16", - $"_17", - $"_18", - $"_19", - $"_20").foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - readParquetFile(path.toString) { df => - val shuffled = df - .select($"_1") - .repartition(numPartitions, col) - val cometShuffleExecs = checkCometExchange(shuffled, 1, false) - if (numPartitions > 200) { - // For sort-based shuffle writer - cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName - .contains("CometSerializedShuffleHandle") - } - checkSparkAnswer(shuffled) - } - } - } - } - } - } - } + val shuffled2 = df + .repartitionByRange(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) - test("Comet native operator after Comet shuffle") { - Seq(true, false).foreach { columnarShuffle => - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - - val shuffled1 = df - .repartition(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected - checkCometExchange(shuffled1, 2, !columnarShuffle) - checkSparkAnswer(shuffled1) - - val shuffled2 = df - .repartitionByRange(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled - if (columnarShuffle) { - checkCometExchange(shuffled2, 2, !columnarShuffle) - } else { - // Because the first exchange from the bottom is range exchange which native shuffle - // doesn't support. So Comet exec operators stop before the first exchange and thus - // there is no Comet exchange. 
- checkCometExchange(shuffled2, 0, true) - } - checkSparkAnswer(shuffled2) - } - } + // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled + checkShuffleAnswer(shuffled2, 2) } } - test("Comet shuffle: single partition") { - Seq(true, false).foreach { execEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + test("columnar shuffle: single partition") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(1) + val shuffled = df.repartition(1) - checkCometExchange(shuffled, 1, execEnabled) - checkSparkAnswer(shuffled) - } - } + checkShuffleAnswer(shuffled, 1) } } - test("fix: comet native shuffle with binary data") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") - - val shuffled = df.repartition(1, $"binary") + test("sort-based columnar shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(201, $"_1") - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) - } - } - } + checkShuffleAnswer(shuffled, 1) - test("Comet shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(10, $"_1") + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) - // Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get + assert(metrics.contains("shuffleBytesWritten")) + assert(metrics("shuffleBytesWritten").value > 0) - assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) - } + assert(metrics.contains("shuffleWriteTime")) + assert(metrics("shuffleWriteTime").value > 0) } } - test("sort-based shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(201, $"_1") - - checkCometExchange(shuffled, 1, false) - checkSparkAnswer(shuffled) - - // 
Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get - - assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) - - assert(metrics.contains("shuffleBytesWritten")) - assert(metrics("shuffleBytesWritten").value > 0) - - assert(metrics.contains("shuffleWriteTime")) - assert(metrics("shuffleWriteTime").value > 0) - } - } + /** + * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet + * exchange operators. + */ + private def checkShuffleAnswer(df: DataFrame, expectedNum: Int): Unit = { + checkCometExchange(df, expectedNum, false) + checkSparkAnswer(df) } } -class CometAsyncShuffleSuite extends CometShuffleSuiteBase { +class CometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = true } -class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase { - override protected val fastMergeEnabled: Boolean = false - - protected val adaptiveExecutionEnabled: Boolean = true - - protected val asyncShuffleEnable: Boolean = true -} - -class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase { - override protected val fastMergeEnabled: Boolean = false - - protected val adaptiveExecutionEnabled: Boolean = true - - protected val asyncShuffleEnable: Boolean = false -} - -class CometShuffleSuite extends CometShuffleSuiteBase { +class CometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true - - import testImplicits._ - - // TODO: this test takes ~5mins to run, we should reduce the test time. - // Because this test takes too long, we only have it in `CometShuffleSuite`. - test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_BATCH_SIZE.key -> "1", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } - } - } } -class DisableAQECometShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = false } -class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = false diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala new file mode 100644 index 000000000..c35763c34 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CometTestBase, DataFrame}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
+
+import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
+
+class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+        testFun
+      }
+    }
+  }
+
+  import testImplicits._
+
+  // TODO: this test takes a long time to run; we should reduce the test time.
+  test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") {
+    withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") {
+      withParquetTable((0 until 100000).map(i => (1, (i + 1).toLong)), "tbl") {
+        assert(
+          sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl")
+            .count())
+      }
+    }
+  }
+
+  test("native shuffle: different data type") {
+    Seq(true, false).foreach { execEnabled =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+          var allTypes: Seq[Int] = (1 to 20)
+          if (!isSpark34Plus) {
+            // Columns _14 and _17 are only supported by native shuffle on Spark 3.4+
+            allTypes = allTypes.filterNot(Set(14, 17).contains)
+          }
+          allTypes.map(i => s"_$i").foreach { c =>
+            withSQLConf(
+              CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
+              "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
+              readParquetFile(path.toString) { df =>
+                val shuffled = df
+                  .select($"_1")
+                  .repartition(10, col(c))
+                checkShuffleAnswer(shuffled, 1, checkNativeOperators = execEnabled)
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("hash-based native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+      val shuffled1 = df.repartition(10, $"_1")
+      checkShuffleAnswer(shuffled1, 1)
+
+      val shuffled2 = df.repartition(10, $"_1", $"_2")
+      checkShuffleAnswer(shuffled2, 1)
+
+      val shuffled3 = df.repartition(10, $"_2", $"_1")
+      checkShuffleAnswer(shuffled3, 1)
+    }
+  }
+
+  test("native shuffle: single partition") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+
+      val shuffled = df.repartition(1)
+      checkShuffleAnswer(shuffled, 1)
+    }
+  }
+
+  test("native shuffle with dictionary of binary") {
+    Seq("true", 
"false").foreach { dictionaryEnabled => + withParquetTable( + (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())), + "tbl", + dictionaryEnabled.toBoolean) { + val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2") + checkShuffleAnswer(shuffled, 1) + } + } + } + + test("native operator after native shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + + val shuffled1 = df + .repartition(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected + checkShuffleAnswer(shuffled1, 2) + + val shuffled2 = df + .repartitionByRange(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // Because the first exchange from the bottom is range exchange which native shuffle + // doesn't support. So Comet exec operators stop before the first exchange and thus + // there is no Comet exchange. + checkShuffleAnswer(shuffled2, 0) + } + } + + test("grouped aggregate: native shuffle") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { + val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1") + checkShuffleAnswer(df, 1, checkNativeOperators = true) + } + } + + test("native shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(10, $"_1") + + checkShuffleAnswer(shuffled, 1) + + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get + + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) + } + } + + test("fix: Dictionary arrays imported from native should not be overridden") { + Seq(10, 201).foreach { numPartitions => + withSQLConf( + CometConf.COMET_BATCH_SIZE.key -> "10", + CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true") { + withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_1" === 1.toString) + .repartition(numPartitions, $"_1", $"_2") + .sortWithinPartitions($"_1") + checkSparkAnswerAndOperator(df) + } + } + } + } + + test("fix: Comet native shuffle with binary data") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") + + val shuffled = df.repartition(1, $"binary") + checkShuffleAnswer(shuffled, 1) + } + } + + /** + * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet + * exchange operators. When `checkNativeOperators` is true, this also checks that all operators + * used by `df` are Comet native operators. + */ + private def checkShuffleAnswer( + df: DataFrame, + expectedNum: Int, + checkNativeOperators: Boolean = false): Unit = { + checkCometExchange(df, expectedNum, true) + if (checkNativeOperators) { + checkSparkAnswerAndOperator(df) + } else { + checkSparkAnswer(df) + } + } +}