From 74fa07c5702004ed2bcd83872687473122e13bab Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 8 Aug 2023 17:46:09 +0800 Subject: [PATCH] [SPARK-44236][SQL] Disable WholeStageCodegen when set `spark.sql.codegen.factoryMode` to NO_CODEGEN ### What changes were proposed in this pull request? After #41467 , we fix the `CodegenInterpretedPlanTest ` will execute codeGen even set `spark.sql.codegen.factoryMode` to `NO_CODEGEN`. Before this PR, `spark.sql.codegen.factoryMode` can't disable WholeStageCodegen, many test case want to disable codegen by set `spark.sql.codegen.factoryMode` to `NO_CODEGEN`, but it not work for WholeStageCodegen. So this PR change the `spark.sql.codegen.factoryMode` behavior, when set `NO_CODEGEN`, we will disable `WholeStageCodegen` too. ### Why are the changes needed? Fix the `spark.sql.codegen.factoryMode` config behavior. ### Does this PR introduce _any_ user-facing change? Yes, the config logic changed. ### How was this patch tested? add new test. Closes #41779 from Hisoka-X/SPARK-44236_wholecodegen_disable. Authored-by: Jia Fan Signed-off-by: Wenchen Fan --- .../CodeGeneratorWithInterpretedFallback.scala | 3 +-- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 2 ++ .../apache/spark/sql/catalyst/plans/PlanTest.scala | 3 +-- .../spark/sql/execution/WholeStageCodegenExec.scala | 3 ++- .../spark/sql/execution/WholeStageCodegenSuite.scala | 11 +++++++++++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala index 0509b852cfdde..62a1afecfd7f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala @@ -38,8 +38,7 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { def createObject(in: IN): OUT = { // We are allowed to choose codegen-only or no-codegen modes if under tests. - val config = SQLConf.get.getConf(SQLConf.CODEGEN_FACTORY_MODE) - val fallbackMode = CodegenObjectFactoryMode.withName(config) + val fallbackMode = CodegenObjectFactoryMode.withName(SQLConf.get.codegenFactoryMode) fallbackMode match { case CodegenObjectFactoryMode.CODEGEN_ONLY => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bcf8ce2bc5407..e4f335a9a08f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4721,6 +4721,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) + def codegenFactoryMode: String = getConf(CODEGEN_FACTORY_MODE) + def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS) def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index ebf48c5f863d2..e90a956ab4fde 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -47,8 +47,7 @@ trait CodegenInterpretedPlanTest extends PlanTest { super.test(testName + " (codegen path)", testTags: _*)( withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenMode) { testFun })(pos) super.test(testName + " (interpreted path)", testTags: _*)( - withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> interpretedMode) { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { testFun }})(pos) + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> interpretedMode) { testFun })(pos) } protected def testFallback( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5fc51cc6e310b..40de623f73d59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -942,7 +942,8 @@ case class CollapseCodegenStages( } def apply(plan: SparkPlan): SparkPlan = { - if (conf.wholeStageEnabled) { + if (conf.wholeStageEnabled && CodegenObjectFactoryMode.withName(conf.codegenFactoryMode) + != CodegenObjectFactoryMode.NO_CODEGEN) { insertWholeStageCodegen(plan) } else { plan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 0aaeedd5f06d1..5a413c77754f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} @@ -182,6 +183,16 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) } + test("SPARK-44236: disable WholeStageCodegen when set spark.sql.codegen.factoryMode is " + + "NO_CODEGEN") { + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString) { + val df = spark.range(10).select($"id" + 1) + val plan = df.queryExecution.executedPlan + assert(!plan.exists(_.isInstanceOf[WholeStageCodegenExec])) + checkAnswer(df, 1L to 10L map { i => Row(i) }) + } + } + test("Full Outer ShuffledHashJoin and SortMergeJoin should be included in WholeStageCodegen") { val df1 = spark.range(5).select($"id".as("k1")) val df2 = spark.range(10).select($"id".as("k2"))