From 5aa7ece30c5f44aacc9d994c0fc4a05d214b046c Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 12 Mar 2020 06:23:26 +0900
Subject: [PATCH] Change the approach: detect and apply workaround internally

---
 .../expressions/codegen/CodeGenerator.scala   |  2 +-
 .../sql/catalyst/expressions/predicates.scala |  2 +-
 .../apache/spark/sql/internal/SQLConf.scala   | 19 ------
 .../spark/sql/execution/ExpandExec.scala      |  6 +-
 .../sql/execution/WholeStageCodegenExec.scala | 59 +++++++++++++++++--
 .../spark/sql/DataFrameAggregateSuite.scala   |  3 +-
 6 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 58c95c94ba198..d2e30af079ab7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -113,7 +113,7 @@ private[codegen] case class NewFunctionSpec(
  * A context for codegen, tracking a list of objects that could be passed into generated Java
  * function.
  */
-class CodegenContext extends Logging {
+class CodegenContext(val disallowSwitchStatement: Boolean = false) extends Logging {
 
   import CodeGenerator._
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 62e8b02aba17d..c3a60c9f36455 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -456,7 +456,7 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val sqlConf = SQLConf.get
     if (canBeComputedUsingSwitch && hset.size <= sqlConf.optimizerInSetSwitchThreshold &&
-        sqlConf.codegenUseSwitchStatement) {
+        !ctx.disallowSwitchStatement) {
       genCodeWithSwitch(ctx, ev)
     } else {
       genCodeWithSet(ctx, ev)
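The two hunks above replace the user-facing config with a flag carried on the codegen context: InSet.doGenCode now asks CodegenContext whether "switch" is disallowed instead of reading a SQLConf. As a rough illustration of that decision (InSetDispatchSketch, Ctx, and chooseCodegenPath are hypothetical stand-ins, not Spark APIs):

  // Hypothetical sketch of the dispatch in InSet.doGenCode; Ctx stands in for CodegenContext.
  object InSetDispatchSketch {
    final case class Ctx(disallowSwitchStatement: Boolean = false)

    def chooseCodegenPath(ctx: Ctx, canUseSwitch: Boolean, hsetSize: Int, threshold: Int): String = {
      if (canUseSwitch && hsetSize <= threshold && !ctx.disallowSwitchStatement) {
        "genCodeWithSwitch" // compact Java switch; the shape that can trip Janino issue 113
      } else {
        "genCodeWithSet" // hash-set membership test; the safe fallback
      }
    }

    def main(args: Array[String]): Unit = {
      assert(chooseCodegenPath(Ctx(), canUseSwitch = true, hsetSize = 5, threshold = 400) ==
        "genCodeWithSwitch")
      assert(chooseCodegenPath(Ctx(disallowSwitchStatement = true),
        canUseSwitch = true, hsetSize = 5, threshold = 400) == "genCodeWithSet")
    }
  }

Because the flag defaults to false, every existing call site that builds a plain CodegenContext keeps the switch-based path.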
+ - "The configuration will be no-op and maybe removed once Spark upgrades Janino containing" + - " the fix.") - .booleanConf - .createWithDefault(true) - val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files. " + "This configuration is effective only when using file-based sources such as Parquet, JSON " + @@ -2781,8 +2764,6 @@ class SQLConf extends Serializable with Logging { def wholeStageSplitConsumeFuncByOperator: Boolean = getConf(WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR) - def codegenUseSwitchStatement: Boolean = getConf(CODEGEN_USE_SWITCH_STATEMENT) - def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index bdcf12c089bee..4e5cdb74aa8ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -54,8 +54,6 @@ case class ExpandExec( private[this] val projection = (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) - private val useSwitchStatement: Boolean = sqlContext.conf.codegenUseSwitchStatement - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numOutputRows = longMetric("numOutputRows") @@ -169,7 +167,7 @@ case class ExpandExec( } } - // Part 2: switch/case statements, or if/else if statements via configuration + // Part 2: switch/case statements(, or if ~ else if statements when needed) val updates = projections.map { exprs => var updateCode = "" @@ -189,7 +187,7 @@ case class ExpandExec( // the name needs to be known to build conditions val i = ctx.freshName("i") - val loopContent = if (useSwitchStatement) { + val loopContent = if (!ctx.disallowSwitchStatement) { val cases = updates.zipWithIndex.map { case (updateCode, row) => s""" |case $row: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 10fe0f252322f..ade3fd1b21ef3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -629,6 +629,10 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) */ def doCodeGen(): (CodegenContext, CodeAndComment) = { val ctx = new CodegenContext + (ctx, doCodeGen(ctx)) + } + + private def doCodeGen(ctx: CodegenContext): CodeAndComment = { val code = child.asInstanceOf[CodegenSupport].produce(ctx, this) // main next function. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 10fe0f252322f..ade3fd1b21ef3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -629,6 +629,10 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
    */
   def doCodeGen(): (CodegenContext, CodeAndComment) = {
     val ctx = new CodegenContext
+    (ctx, doCodeGen(ctx))
+  }
+
+  private def doCodeGen(ctx: CodegenContext): CodeAndComment = {
     val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
 
     // main next function.
@@ -647,7 +651,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       }
 
       ${ctx.registerComment(
-        s"""Codegend pipeline for stage (id=$codegenStageId)
+        s"""Codegen pipeline for stage (id=$codegenStageId)
            |${this.treeString.trim}""".stripMargin,
          "wsc_codegenPipeline")}
       ${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)}
@@ -679,7 +683,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
 
     logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-    (ctx, cleanedSource)
+    cleanedSource
   }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -688,11 +692,56 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     child.executeColumnar()
   }
 
-  override def doExecute(): RDD[InternalRow] = {
+  private type CompileResult = (CodegenContext, CodeAndComment, GeneratedClass, ByteCodeStats)
+
+  /**
+   * NOTE: This method handles the known Janino bug:
+   * - https://github.com/janino-compiler/janino/issues/113
+   *
+   * It first generates code and compiles it via the normal path. If the compilation fails and
+   * the failure matches the known bug, it regenerates workaround code by setting a flag in
+   * CodegenContext and compiles again.
+   */
+  private def doGenCodeAndCompile(): CompileResult = {
+    def containsMsg(exception: Throwable, msg: String): Boolean = {
+      def contain(msg1: String, msg2: String): Boolean = {
+        msg1.toLowerCase(Locale.ROOT).contains(msg2.toLowerCase(Locale.ROOT))
+      }
+
+      var e = exception
+      var contains = contain(e.getMessage, msg)
+      while (e.getCause != null && !contains) {
+        e = e.getCause
+        contains = contain(e.getMessage, msg)
+      }
+      contains
+    }
+
+    val (ctx, cleanedSource) = doCodeGen()
+    try {
+      val (genClass, maxCodeSize) = CodeGenerator.compile(cleanedSource)
+      (ctx, cleanedSource, genClass, maxCodeSize)
+    } catch {
+      case NonFatal(e) if cleanedSource.body.contains("switch") &&
+          containsMsg(e, "Operand stack inconsistent at offset") =>
+        // We might have hit the known Janino bug (https://github.com/janino-compiler/janino/issues/113).
+        // Disallow the "switch" statement during codegen and compile again.
+        // The log level matches the log level of the compilation error message in
+        // CodeGenerator.compile(), so this message is shown whenever end users see the log
+        // for the compilation error.
+        logError("Generated code hit the known Janino bug - applying workaround and recompiling...")
+
+        val newCtx = new CodegenContext(disallowSwitchStatement = true)
+        val newCleanedSource = doCodeGen(newCtx)
+        val (genClass, maxCodeSize) = CodeGenerator.compile(newCleanedSource)
+        (newCtx, newCleanedSource, genClass, maxCodeSize)
+    }
+  }
+
+  override def doExecute(): RDD[InternalRow] = {
     // try to compile and fallback if it failed
-    val (_, compiledCodeStats) = try {
-      CodeGenerator.compile(cleanedSource)
+    val (ctx, cleanedSource, _, compiledCodeStats) = try {
+      doGenCodeAndCompile()
     } catch {
       case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
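The core of the change is doGenCodeAndCompile above: compile the normal code first, and only regenerate with disallowSwitchStatement = true when the failure carries the known signature. Stripped of Spark types, the detect-and-retry pattern looks roughly like this (RetryOnKnownBugSketch is illustrative; generate and compile are hypothetical function parameters):

  // Minimal sketch of the retry-on-known-failure pattern used by doGenCodeAndCompile.
  import scala.util.control.NonFatal

  object RetryOnKnownBugSketch {
    // Walk the cause chain, like containsMsg above, tolerating null messages.
    def causeChainContains(t: Throwable, needle: String): Boolean =
      Iterator.iterate(t)(_.getCause).takeWhile(_ != null)
        .exists(e => Option(e.getMessage).exists(_.toLowerCase.contains(needle.toLowerCase)))

    def compileWithWorkaround(generate: Boolean => String, compile: String => Unit): String = {
      val fast = generate(false) // normal path: "switch" allowed
      try { compile(fast); fast }
      catch {
        case NonFatal(e) if fast.contains("switch") &&
            causeChainContains(e, "Operand stack inconsistent at offset") =>
          val safe = generate(true) // workaround path: "switch" disallowed
          compile(safe)
          safe
      }
    }
  }

Checking that the generated source actually contains "switch" before matching the message keeps unrelated compiler failures on the normal error path.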
+ logError("Generated code hits known Janino bug - applying workaround and recompiling...") + + val newCtx = new CodegenContext(disallowSwitchStatement = true) + val newCleanedSource = doCodeGen(newCtx) + val (genClass, maxCodeSize) = CodeGenerator.compile(newCleanedSource) + (newCtx, newCleanedSource, genClass, maxCodeSize) + } + } + + override def doExecute(): RDD[InternalRow] = { // try to compile and fallback if it failed - val (_, compiledCodeStats) = try { - CodeGenerator.compile(cleanedSource) + val (ctx, cleanedSource, _, compiledCodeStats) = try { + doGenCodeAndCompile() } catch { case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback => // We should already saw the error message diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index c663d2305d168..2049652c8f88b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -978,8 +978,7 @@ class DataFrameAggregateSuite extends QueryTest (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true"), (SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key, "10000"), (SQLConf.CODEGEN_FALLBACK.key, "false"), - (SQLConf.CODEGEN_LOGGING_MAX_LINES.key, "-1"), - (SQLConf.CODEGEN_USE_SWITCH_STATEMENT.key, "false") + (SQLConf.CODEGEN_LOGGING_MAX_LINES.key, "-1") ) { var df = Seq(("1", "2", 1), ("1", "2", 2), ("2", "3", 3), ("2", "3", 4)).toDF("a", "b", "c")