From b7e7444f806deb0ebff5f0f5201f485807b8a099 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sat, 24 Feb 2024 22:21:52 -0800 Subject: [PATCH 01/13] test: Reduce shuffle test time --- ....scala => CometColumnarShuffleSuite.scala} | 510 ++++-------------- .../comet/exec/CometNativeShuffleSuite.scala | 165 ++++++ 2 files changed, 273 insertions(+), 402 deletions(-) rename spark/src/test/scala/org/apache/comet/exec/{CometShuffleSuite.scala => CometColumnarShuffleSuite.scala} (64%) create mode 100644 spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala similarity index 64% rename from spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala rename to spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 0d7c73d42..249803e22 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -27,24 +27,24 @@ import org.apache.spark.{Partitioner, SparkConf} import org.apache.spark.sql.{CometTestBase, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus - -abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPlanHelper { +abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { protected val adaptiveExecutionEnabled: Boolean - protected val fastMergeEnabled: Boolean = true - protected val numElementsForceSpillThreshold: Int = 10 override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString) + .set(CometConf.COMET_EXEC_ENABLED.key, "false") + .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true") + .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString) } @@ -85,11 +85,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on nested struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => (i, Seq((i + 1, i.toString), null, (i + 3, (i + 3).toString)), i + 1)), @@ -110,11 +106,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + 
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { val data: Seq[(Int, (Int, String))] = Seq((1, (0, "1")), (2, (3, "3")), (3, null)) withParquetTable(data, "tbl") { @@ -137,8 +129,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("1.0", "10.0").foreach { ratio => withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable((0 until 50).map(i => (Map(Seq(i, i + 1) -> i), i + 1)), "tbl") { val df = sql("SELECT * FROM tbl") @@ -195,8 +185,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("1.0", "10.0").foreach { ratio => withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ((Seq(Map(1 -> i)), Map(2 -> i), Map(3 -> i)), i + 1)), @@ -220,9 +208,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla withSQLConf( // AQE has `ShuffleStage` which is a leaf node which blocks // collecting `CometShuffleExchangeExec` node. - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") val shuffled = df @@ -280,11 +266,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { // Boolean key withParquetTable(genTuples(50, Seq(true, false)), "tbl") { val df = sql("SELECT * FROM tbl") @@ -592,11 +574,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on array") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ( @@ -629,11 +607,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla Seq("false", "true").foreach { execEnabled => Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => (Seq(Seq(i + 1), Seq(i + 2), Seq(i + 3)), i + 1)), "tbl") { @@ 
-655,11 +629,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("columnar shuffle on nested struct") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { withParquetTable( (0 until 50).map(i => ((i, 2.toString, (i + 1).toLong, (3.toString, i + 1, (i + 2).toLong)), i + 1)), @@ -677,36 +647,13 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("fix: Dictionary arrays imported from native should not be overridden") { - Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_BATCH_SIZE.key -> "10", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" === 1.toString) - .repartition(numPartitions, $"_1", $"_2") - .sortWithinPartitions($"_1") - checkSparkAnswerAndOperator(df) - } - } - } - } - test("fix: closing sliced dictionary Comet vector should not close dictionary array") { (0 to 10).foreach { _ => withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_BATCH_SIZE.key -> "10", - CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") { val table1 = (0 until 1000) .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong)) .toDF("a", "b", "c", "d") @@ -727,11 +674,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: Dictionary field should have distinct dict_id") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") { withParquetTable( (0 until 10000).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { @@ -748,11 +691,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("dictionary shuffle") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") { withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( @@ -767,11 +706,7 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla test("dictionary shuffle: fallback to string") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0") { withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( @@ -785,15 +720,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } test("fix: inMemSorter should be reset after spilling") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } + withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { + assert( + sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") + .count()) } } @@ -820,14 +750,9 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_18", $"_19", $"_20").foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - readParquetFile(path.toString) { df => - val shuffled = df.select(col).repartition(numPartitions, col) - checkSparkAnswer(shuffled) - } + readParquetFile(path.toString) { df => + val shuffled = df.select(col).repartition(numPartitions, col) + checkSparkAnswer(shuffled) } } } @@ -836,10 +761,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: StreamReader should always set useDecimal128 as true") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(12, 2), false) data.write.parquet(dir.getCanonicalPath) @@ -856,18 +778,13 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: Native Unsafe decimal accessors return incorrect results") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(22, 2), false) data.write.parquet(dir.getCanonicalPath) readParquetFile(dir.getCanonicalPath) { df => - { - val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) - } + val shuffled = df.repartition(numPartitions, $"dec") + checkSparkAnswer(shuffled) } } } @@ -876,10 +793,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("Comet shuffle reader should respect spark.comet.batchSize") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - 
CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( @@ -889,14 +803,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("Arrow shuffle should work with BatchScan") { + test("columnar shuffle should work with BatchScan") { withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2 SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE - CometConf.COMET_SCAN_ENABLED.key -> "false", // Disable CometScan to use Spark BatchScan - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") val shuffled = df @@ -913,23 +824,18 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("Columnar shuffle for large shuffle partition number") { Seq(10, 200, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") - val shuffled = df.repartitionByRange(numPartitions, $"_2") + val shuffled = df.repartitionByRange(numPartitions, $"_2") - val cometShuffleExecs = checkCometExchange(shuffled, 1, false) - // `CometSerializedShuffleHandle` is used for large shuffle partition number, - // i.e., sort-based shuffle writer - cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName - .contains("CometSerializedShuffleHandle") + val cometShuffleExecs = checkCometExchange(shuffled, 1, false) + // `CometSerializedShuffleHandle` is used for large shuffle partition number, + // i.e., sort-based shuffle writer + cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName + .contains("CometSerializedShuffleHandle") - checkSparkAnswer(shuffled) - } + checkSparkAnswer(shuffled) } } } @@ -944,186 +850,56 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("hash shuffle: Comet shuffle") { - // Disable CometExec to explicit test Comet Arrow shuffle path - Seq(true, false).foreach { execEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled1 = df.repartition(10, $"_1") - - // If Comet execution is disabled, `Sort` operator is Spark operator - // and jvm arrow shuffle is applied. 
- checkCometExchange(shuffled1, 1, execEnabled) - checkSparkAnswer(shuffled1) - - val shuffled2 = df.repartition(10, $"_1", $"_2") - - checkCometExchange(shuffled2, 1, execEnabled) - checkSparkAnswer(shuffled2) - - val shuffled3 = df.repartition(10, $"_2", $"_1") - - checkCometExchange(shuffled3, 1, execEnabled) - checkSparkAnswer(shuffled3) - } - } - } - } - - test("Comet shuffle: different data type") { - // Disable CometExec to explicit test Comet native shuffle path - Seq(true, false).foreach { execEnabled => - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - val all_types = if (isSpark34Plus) { - Seq( - $"_1", - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_13", - $"_14", - $"_15", - $"_16", - $"_17", - $"_18", - $"_19", - $"_20") - } else { - Seq( - $"_1", - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_13", - $"_15", - $"_16", - $"_18", - $"_19", - $"_20") - } - all_types.foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - "parquet.enable.dictionary" -> dictionaryEnabled.toString) { - readParquetFile(path.toString) { df => - val shuffled = df - .select($"_1") - .repartition(10, col) - checkCometExchange(shuffled, 1, true) - if (execEnabled) { - checkSparkAnswerAndOperator(shuffled) - } else { - checkSparkAnswer(shuffled) - } - } - } - } - } - } - } - } - - test("hash shuffle: Comet columnar shuffle") { + test("hash-based columnar shuffle") { Seq(10, 200, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") - val shuffled1 = - df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1") + val shuffled1 = + df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1") - // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle - checkCometExchange(shuffled1, 3, false) - checkSparkAnswer(shuffled1) + // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle + checkCometExchange(shuffled1, 3, false) + checkSparkAnswer(shuffled1) - val shuffled2 = df - .repartitionByRange(numPartitions, $"_2") - .limit(2) - .repartition(numPartitions, $"_1", $"_2") + val shuffled2 = df + .repartitionByRange(numPartitions, $"_2") + .limit(2) + .repartition(numPartitions, $"_1", $"_2") - checkCometExchange(shuffled2, 3, false) - checkSparkAnswer(shuffled2) + checkCometExchange(shuffled2, 3, false) + checkSparkAnswer(shuffled2) - val shuffled3 = df - .repartitionByRange(numPartitions, $"_2") - .limit(2) - .repartition(numPartitions, $"_2", $"_1") + val shuffled3 = df + .repartitionByRange(numPartitions, $"_2") + .limit(2) + .repartition(numPartitions, $"_2", $"_1") - checkCometExchange(shuffled3, 3, false) - checkSparkAnswer(shuffled3) - } + checkCometExchange(shuffled3, 3, false) + checkSparkAnswer(shuffled3) } } } - test("Comet columnar shuffle 
shuffle: different data type") { - Seq(10, 200, 201).foreach { numPartitions => - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - - Seq( - $"_1", - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_13", - $"_14", - $"_15", - $"_16", - $"_17", - $"_18", - $"_19", - $"_20").foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - readParquetFile(path.toString) { df => - val shuffled = df - .select($"_1") - .repartition(numPartitions, col) - val cometShuffleExecs = checkCometExchange(shuffled, 1, false) - if (numPartitions > 200) { - // For sort-based shuffle writer - cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName - .contains("CometSerializedShuffleHandle") - } - checkSparkAnswer(shuffled) + test("columnar shuffle: different data type") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) + + Seq(10, 201).foreach { numPartitions => + (1 to 20).map(i => s"_$i").foreach { c => + readParquetFile(path.toString) { df => + val shuffled = df + .select($"_1") + .repartition(numPartitions, col(c)) + val cometShuffleExecs = checkCometExchange(shuffled, 1, false) + if (numPartitions > 200) { + // For sort-based shuffle writer + cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName + .contains("CometSerializedShuffleHandle") } + checkSparkAnswer(shuffled) } } } @@ -1131,6 +907,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } + // TODO: separate this into `CometNativeShuffleSuite`? 
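// Why 201 recurs alongside 10 and 200 in these suites: Spark selects the
// shuffle writer from the reduce-partition count, and 201 is the first value
// past the default spark.shuffle.sort.bypassMergeThreshold of 200, forcing
// the serialized sort-based writer (surfaced by Comet as
// CometSerializedShuffleHandle). A minimal sketch of that check, wrapped in
// assert(...) so a mismatch actually fails the test instead of evaluating
// the condition bare as the tests above do:
val shuffled = df.repartition(201, col("_1"))
val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
assert(
  cometShuffleExecs.head.shuffleDependency.shuffleHandle.getClass.getName
    .contains("CometSerializedShuffleHandle"))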
test("Comet native operator after Comet shuffle") { Seq(true, false).foreach { columnarShuffle => withSQLConf( @@ -1170,104 +947,51 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("Comet shuffle: single partition") { - Seq(true, false).foreach { execEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - - val shuffled = df.repartition(1) - - checkCometExchange(shuffled, 1, execEnabled) - checkSparkAnswer(shuffled) - } - } - } - } - - test("fix: comet native shuffle with binary data") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") - - val shuffled = df.repartition(1, $"binary") - - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) - } - } - } - - test("Comet shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(10, $"_1") - - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + test("columnar shuffle: single partition") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - // Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get + val shuffled = df.repartition(1) - assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) - } + checkCometExchange(shuffled, 1, false) + checkSparkAnswer(shuffled) } } - test("sort-based shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(201, $"_1") + test("sort-based columnar shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(201, $"_1") - checkCometExchange(shuffled, 1, false) - checkSparkAnswer(shuffled) + checkCometExchange(shuffled, 1, false) + checkSparkAnswer(shuffled) - // Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get - 
assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) - assert(metrics.contains("shuffleBytesWritten")) - assert(metrics("shuffleBytesWritten").value > 0) + assert(metrics.contains("shuffleBytesWritten")) + assert(metrics("shuffleBytesWritten").value > 0) - assert(metrics.contains("shuffleWriteTime")) - assert(metrics("shuffleWriteTime").value > 0) - } + assert(metrics.contains("shuffleWriteTime")) + assert(metrics("shuffleWriteTime").value > 0) } } } -class CometAsyncShuffleSuite extends CometShuffleSuiteBase { +class CometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = true } -class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase { +class CometAsyncNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { override protected val fastMergeEnabled: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true @@ -1275,7 +999,7 @@ class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase { protected val asyncShuffleEnable: Boolean = true } -class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase { +class CometNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { override protected val fastMergeEnabled: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true @@ -1283,37 +1007,19 @@ class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase { protected val asyncShuffleEnable: Boolean = false } -class CometShuffleSuite extends CometShuffleSuiteBase { +class CometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true - - import testImplicits._ - - // TODO: this test takes ~5mins to run, we should reduce the test time. - // Because this test takes too long, we only have it in `CometShuffleSuite`. - test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_BATCH_SIZE.key -> "1", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } - } - } } -class DisableAQECometShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = false } -class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = false diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala new file mode 100644 index 000000000..f047f0724 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.exec + +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkConf +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus + +class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + conf + .set(CometConf.COMET_EXEC_ENABLED.key, "true") + .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "false") + .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + } + + import testImplicits._ + + // TODO: this test takes ~5mins to run, we should reduce the test time. + test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { + withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") { + withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") { + assert( + sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") + .count()) + } + } + } + + test("native shuffle: different data type") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) + var allTypes: Seq[Int] = (1 to 20) + if (isSpark34Plus) { + allTypes = allTypes.filterNot(Set(14, 17).contains) + } + allTypes.map(i => s"_$i").foreach { c => + withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + "parquet.enable.dictionary" -> dictionaryEnabled.toString) { + readParquetFile(path.toString) { df => + val shuffled = df + .select($"_1") + .repartition(10, col(c)) + checkCometExchange(shuffled, 1, true) + checkSparkAnswerAndOperator(shuffled) + } + } + } + } + } + } + + test("hash-based native shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled1 = df.repartition(10, $"_1") + + checkCometExchange(shuffled1, 1, true) + checkSparkAnswer(shuffled1) + + val shuffled2 = df.repartition(10, $"_1", $"_2") + + checkCometExchange(shuffled2, 1, true) + checkSparkAnswer(shuffled2) + + val shuffled3 = df.repartition(10, $"_2", $"_1") + + checkCometExchange(shuffled3, 1, true) + checkSparkAnswer(shuffled3) + } + } + + test("columnar shuffle: single partition") { + Seq(true, false).foreach { execEnabled => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { 
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + + val shuffled = df.repartition(1) + + checkCometExchange(shuffled, 1, execEnabled) + checkSparkAnswer(shuffled) + } + } + } + } + + test("native shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(10, $"_1") + + checkCometExchange(shuffled, 1, true) + checkSparkAnswer(shuffled) + + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get + + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) + } + } + + test("fix: Dictionary arrays imported from native should not be overridden") { + Seq(10, 201).foreach { numPartitions => + withSQLConf( + CometConf.COMET_BATCH_SIZE.key -> "10", + CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true") { + withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_1" === 1.toString) + .repartition(numPartitions, $"_1", $"_2") + .sortWithinPartitions($"_1") + checkSparkAnswerAndOperator(df) + } + } + } + } + + test("fix: comet native shuffle with binary data") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") + + val shuffled = df.repartition(1, $"binary") + + checkCometExchange(shuffled, 1, true) + checkSparkAnswer(shuffled) + } + } +} From 6ce3f31379070b57a1368a019323bfb808d2d964 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sun, 25 Feb 2024 00:22:45 -0800 Subject: [PATCH 02/13] fix --- .../org/apache/comet/CometCastSuite.scala | 4 +- .../exec/CometColumnarShuffleSuite.scala | 78 +++++++------------ .../comet/exec/CometNativeShuffleSuite.scala | 77 ++++++++++++------ 3 files changed, 84 insertions(+), 75 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 565d2264b..317371fb9 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString } - private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) { + private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType): Unit = { val r = new Random(0) val inputs = Range(0, 10000).map(_ => genString(r, chars, maxLen)) castTest(inputs.toDF("a"), toType) } - private def castTest(input: DataFrame, toType: DataType) { + private def castTest(input: DataFrame, toType: DataType): Unit = { withTempPath { dir => val df = roundtripParquet(input, dir) .withColumn("converted", col("a").cast(toType)) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 249803e22..da9d73023 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ 
b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -42,9 +42,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar val conf = super.sparkConf conf .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString) - .set(CometConf.COMET_EXEC_ENABLED.key, "false") - .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true") - .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString) } @@ -55,7 +52,10 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar super.test(testName, testTags: _*) { withSQLConf( CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString, - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString) { + CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, + CometConf.COMET_EXEC_ENABLED.key -> "false", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } } @@ -840,16 +840,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } } - test("grouped aggregate: Comet shuffle") { - withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { - val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1") - checkCometExchange(df, 1, true) - checkSparkAnswerAndOperator(df) - } - } - } - test("hash-based columnar shuffle") { Seq(10, 200, 201).foreach { numPartitions => withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { @@ -907,43 +897,29 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } } - // TODO: separate this into `CometNativeShuffleSuite`? - test("Comet native operator after Comet shuffle") { - Seq(true, false).foreach { columnarShuffle => - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - - val shuffled1 = df - .repartition(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected - checkCometExchange(shuffled1, 2, !columnarShuffle) - checkSparkAnswer(shuffled1) - - val shuffled2 = df - .repartitionByRange(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled - if (columnarShuffle) { - checkCometExchange(shuffled2, 2, !columnarShuffle) - } else { - // Because the first exchange from the bottom is range exchange which native shuffle - // doesn't support. So Comet exec operators stop before the first exchange and thus - // there is no Comet exchange. 
- checkCometExchange(shuffled2, 0, true) - } - checkSparkAnswer(shuffled2) - } - } + test("native operator after columnar shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + + val shuffled1 = df + .repartition(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected + checkCometExchange(shuffled1, 2, false) + checkSparkAnswer(shuffled1) + + val shuffled2 = df + .repartitionByRange(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled + checkCometExchange(shuffled2, 2, false) + checkSparkAnswer(shuffled2) } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index f047f0724..0b027a68a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -19,8 +19,10 @@ package org.apache.comet.exec +import org.scalactic.source.Position +import org.scalatest.Tag + import org.apache.hadoop.fs.Path -import org.apache.spark.SparkConf import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -30,12 +32,16 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { - override protected def sparkConf: SparkConf = { - val conf = super.sparkConf - conf - .set(CometConf.COMET_EXEC_ENABLED.key, "true") - .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "false") - .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + testFun + } + } } import testImplicits._ @@ -61,9 +67,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper allTypes = allTypes.filterNot(Set(14, 17).contains) } allTypes.map(i => s"_$i").foreach { c => - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - "parquet.enable.dictionary" -> dictionaryEnabled.toString) { + withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { readParquetFile(path.toString) { df => val shuffled = df .select($"_1") @@ -98,20 +102,49 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("columnar shuffle: single partition") { - Seq(true, false).foreach { execEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(1) + val shuffled = 
df.repartition(1) - checkCometExchange(shuffled, 1, execEnabled) - checkSparkAnswer(shuffled) - } - } + checkCometExchange(shuffled, 1, true) + checkSparkAnswer(shuffled) + } + } + + test("native operator after native shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + + val shuffled1 = df + .repartition(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected + checkCometExchange(shuffled1, 2, true) + checkSparkAnswer(shuffled1) + + val shuffled2 = df + .repartitionByRange(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // Because the first exchange from the bottom is range exchange which native shuffle + // doesn't support. So Comet exec operators stop before the first exchange and thus + // there is no Comet exchange. + checkCometExchange(shuffled2, 0, true) + checkSparkAnswer(shuffled2) + } + } + + test("grouped aggregate: native shuffle") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { + val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1") + checkCometExchange(df, 1, true) + checkSparkAnswerAndOperator(df) } } From 70d6fea3e0e14d5b4a9d339f31cc6f1fa1f69752 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sun, 25 Feb 2024 17:00:37 -0800 Subject: [PATCH 03/13] remove non-fast merge test combination --- .../comet/exec/CometColumnarShuffleSuite.scala | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index da9d73023..e57565365 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -35,14 +35,12 @@ import org.apache.comet.CometConf abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { protected val adaptiveExecutionEnabled: Boolean - protected val fastMergeEnabled: Boolean = true protected val numElementsForceSpillThreshold: Int = 10 override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString) - .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString) } protected val asyncShuffleEnable: Boolean @@ -967,22 +965,6 @@ class CometAsyncShuffleSuite extends CometColumnarShuffleSuite { protected val adaptiveExecutionEnabled: Boolean = true } -class CometAsyncNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { - override protected val fastMergeEnabled: Boolean = false - - protected val adaptiveExecutionEnabled: Boolean = true - - protected val asyncShuffleEnable: Boolean = true -} - -class CometNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { - override protected val fastMergeEnabled: Boolean = false - - protected val adaptiveExecutionEnabled: Boolean = true - - protected val asyncShuffleEnable: Boolean = false -} - class CometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false From 26e69d747f60277f51f48142f6cd8ffdcf38d4b0 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 09:57:32 -0800 Subject: [PATCH 04/13] reduce time in shuffle --- .../scala/org/apache/comet/exec/CometNativeShuffleSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
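The one-line change below cuts the OOM regression test from 1,000,000 rows to
100,000. Because the test pins CometConf.COMET_BATCH_SIZE to "1", the number
of Arrow batches read (and with it the task-completion-listener pressure the
test exists to exercise) still scales linearly with the row count, so a 10x
smaller table keeps the regression coverage while trimming the ~5 minute
runtime. A hypothetical sketch of the kind of per-iterator listener
registration that causes such growth (class and member names are illustrative,
not Comet's actual internals):

import org.apache.spark.TaskContext

// Illustrative only: one completion listener accumulates per iterator, so
// listeners grow with the number of batches read until the task finishes.
class ArrowReaderIteratorSketch(ctx: TaskContext) {
  ctx.addTaskCompletionListener[Unit] { _ => close() }
  def close(): Unit = () // would release Arrow buffers in the real reader
}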
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 0b027a68a..62fdc023a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -49,7 +49,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper // TODO: this test takes ~5mins to run, we should reduce the test time. test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") { - withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") { + withParquetTable((0 until 100000).map(i => (1, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") .count()) From 102bdfec67c547fcc8ba67b09d318e026a5a72b9 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 10:06:26 -0800 Subject: [PATCH 05/13] reduce time in aggregate --- .../comet/exec/CometAggregateSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 04735b5d1..fdc438833 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -537,23 +537,23 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { // Test all combinations of different aggregation & group-by types - (1 to 4).foreach { col => - (1 to 14).foreach { gCol => - withView("v") { + (1 to 14).foreach { gCol => + withView("v") { + (1 to 4).foreach { col => sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col") checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol") checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") } - checkSparkAnswer(s"SELECT _g$gCol, SUM(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, SUM(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, COUNT(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, COUNT(DISTINCT _$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, MIN(_$col), MAX(_$col) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, AVG(_$col) FROM tbl GROUP BY _g$gCol") } + checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), SUM(FROM tbl GROUP BY _g$gCol") + checkSparkAnswer( + s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer( + s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer( + s"SELECT _g$gCol, MIN(_1), MAX(_4) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") } } } From edd03a8e6a52a781319a512aca71abfc36b63df6 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 10:10:03 -0800 Subject: [PATCH 06/13] try parallel --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index d7cd0764e..9c34fddba 100644 --- a/pom.xml +++ b/pom.xml @@ -651,6 +651,10 @@ under the License. 
true ${project.build.directory}/tmp + + suites + perSuite + 2 From f1b4f9b2cec4ee21bf2356e4eb30cfae943fba3c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 10:21:26 -0800 Subject: [PATCH 07/13] fix format --- .../apache/comet/exec/CometAggregateSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index fdc438833..fe0af5eda 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -538,21 +538,21 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Test all combinations of different aggregation & group-by types (1 to 14).foreach { gCol => - withView("v") { - (1 to 4).foreach { col => + (1 to 4).foreach { col => + withView("v") { sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col") checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol") checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") } } - checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), SUM(FROM tbl GROUP BY _g$gCol") checkSparkAnswer( - s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol") + s"SELECT _g$gCol, SUM(_1), SUM(_2) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") checkSparkAnswer( - s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") + s"SELECT _g$gCol, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol") checkSparkAnswer( - s"SELECT _g$gCol, MIN(_1), MAX(_4) FROM tbl GROUP BY _g$gCol") + s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, MIN(_1), MAX(_4) FROM tbl GROUP BY _g$gCol") checkSparkAnswer(s"SELECT _g$gCol, AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") } } From c2cde0d0204e49548bdd677aa6f01bcf530da183 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 14:31:46 -0800 Subject: [PATCH 08/13] fix format --- .../test/scala/org/apache/comet/exec/CometAggregateSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index fe0af5eda..1a414e7f5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -545,8 +545,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") } } - checkSparkAnswer( - s"SELECT _g$gCol, SUM(_1), SUM(_2) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2) FROM tbl GROUP BY _g$gCol") checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") checkSparkAnswer( s"SELECT _g$gCol, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol") From b3b14e952b8665decec2c09052016ec4b40f210c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 26 Feb 2024 20:18:05 -0800 Subject: [PATCH 09/13] fix config --- pom.xml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 9c34fddba..31abf6bdc 100644 --- a/pom.xml +++ b/pom.xml @@ -652,9 +652,8 @@ under the License. 
${project.build.directory}/tmp - suites - perSuite - 2 + true + always From 510a4d1c1cb4de3765bfc55fed19b84593bc90ec Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 27 Feb 2024 00:11:30 -0800 Subject: [PATCH 10/13] revert pom changes --- pom.xml | 3 --- 1 file changed, 3 deletions(-) diff --git a/pom.xml b/pom.xml index 31abf6bdc..d7cd0764e 100644 --- a/pom.xml +++ b/pom.xml @@ -651,9 +651,6 @@ under the License. true ${project.build.directory}/tmp - - true - always From 7b58ae5db9aafa8e3b513b4fb983cb7d76c083be Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 27 Feb 2024 21:26:07 -0800 Subject: [PATCH 11/13] review --- .../org/apache/comet/CometCastSuite.scala | 4 ++-- .../comet/exec/CometAggregateSuite.scala | 18 +++++++----------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 317371fb9..565d2264b 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString } - private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType): Unit = { + private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) { val r = new Random(0) val inputs = Range(0, 10000).map(_ => genString(r, chars, maxLen)) castTest(inputs.toDF("a"), toType) } - private def castTest(input: DataFrame, toType: DataType): Unit = { + private def castTest(input: DataFrame, toType: DataType) { withTempPath { dir => val df = roundtripParquet(input, dir) .withColumn("converted", col("a").cast(toType)) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 1a414e7f5..d64a3a3ae 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -538,21 +538,17 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Test all combinations of different aggregation & group-by types (1 to 14).foreach { gCol => - (1 to 4).foreach { col => - withView("v") { - sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col") - checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol") - } + withView("v") { + sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " + + "FROM tbl ORDER BY _1, _2, _3, _4") + checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " + + s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol") } - checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2) FROM tbl GROUP BY _g$gCol") + checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " + + s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer( - s"SELECT _g$gCol, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol") checkSparkAnswer( s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, MIN(_1), MAX(_4) FROM tbl GROUP BY _g$gCol") - checkSparkAnswer(s"SELECT _g$gCol, AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol") } } } From 
2ec3d3e93465c860dd4165df3fc11a123fc99e1b Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 27 Feb 2024 22:58:47 -0800 Subject: [PATCH 12/13] more --- .../comet/exec/CometNativeShuffleSuite.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 62fdc023a..9e2b17e4e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -58,22 +58,30 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native shuffle: different data type") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) - var allTypes: Seq[Int] = (1 to 20) - if (isSpark34Plus) { - allTypes = allTypes.filterNot(Set(14, 17).contains) - } - allTypes.map(i => s"_$i").foreach { c => - withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { - readParquetFile(path.toString) { df => - val shuffled = df - .select($"_1") - .repartition(10, col(c)) - checkCometExchange(shuffled, 1, true) - checkSparkAnswerAndOperator(shuffled) + Seq(false).foreach { execEnabled => + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) + var allTypes: Seq[Int] = (1 to 20) + if (isSpark34Plus) { + allTypes = allTypes.filterNot(Set(14, 17).contains) + } + allTypes.map(i => s"_$i").foreach { c => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, + "parquet.enable.dictionary" -> dictionaryEnabled.toString) { + readParquetFile(path.toString) { df => + val shuffled = df + .select($"_1") + .repartition(10, col(c)) + checkCometExchange(shuffled, 1, true) + if (execEnabled) { + checkSparkAnswerAndOperator(shuffled) + } else { + checkSparkAnswer(shuffled) + } + } } } } From dbbdc9365b22e1f0e35a2b2b82e08739e22dd58c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 28 Feb 2024 10:10:36 -0800 Subject: [PATCH 13/13] update --- .../exec/CometColumnarShuffleSuite.scala | 131 ++++++------------ .../comet/exec/CometNativeShuffleSuite.scala | 74 +++++----- 2 files changed, 88 insertions(+), 117 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index e57565365..fec6197d6 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.{Partitioner, SparkConf} -import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.col @@ -61,25 +61,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar import testImplicits._ - test("Native shuffle with dictionary of binary") { 
- Seq("true", "false").foreach { dictionaryEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable( - (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())), - "tbl", - dictionaryEnabled.toBoolean) { - val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2") - - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) - } - } - } - } - test("columnar shuffle on nested struct including nulls") { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => @@ -93,8 +74,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2", $"_3") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -113,8 +93,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -134,9 +113,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Array map key array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map(i -> Seq(i, i + 1)), i + 1)), "tbl") { @@ -145,9 +123,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Array map value array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map((i, i.toString) -> i), i + 1)), "tbl") { @@ -156,9 +133,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Struct map key array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } withParquetTable((0 until 50).map(i => (Map(i -> (i, i.toString)), i + 1)), "tbl") { @@ -167,9 +143,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Struct map value array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -192,9 +167,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_2") - checkSparkAnswer(df) // Map array element fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -285,8 +259,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Byte key @@ -309,8 +282,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Short key @@ -333,8 
+305,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Int key @@ -357,8 +328,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Long key @@ -381,8 +351,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Float key @@ -405,8 +374,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Double key @@ -429,8 +397,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Date key @@ -455,8 +422,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Timestamp key @@ -483,8 +449,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Decimal key @@ -511,8 +476,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // String key @@ -535,8 +499,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } // Binary key @@ -561,8 +524,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_13") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -593,8 +555,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2", $"_3", $"_4", $"_5") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -614,9 +575,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) // Nested array fallback to Spark shuffle for now - checkCometExchange(df, 0, false) + checkShuffleAnswer(df, 0) } } } @@ -637,8 +597,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .repartition(numPartitions, $"_1", $"_2") .sortWithinPartitions($"_1") - checkSparkAnswer(df) - checkCometExchange(df, 1, false) + checkShuffleAnswer(df, 1) } } } @@ -663,7 +622,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar val df = sql( "select a, b, count(distinct h) from tbl_a, tbl_b " + "where c = e and b = '2222222' and a not like '2' group by a, b") - checkSparkAnswer(df) + checkShuffleAnswer(df, 4) } } } @@ -681,7 +640,7 @@ 
abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -696,7 +655,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").select($"_1").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -711,7 +670,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar "SELECT * FROM tbl") .count()) val shuffled = sql("SELECT * FROM tbl").select($"_1").repartition(numPartitions, $"_1") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -750,7 +709,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar $"_20").foreach { col => readParquetFile(path.toString) { df => val shuffled = df.select(col).repartition(numPartitions, col) - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -766,7 +725,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar readParquetFile(dir.getCanonicalPath) { df => { val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -782,7 +741,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar data.write.parquet(dir.getCanonicalPath) readParquetFile(dir.getCanonicalPath) { df => val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } } @@ -847,24 +806,21 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1") // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle - checkCometExchange(shuffled1, 3, false) - checkSparkAnswer(shuffled1) + checkShuffleAnswer(shuffled1, 3) val shuffled2 = df .repartitionByRange(numPartitions, $"_2") .limit(2) .repartition(numPartitions, $"_1", $"_2") - checkCometExchange(shuffled2, 3, false) - checkSparkAnswer(shuffled2) + checkShuffleAnswer(shuffled2, 3) val shuffled3 = df .repartitionByRange(numPartitions, $"_2") .limit(2) .repartition(numPartitions, $"_2", $"_1") - checkCometExchange(shuffled3, 3, false) - checkSparkAnswer(shuffled3) + checkShuffleAnswer(shuffled3, 3) } } } @@ -906,8 +862,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .filter($"_1" > 1) // 2 Comet shuffle exchanges are expected - checkCometExchange(shuffled1, 2, false) - checkSparkAnswer(shuffled1) + checkShuffleAnswer(shuffled1, 2) val shuffled2 = df .repartitionByRange(10, $"_2") @@ -916,8 +871,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar .filter($"_1" > 1) // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled - checkCometExchange(shuffled2, 2, false) - checkSparkAnswer(shuffled2) + checkShuffleAnswer(shuffled2, 2) } } @@ -927,8 +881,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar val shuffled = df.repartition(1) - checkCometExchange(shuffled, 1, false) - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) } } @@ -937,8 +890,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar val df = sql("SELECT * 
FROM tbl").sortWithinPartitions($"_1".desc) val shuffled = df.repartition(201, $"_1") - checkCometExchange(shuffled, 1, false) - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) // Materialize the shuffled data shuffled.collect() @@ -957,6 +909,15 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar assert(metrics("shuffleWriteTime").value > 0) } } + + /** + * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet + * exchange operators. + */ + private def checkShuffleAnswer(df: DataFrame, expectedNum: Int): Unit = { + checkCometExchange(df, expectedNum, false) + checkSparkAnswer(df) + } } class CometAsyncShuffleSuite extends CometColumnarShuffleSuite { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index 9e2b17e4e..c35763c34 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -23,7 +23,7 @@ import org.scalactic.source.Position import org.scalatest.Tag import org.apache.hadoop.fs.Path -import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.col @@ -46,7 +46,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper import testImplicits._ - // TODO: this test takes ~5mins to run, we should reduce the test time. + // TODO: this test takes a long time to run, we should reduce the test time. test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") { withParquetTable((0 until 100000).map(i => (1, (i + 1).toLong)), "tbl") { @@ -58,7 +58,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native shuffle: different data type") { - Seq(false).foreach { execEnabled => + Seq(true, false).foreach { execEnabled => Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -75,12 +75,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper val shuffled = df .select($"_1") .repartition(10, col(c)) - checkCometExchange(shuffled, 1, true) - if (execEnabled) { - checkSparkAnswerAndOperator(shuffled) - } else { - checkSparkAnswer(shuffled) - } + checkShuffleAnswer(shuffled, 1, checkNativeOperators = execEnabled) } } } @@ -93,30 +88,34 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) val shuffled1 = df.repartition(10, $"_1") - - checkCometExchange(shuffled1, 1, true) - checkSparkAnswer(shuffled1) + checkShuffleAnswer(shuffled1, 1) val shuffled2 = df.repartition(10, $"_1", $"_2") - - checkCometExchange(shuffled2, 1, true) - checkSparkAnswer(shuffled2) + checkShuffleAnswer(shuffled2, 1) val shuffled3 = df.repartition(10, $"_2", $"_1") - - checkCometExchange(shuffled3, 1, true) - checkSparkAnswer(shuffled3) + checkShuffleAnswer(shuffled3, 1) } } - test("columnar shuffle: single partition") { + test("native shuffle: single partition") { withParquetTable((0 until 5).map(i => (i, (i 
+ 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) val shuffled = df.repartition(1) + checkShuffleAnswer(shuffled, 1) + } + } - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + test("native shuffle with dictionary of binary") { + Seq("true", "false").foreach { dictionaryEnabled => + withParquetTable( + (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())), + "tbl", + dictionaryEnabled.toBoolean) { + val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2") + checkShuffleAnswer(shuffled, 1) + } } } @@ -131,8 +130,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper .filter($"_1" > 1) // 2 Comet shuffle exchanges are expected - checkCometExchange(shuffled1, 2, true) - checkSparkAnswer(shuffled1) + checkShuffleAnswer(shuffled1, 2) val shuffled2 = df .repartitionByRange(10, $"_2") @@ -143,16 +141,14 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper // Because the first exchange from the bottom is range exchange which native shuffle // doesn't support. So Comet exec operators stop before the first exchange and thus // there is no Comet exchange. - checkCometExchange(shuffled2, 0, true) - checkSparkAnswer(shuffled2) + checkShuffleAnswer(shuffled2, 0) } } test("grouped aggregate: native shuffle") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1") - checkCometExchange(df, 1, true) - checkSparkAnswerAndOperator(df) + checkShuffleAnswer(df, 1, checkNativeOperators = true) } } @@ -161,8 +157,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) val shuffled = df.repartition(10, $"_1") - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + checkShuffleAnswer(shuffled, 1) // Materialize the shuffled data shuffled.collect() @@ -193,14 +188,29 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } - test("fix: comet native shuffle with binary data") { + test("fix: Comet native shuffle with binary data") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl") val shuffled = df.repartition(1, $"binary") + checkShuffleAnswer(shuffled, 1) + } + } - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + /** + * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet + * exchange operators. When `checkNativeOperators` is true, this also checks that all operators + * used by `df` are Comet native operators. + */ + private def checkShuffleAnswer( + df: DataFrame, + expectedNum: Int, + checkNativeOperators: Boolean = false): Unit = { + checkCometExchange(df, expectedNum, true) + if (checkNativeOperators) { + checkSparkAnswerAndOperator(df) + } else { + checkSparkAnswer(df) } } }
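
As a usage illustration: the `checkShuffleAnswer` helpers introduced in [PATCH 13/13] fold the
repeated `checkCometExchange` plus `checkSparkAnswer` pair into a single call, in both the columnar
and the native suites. Below is a minimal sketch of how a new test case in
`CometNativeShuffleSuite` would use the native-suite variant. It assumes only the fixtures visible
in the diffs above (`withParquetTable`, `sql`, `testImplicits`, `checkShuffleAnswer`); the test
name and the sample data are hypothetical and not part of the patch series.

    test("native shuffle: hypothetical repartition example") {
      withParquetTable((0 until 10).map(i => (i, i.toString)), "tbl") {
        // Hash-repartition by the first column; exactly one Comet native
        // exchange is expected in the resulting plan.
        val shuffled = sql("SELECT * FROM tbl").repartition(4, $"_1")
        // Compare the shuffled result against Spark's own answer and verify
        // the number of Comet exchange operators.
        checkShuffleAnswer(shuffled, 1)
        // When Comet exec is enabled, the stricter form also requires every
        // operator in the plan to be a Comet native operator.
        checkShuffleAnswer(shuffled, 1, checkNativeOperators = true)
      }
    }

The design choice mirrors the patch itself: centralizing the exchange-count check keeps each test
to one assertion call, which is what allowed the many `checkCometExchange`/`checkSparkAnswer`
pairs in the earlier hunks to collapse into one-line calls.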