From c7fdae569209374a5bfcf038dfc1995935a180da Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Nov 2020 01:19:21 -0800 Subject: [PATCH 1/9] Add subexpression elimination for interpreted expression evaluation. --- .../expressions/EvaluationRunTime.scala | 81 +++++++++++++++++++ .../expressions/ExpressionProxy.scala | 49 +++++++++++ .../InterpretedUnsafeProjection.scala | 17 +++- .../apache/spark/sql/internal/SQLConf.scala | 12 +++ 4 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala new file mode 100644 index 0000000000000..bc648f3354a33 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.expressions + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.internal.SQLConf + +/** + * This class helps subexpression elimination for interpreted evaluation + * in `InterpretedUnsafeProjection`. It maintains an evaluation cache. + * This class wraps `ExpressionProxy` around given expressions. The `ExpressionProxy` + * intercepts expression evaluation and loads from the cache first. + */ +class EvaluationRunTime { + + val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() + .maximumSize(SQLConf.get.subexpressionEliminationCacheMaxEntries) + .build( + new CacheLoader[ExpressionProxy, ResultProxy]() { + override def load(expr: ExpressionProxy): ResultProxy = { + ResultProxy(expr.proxyEval(currentInput)) + } + }) + + private var currentInput: InternalRow = null + + /** + * Sets given input row as current row for evaluating expressions. This cleans up the cache + * too as new input comes. + */ + def setInput(input: InternalRow = null): Unit = { + currentInput = input + cache.cleanUp() + } + + /** + * Finds subexpressions and wraps them with `ExpressionProxy`. + */ + def proxyExpressions(expressions: Seq[Expression]): Seq[Expression] = { + val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions + + expressions.foreach(equivalentExpressions.addExprTree(_)) + + var proxyMap = Map.empty[Expression, ExpressionProxy] + + val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1) + commonExprs.foreach { e => + val expr = e.head + val proxy = ExpressionProxy(expr, this) + + proxyMap ++= e.map(_ -> proxy).toMap + } + + // Only adding proxy if we find subexpressions. 
+ if (proxyMap.nonEmpty) { + expressions.map { expr => + expr.transformUp { + case e if proxyMap.contains(e) => proxyMap(e) + } + } + } else { + expressions + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala new file mode 100644 index 0000000000000..2654f55f7a43b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.DataType + +/** + * A proxy for an catalyst `Expression`. Given a runtime object `EvaluationRunTime`, when this + * is asked to evaluate, it will load the evaluation cache in the runtime first. 
+ */ +case class ExpressionProxy(child: Expression, runtime: EvaluationRunTime) extends Expression { + + final override def dataType: DataType = child.dataType + final override def nullable: Boolean = child.nullable + final override def children: Seq[Expression] = child :: Nil + + // `ExpressionProxy` is for interpreted expression evaluation only. So cannot `doGenCode`. + final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException(s"Cannot generate code for expression: $this") + + def proxyEval(input: InternalRow = null): Any = { + child.eval(input) + } + + override def eval(input: InternalRow = null): Any = { + runtime.cache.get(this).result + } +} + +/** + * A simple wrapper for holding `Any` in the cache of `EvaluationRunTime`. + */ +case class ResultProxy(result: Any) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala index 39a16e917c4a5..9d4332a7dae43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala @@ -20,6 +20,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{UserDefinedType, _} import org.apache.spark.unsafe.Platform @@ -33,6 +34,14 @@ import org.apache.spark.unsafe.Platform class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { import InterpretedUnsafeProjection._ + private[this] val subExprElimination = 
SQLConf.get.subexpressionEliminationEnabled + private[this] lazy val runtime = new EvaluationRunTime() + private[this] val proxyExpressions = if (subExprElimination) { + runtime.proxyExpressions(expressions) + } else { + expressions.toSeq + } + /** Number of (top level) fields in the resulting row. */ private[this] val numFields = expressions.length @@ -63,17 +72,21 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe } override def initialize(partitionIndex: Int): Unit = { - expressions.foreach(_.foreach { + proxyExpressions.foreach(_.foreach { case n: Nondeterministic => n.initialize(partitionIndex) case _ => }) } override def apply(row: InternalRow): UnsafeRow = { + if (subExprElimination) { + runtime.setInput(row) + } + // Put the expression results in the intermediate row. var i = 0 while (i < numFields) { - values(i) = expressions(i).eval(row) + values(i) = proxyExpressions(i).eval(row) i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 21357a492e39e..b1198c83f4989 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -539,6 +539,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val SUBEXPRESSION_ELIMINATION_CACHE_MAX_ENTRIES = + buildConf("spark.sql.subexpressionElimination.cache.maxEntries") + .internal() + .doc("The maximum entries of the cache used for interpreted subexpression elimination.") + .version("3.1.0") + .intConf + .checkValue(maxEntries => maxEntries >= 0, "The maximum must not be negative") + .createWithDefault(100) + val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive") .internal() .doc("Whether the query analyzer should be case sensitive or not. 
" + @@ -3214,6 +3223,9 @@ class SQLConf extends Serializable with Logging { def subexpressionEliminationEnabled: Boolean = getConf(SUBEXPRESSION_ELIMINATION_ENABLED) + def subexpressionEliminationCacheMaxEntries: Int = + getConf(SUBEXPRESSION_ELIMINATION_CACHE_MAX_ENTRIES) + def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD) def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR) From 33ac8b4d4fffb231054d16a80ab6366f30683c2d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Nov 2020 14:31:30 -0800 Subject: [PATCH 2/9] Catch cache exception and throw original exception. --- .../sql/catalyst/expressions/ExpressionProxy.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala index 2654f55f7a43b..075661e5345e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.catalyst.expressions +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.types.DataType @@ -38,8 +40,14 @@ case class ExpressionProxy(child: Expression, runtime: EvaluationRunTime) extend child.eval(input) } - override def eval(input: InternalRow = null): Any = { + override def eval(input: InternalRow = null): Any = try { runtime.cache.get(this).result + } catch { + // Cache.get() may wrap the original exception. 
See the following URL + // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ + // Cache.html#get(K,%20java.util.concurrent.Callable) + case e@(_: UncheckedExecutionException | _: ExecutionError) => + throw e.getCause } } From 6bab83c6f247910d421aa02eada0d5fdf4610120 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Nov 2020 17:45:53 -0800 Subject: [PATCH 3/9] Use invalidateAll instead of cleanUp. --- .../spark/sql/catalyst/expressions/EvaluationRunTime.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala index bc648f3354a33..9d3047cbc8487 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala @@ -46,7 +46,7 @@ class EvaluationRunTime { */ def setInput(input: InternalRow = null): Unit = { currentInput = input - cache.cleanUp() + cache.invalidateAll() } /** From ddd3a96db1e22bd0b8c8a5b4c351479a63e1e907 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Nov 2020 20:50:22 -0800 Subject: [PATCH 4/9] Add tests. 
--- .../expressions/EvaluationRunTime.scala | 11 ++- .../expressions/EvaluationRunTimeSuite.scala | 79 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala index 9d3047cbc8487..e169a90cde3ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala @@ -70,8 +70,15 @@ class EvaluationRunTime { // Only adding proxy if we find subexpressions. if (proxyMap.nonEmpty) { expressions.map { expr => - expr.transformUp { - case e if proxyMap.contains(e) => proxyMap(e) + // `transform` will cause stackoverflow because it keeps transforming into + // `ExpressionProxy`. But we cannot use `transformUp` because we want to use + // subexpressions at higher level. So we `transformDown` until finding first + // subexpression. + var transformed = false + expr.transform { + case e if !transformed && proxyMap.contains(e) => + transformed = true + proxyMap(e) } } } else { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala new file mode 100644 index 0000000000000..6861961b6f490 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite + +class EvaluationRunTimeSuite extends SparkFunSuite { + + test("Evaluate ExpressionProxy should create cached result") { + val runtime = new EvaluationRunTime() + val proxy = ExpressionProxy(Literal(1), runtime) + assert(runtime.cache.size() == 0) + proxy.eval() + assert(runtime.cache.size() == 1) + assert(runtime.cache.get(proxy) == ResultProxy(1)) + } + + test("setInput should empty cached result") { + val runtime = new EvaluationRunTime() + val proxy1 = ExpressionProxy(Literal(1), runtime) + assert(runtime.cache.size() == 0) + proxy1.eval() + assert(runtime.cache.size() == 1) + assert(runtime.cache.get(proxy1) == ResultProxy(1)) + + val proxy2 = ExpressionProxy(Literal(2), runtime) + proxy2.eval() + assert(runtime.cache.size() == 2) + assert(runtime.cache.get(proxy2) == ResultProxy(2)) + + runtime.setInput() + assert(runtime.cache.size() == 0) + } + + test("Wrap ExpressionProxy on subexpressions") { + val runtime = new EvaluationRunTime() + + val one = Literal(1) + val two = Literal(2) + val mul = Multiply(one, two) + val mul2 = Multiply(mul, mul) + val sqrt = Sqrt(mul2) + val sum = Add(mul2, sqrt) + + // ( (one * two) * (one * two) ) + sqrt( (one * two) * (one * two) ) + val proxyExpressions = runtime.proxyExpressions(Seq(sum)) + val proxys = proxyExpressions.flatMap(_.collect { + case p: 
ExpressionProxy => p + }) + // ( (one * two) * (one * two) ) + assert(proxys.size == 1) + val expected = ExpressionProxy(mul2, runtime) + assert(proxys.head == expected) + } + + test("ExpressionProxy won't be on non deterministic") { + val runtime = new EvaluationRunTime() + + val sum = Add(Rand(0), Rand(0)) + val proxys = runtime.proxyExpressions(Seq(sum, sum)).flatMap(_.collect { + case p: ExpressionProxy => p + }) + assert(proxys.isEmpty) + } +} From e8449ec6a550973aeb66c6deb2bf98a547b934d5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Nov 2020 22:42:55 -0800 Subject: [PATCH 5/9] For review comments. --- ...EvaluationRunTime.scala => EvaluationRuntime.scala} | 2 +- .../sql/catalyst/expressions/ExpressionProxy.scala | 8 ++++---- .../expressions/InterpretedUnsafeProjection.scala | 2 +- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- ...RunTimeSuite.scala => EvaluationRuntimeSuite.scala} | 10 +++++----- 5 files changed, 12 insertions(+), 12 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{EvaluationRunTime.scala => EvaluationRuntime.scala} (99%) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/{EvaluationRunTimeSuite.scala => EvaluationRuntimeSuite.scala} (91%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala similarity index 99% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala index e169a90cde3ad..1770d2e739e2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala @@ -27,7 +27,7 @@ import 
org.apache.spark.sql.internal.SQLConf * This class wraps `ExpressionProxy` around given expressions. The `ExpressionProxy` * intercepts expression evaluation and loads from the cache first. */ -class EvaluationRunTime { +class EvaluationRuntime { val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() .maximumSize(SQLConf.get.subexpressionEliminationCacheMaxEntries) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala index 075661e5345e0..fda2287962a3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo import org.apache.spark.sql.types.DataType /** - * A proxy for an catalyst `Expression`. Given a runtime object `EvaluationRunTime`, when this + * A proxy for an catalyst `Expression`. Given a runtime object `EvaluationRuntime`, when this * is asked to evaluate, it will load the evaluation cache in the runtime first. */ -case class ExpressionProxy(child: Expression, runtime: EvaluationRunTime) extends Expression { +case class ExpressionProxy(child: Expression, runtime: EvaluationRuntime) extends Expression { final override def dataType: DataType = child.dataType final override def nullable: Boolean = child.nullable @@ -46,12 +46,12 @@ case class ExpressionProxy(child: Expression, runtime: EvaluationRunTime) extend // Cache.get() may wrap the original exception. 
See the following URL // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ // Cache.html#get(K,%20java.util.concurrent.Callable) - case e@(_: UncheckedExecutionException | _: ExecutionError) => + case e @ (_: UncheckedExecutionException | _: ExecutionError) => throw e.getCause } } /** - * A simple wrapper for holding `Any` in the cache of `EvaluationRunTime`. + * A simple wrapper for holding `Any` in the cache of `EvaluationRuntime`. */ case class ResultProxy(result: Any) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala index 9d4332a7dae43..c8ffbe79a6a4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala @@ -35,7 +35,7 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe import InterpretedUnsafeProjection._ private[this] val subExprElimination = SQLConf.get.subexpressionEliminationEnabled - private[this] lazy val runtime = new EvaluationRunTime() + private[this] lazy val runtime = new EvaluationRuntime() private[this] val proxyExpressions = if (subExprElimination) { runtime.proxyExpressions(expressions) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b1198c83f4989..bf78f0bf0b5be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -545,7 +545,7 @@ object SQLConf { .doc("The maximum entries of the cache used for interpreted subexpression elimination.") .version("3.1.0") .intConf - .checkValue(maxEntries => maxEntries >= 0, "The 
maximum must not be negative") + .checkValue(_ >= 0, "The maximum must not be negative") .createWithDefault(100) val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala similarity index 91% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala index 6861961b6f490..a2035a8cba355 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRunTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite -class EvaluationRunTimeSuite extends SparkFunSuite { +class EvaluationRuntimeSuite extends SparkFunSuite { test("Evaluate ExpressionProxy should create cached result") { - val runtime = new EvaluationRunTime() + val runtime = new EvaluationRuntime() val proxy = ExpressionProxy(Literal(1), runtime) assert(runtime.cache.size() == 0) proxy.eval() @@ -30,7 +30,7 @@ class EvaluationRunTimeSuite extends SparkFunSuite { } test("setInput should empty cached result") { - val runtime = new EvaluationRunTime() + val runtime = new EvaluationRuntime() val proxy1 = ExpressionProxy(Literal(1), runtime) assert(runtime.cache.size() == 0) proxy1.eval() @@ -47,7 +47,7 @@ class EvaluationRunTimeSuite extends SparkFunSuite { } test("Wrap ExpressionProxy on subexpressions") { - val runtime = new EvaluationRunTime() + val runtime = new EvaluationRuntime() val one = Literal(1) val two = Literal(2) @@ -68,7 +68,7 @@ class EvaluationRunTimeSuite extends SparkFunSuite { } test("ExpressionProxy won't be on non deterministic") 
{ - val runtime = new EvaluationRunTime() + val runtime = new EvaluationRuntime() val sum = Add(Rand(0), Rand(0)) val proxys = runtime.proxyExpressions(Seq(sum, sum)).flatMap(_.collect { From 4780b653f079c2fd4c950d43758587875d89e41a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 13 Nov 2020 22:25:07 -0800 Subject: [PATCH 6/9] For review comments. --- .../expressions/EvaluationRuntime.scala | 88 ------------ .../expressions/ExpressionProxy.scala | 57 -------- .../InterpretedUnsafeProjection.scala | 3 +- .../SubExprEvaluationRuntime.scala | 131 ++++++++++++++++++ ...la => SubExprEvaluationRuntimeSuite.scala} | 12 +- 5 files changed, 139 insertions(+), 152 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/{EvaluationRuntimeSuite.scala => SubExprEvaluationRuntimeSuite.scala} (89%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala deleted file mode 100644 index 1770d2e739e2a..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntime.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.expressions - -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.internal.SQLConf - -/** - * This class helps subexpression elimination for interpreted evaluation - * in `InterpretedUnsafeProjection`. It maintains an evaluation cache. - * This class wraps `ExpressionProxy` around given expressions. The `ExpressionProxy` - * intercepts expression evaluation and loads from the cache first. - */ -class EvaluationRuntime { - - val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() - .maximumSize(SQLConf.get.subexpressionEliminationCacheMaxEntries) - .build( - new CacheLoader[ExpressionProxy, ResultProxy]() { - override def load(expr: ExpressionProxy): ResultProxy = { - ResultProxy(expr.proxyEval(currentInput)) - } - }) - - private var currentInput: InternalRow = null - - /** - * Sets given input row as current row for evaluating expressions. This cleans up the cache - * too as new input comes. - */ - def setInput(input: InternalRow = null): Unit = { - currentInput = input - cache.invalidateAll() - } - - /** - * Finds subexpressions and wraps them with `ExpressionProxy`. 
- */ - def proxyExpressions(expressions: Seq[Expression]): Seq[Expression] = { - val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions - - expressions.foreach(equivalentExpressions.addExprTree(_)) - - var proxyMap = Map.empty[Expression, ExpressionProxy] - - val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1) - commonExprs.foreach { e => - val expr = e.head - val proxy = ExpressionProxy(expr, this) - - proxyMap ++= e.map(_ -> proxy).toMap - } - - // Only adding proxy if we find subexpressions. - if (proxyMap.nonEmpty) { - expressions.map { expr => - // `transform` will cause stackoverflow because it keeps transforming into - // `ExpressionProxy`. But we cannot use `transformUp` because we want to use - // subexpressions at higher level. So we `transformDown` until finding first - // subexpression. - var transformed = false - expr.transform { - case e if !transformed && proxyMap.contains(e) => - transformed = true - proxyMap(e) - } - } - } else { - expressions - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala deleted file mode 100644 index fda2287962a3d..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionProxy.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.expressions - -import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.types.DataType - -/** - * A proxy for an catalyst `Expression`. Given a runtime object `EvaluationRuntime`, when this - * is asked to evaluate, it will load the evaluation cache in the runtime first. - */ -case class ExpressionProxy(child: Expression, runtime: EvaluationRuntime) extends Expression { - - final override def dataType: DataType = child.dataType - final override def nullable: Boolean = child.nullable - final override def children: Seq[Expression] = child :: Nil - - // `ExpressionProxy` is for interpreted expression evaluation only. So cannot `doGenCode`. - final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException(s"Cannot generate code for expression: $this") - - def proxyEval(input: InternalRow = null): Any = { - child.eval(input) - } - - override def eval(input: InternalRow = null): Any = try { - runtime.cache.get(this).result - } catch { - // Cache.get() may wrap the original exception. 
See the following URL - // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ - // Cache.html#get(K,%20java.util.concurrent.Callable) - case e @ (_: UncheckedExecutionException | _: ExecutionError) => - throw e.getCause - } -} - -/** - * A simple wrapper for holding `Any` in the cache of `EvaluationRuntime`. - */ -case class ResultProxy(result: Any) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala index c8ffbe79a6a4a..7c0a74e008949 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala @@ -35,7 +35,8 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe import InterpretedUnsafeProjection._ private[this] val subExprElimination = SQLConf.get.subexpressionEliminationEnabled - private[this] lazy val runtime = new EvaluationRuntime() + private[this] lazy val runtime = + new SubExprEvaluationRuntime(SQLConf.get.subexpressionEliminationCacheMaxEntries) private[this] val proxyExpressions = if (subExprElimination) { runtime.proxyExpressions(expressions) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala new file mode 100644 index 0000000000000..091a1aae84db4 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.DataType + +/** + * This class helps subexpression elimination for interpreted evaluation + * such as `InterpretedUnsafeProjection`. It maintains an evaluation cache. + * This class wraps `ExpressionProxy` around given expressions. The `ExpressionProxy` + * intercepts expression evaluation and loads from the cache first. 
+ */ +class SubExprEvaluationRuntime(cacheMaxEntries: Int) { + + private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() + .maximumSize(cacheMaxEntries) + .build( + new CacheLoader[ExpressionProxy, ResultProxy]() { + override def load(expr: ExpressionProxy): ResultProxy = { + ResultProxy(expr.proxyEval(currentInput)) + } + }) + + private var currentInput: InternalRow = null + + def getEval(proxy: ExpressionProxy): Any = try { + cache.get(proxy).result + } catch { + // Cache.get() may wrap the original exception. See the following URL + // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ + // Cache.html#get(K,%20java.util.concurrent.Callable) + case e @ (_: UncheckedExecutionException | _: ExecutionError) => + throw e.getCause + } + + /** + * Sets given input row as current row for evaluating expressions. This cleans up the cache + * too as new input comes. + */ + def setInput(input: InternalRow = null): Unit = { + currentInput = input + cache.invalidateAll() + } + + /** + * Recursively replaces expression with its proxy expression in `proxyMap`. + */ + private def replaceWithProxy( + expr: Expression, + proxyMap: Map[Expression, ExpressionProxy]): Expression = { + expr match { + case e if proxyMap.contains(e) => + proxyMap(e) + case _ => + expr.mapChildren(replaceWithProxy(_, proxyMap)) + } + } + + /** + * Finds subexpressions and wraps them with `ExpressionProxy`. 
+ */ + def proxyExpressions(expressions: Seq[Expression]): Seq[Expression] = { + val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions + + expressions.foreach(equivalentExpressions.addExprTree(_)) + + val proxyMap = mutable.Map.empty[Expression, ExpressionProxy] + + val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1) + commonExprs.foreach { e => + val expr = e.head + val proxy = ExpressionProxy(expr, this) + + proxyMap ++= e.map(_ -> proxy).toMap + } + + // Only adding proxy if we find subexpressions. + if (proxyMap.nonEmpty) { + expressions.map(replaceWithProxy(_, proxyMap.toMap)) + } else { + expressions + } + } +} + +/** + * A proxy for a catalyst `Expression`. Given a runtime object `SubExprEvaluationRuntime`, + * when this is asked to evaluate, it will load from the evaluation cache in the runtime first. + */ +case class ExpressionProxy( + child: Expression, + runtime: SubExprEvaluationRuntime) extends Expression { + + final override def dataType: DataType = child.dataType + final override def nullable: Boolean = child.nullable + final override def children: Seq[Expression] = child :: Nil + + // `ExpressionProxy` is for interpreted expression evaluation only. So cannot `doGenCode`. + final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException(s"Cannot generate code for expression: $this") + + def proxyEval(input: InternalRow = null): Any = child.eval(input) + + override def eval(input: InternalRow = null): Any = runtime.getEval(this) +} + +/** + * A simple wrapper for holding `Any` in the cache of `SubExprEvaluationRuntime`.
+ */ +case class ResultProxy(result: Any) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala similarity index 89% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala index a2035a8cba355..a261c5003427a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/EvaluationRuntimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite -class EvaluationRuntimeSuite extends SparkFunSuite { +class SubExprEvaluationRuntimeSuite extends SparkFunSuite { test("Evaluate ExpressionProxy should create cached result") { - val runtime = new EvaluationRuntime() + val runtime = new SubExprEvaluationRuntime(1) val proxy = ExpressionProxy(Literal(1), runtime) assert(runtime.cache.size() == 0) proxy.eval() @@ -30,7 +30,7 @@ class EvaluationRuntimeSuite extends SparkFunSuite { } test("setInput should empty cached result") { - val runtime = new EvaluationRuntime() + val runtime = new SubExprEvaluationRuntime(2) val proxy1 = ExpressionProxy(Literal(1), runtime) assert(runtime.cache.size() == 0) proxy1.eval() @@ -47,7 +47,7 @@ class EvaluationRuntimeSuite extends SparkFunSuite { } test("Wrap ExpressionProxy on subexpressions") { - val runtime = new EvaluationRuntime() + val runtime = new SubExprEvaluationRuntime(1) val one = Literal(1) val two = Literal(2) @@ -62,13 +62,13 @@ class EvaluationRuntimeSuite extends SparkFunSuite { case p: ExpressionProxy => p }) // ( (one * two) * (one * two) ) - assert(proxys.size == 1) + assert(proxys.size == 2) val 
expected = ExpressionProxy(mul2, runtime) assert(proxys.head == expected) } test("ExpressionProxy won't be on non deterministic") { - val runtime = new EvaluationRuntime() + val runtime = new SubExprEvaluationRuntime(1) val sum = Add(Rand(0), Rand(0)) val proxys = runtime.proxyExpressions(Seq(sum, sum)).flatMap(_.collect { From 47bae358690c550a0259796b7bed137d1d38f28e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Nov 2020 09:54:24 -0800 Subject: [PATCH 7/9] For review comment and add one test. --- .../InterpretedUnsafeProjection.scala | 10 +++++----- .../SubExprEvaluationRuntime.scala | 7 +------ .../SubExprEvaluationRuntimeSuite.scala | 20 +++++++++++++++++++ 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala index 7c0a74e008949..f3ca4f06cd372 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala @@ -34,10 +34,10 @@ import org.apache.spark.unsafe.Platform class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { import InterpretedUnsafeProjection._ - private[this] val subExprElimination = SQLConf.get.subexpressionEliminationEnabled + private[this] val subExprEliminationEnabled = SQLConf.get.subexpressionEliminationEnabled private[this] lazy val runtime = new SubExprEvaluationRuntime(SQLConf.get.subexpressionEliminationCacheMaxEntries) - private[this] val proxyExpressions = if (subExprElimination) { + private[this] val exprs = if (subExprEliminationEnabled) { runtime.proxyExpressions(expressions) } else { expressions.toSeq @@ -73,21 +73,21 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe } 
override def initialize(partitionIndex: Int): Unit = { - proxyExpressions.foreach(_.foreach { + exprs.foreach(_.foreach { case n: Nondeterministic => n.initialize(partitionIndex) case _ => }) } override def apply(row: InternalRow): UnsafeRow = { - if (subExprElimination) { + if (subExprEliminationEnabled) { runtime.setInput(row) } // Put the expression results in the intermediate row. var i = 0 while (i < numFields) { - values(i) = proxyExpressions(i).eval(row) + values(i) = exprs(i).eval(row) i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala index 091a1aae84db4..bc8b0754a4ff1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala @@ -69,12 +69,7 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { private def replaceWithProxy( expr: Expression, proxyMap: Map[Expression, ExpressionProxy]): Expression = { - expr match { - case e if proxyMap.contains(e) => - proxyMap(e) - case _ => - expr.mapChildren(replaceWithProxy(_, proxyMap)) - } + proxyMap.getOrElse(expr, expr.mapChildren(replaceWithProxy(_, proxyMap))) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala index a261c5003427a..c67dba8f2418d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala @@ -29,6 +29,26 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite { assert(runtime.cache.get(proxy) == ResultProxy(1)) 
} + test("SubExprEvaluationRuntime cannot exceed configured max entries") { + val runtime = new SubExprEvaluationRuntime(2) + assert(runtime.cache.size() == 0) + + val proxy1 = ExpressionProxy(Literal(1), runtime) + proxy1.eval() + assert(runtime.cache.size() == 1) + assert(runtime.cache.get(proxy1) == ResultProxy(1)) + + val proxy2 = ExpressionProxy(Literal(2), runtime) + proxy2.eval() + assert(runtime.cache.size() == 2) + assert(runtime.cache.get(proxy2) == ResultProxy(2)) + + val proxy3 = ExpressionProxy(Literal(3), runtime) + proxy3.eval() + assert(runtime.cache.size() == 2) + assert(runtime.cache.get(proxy3) == ResultProxy(3)) + } + test("setInput should empty cached result") { val runtime = new SubExprEvaluationRuntime(2) val proxy1 = ExpressionProxy(Literal(1), runtime) From 77168fe2dd687113ae1b9a2f086982861445ecda Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Nov 2020 23:32:55 -0800 Subject: [PATCH 8/9] Update benchmark results. --- .../SubExprEliminationBenchmark-jdk11-results.txt | 8 ++++---- .../benchmarks/SubExprEliminationBenchmark-results.txt | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/benchmarks/SubExprEliminationBenchmark-jdk11-results.txt b/sql/core/benchmarks/SubExprEliminationBenchmark-jdk11-results.txt index 49dc7adccbf3c..3d2b2e5c8edba 100644 --- a/sql/core/benchmarks/SubExprEliminationBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/SubExprEliminationBenchmark-jdk11-results.txt @@ -7,9 +7,9 @@ OpenJDK 64-Bit Server VM 11.0.9+11 on Mac OS X 10.15.6 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz from_json as subExpr: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -subexpressionElimination off, codegen on 26809 27731 898 0.0 268094225.4 1.0X -subexpressionElimination off, codegen off 25117 26612 1357 0.0 251166638.4 1.1X -subexpressionElimination on, 
codegen on 2582 2906 282 0.0 25819408.7 10.4X -subexpressionElimination on, codegen off 25635 26131 804 0.0 256346873.1 1.0X +subexpressionElimination off, codegen on 25932 26908 916 0.0 259320042.3 1.0X +subexpressionElimination off, codegen off 26085 26159 65 0.0 260848905.0 1.0X +subexpressionElimination on, codegen on 2860 2939 72 0.0 28603312.9 9.1X +subexpressionElimination on, codegen off 2517 2617 93 0.0 25165157.7 10.3X diff --git a/sql/core/benchmarks/SubExprEliminationBenchmark-results.txt b/sql/core/benchmarks/SubExprEliminationBenchmark-results.txt index 3f131726bc53d..ca2a9c6497500 100644 --- a/sql/core/benchmarks/SubExprEliminationBenchmark-results.txt +++ b/sql/core/benchmarks/SubExprEliminationBenchmark-results.txt @@ -7,9 +7,9 @@ OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Mac OS X 10.15.6 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz from_json as subExpr: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -subexpressionElimination off, codegen on 24841 25365 803 0.0 248412787.5 1.0X -subexpressionElimination off, codegen off 25344 26205 941 0.0 253442656.5 1.0X -subexpressionElimination on, codegen on 2883 3019 119 0.0 28833086.8 8.6X -subexpressionElimination on, codegen off 24707 25688 903 0.0 247068775.9 1.0X +subexpressionElimination off, codegen on 26503 27622 1937 0.0 265033362.4 1.0X +subexpressionElimination off, codegen off 24920 25376 430 0.0 249196978.2 1.1X +subexpressionElimination on, codegen on 2421 2466 39 0.0 24213606.1 10.9X +subexpressionElimination on, codegen off 2360 2435 87 0.0 23604320.7 11.2X From db115d6d850942eca5dc6fac80896b1038561e51 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Nov 2020 16:59:58 -0800 Subject: [PATCH 9/9] For review comment. 
--- .../SubExprEvaluationRuntime.scala | 35 ++++++++++++++----- .../SubExprEvaluationRuntimeSuite.scala | 17 ++++----- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala index bc8b0754a4ff1..3189d81289903 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.sql.catalyst.expressions -import scala.collection.mutable +import java.util.IdentityHashMap + +import scala.collection.JavaConverters._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} @@ -32,6 +34,10 @@ import org.apache.spark.sql.types.DataType * intercepts expression evaluation and loads from the cache first. */ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { + // The id assigned to `ExpressionProxy`. `SubExprEvaluationRuntime` will use assigned ids of + // `ExpressionProxy` to decide the equality when loading from cache. `SubExprEvaluationRuntime` + // won't be used by multiple threads so we don't need to consider concurrency here.
+ private var proxyExpressionCurrentId = 0 private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() .maximumSize(cacheMaxEntries) @@ -68,8 +74,12 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { */ private def replaceWithProxy( expr: Expression, - proxyMap: Map[Expression, ExpressionProxy]): Expression = { - proxyMap.getOrElse(expr, expr.mapChildren(replaceWithProxy(_, proxyMap))) + proxyMap: IdentityHashMap[Expression, ExpressionProxy]): Expression = { + if (proxyMap.containsKey(expr)) { + proxyMap.get(expr) + } else { + expr.mapChildren(replaceWithProxy(_, proxyMap)) + } } /** @@ -80,19 +90,20 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { expressions.foreach(equivalentExpressions.addExprTree(_)) - val proxyMap = mutable.Map.empty[Expression, ExpressionProxy] + val proxyMap = new IdentityHashMap[Expression, ExpressionProxy] val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1) commonExprs.foreach { e => val expr = e.head - val proxy = ExpressionProxy(expr, this) + val proxy = ExpressionProxy(expr, proxyExpressionCurrentId, this) + proxyExpressionCurrentId += 1 - proxyMap ++= e.map(_ -> proxy).toMap + proxyMap.putAll(e.map(_ -> proxy).toMap.asJava) } // Only adding proxy if we find subexpressions. 
- if (proxyMap.nonEmpty) { - expressions.map(replaceWithProxy(_, proxyMap.toMap)) + if (!proxyMap.isEmpty) { + expressions.map(replaceWithProxy(_, proxyMap)) } else { expressions } @@ -105,6 +116,7 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { */ case class ExpressionProxy( child: Expression, + id: Int, runtime: SubExprEvaluationRuntime) extends Expression { final override def dataType: DataType = child.dataType @@ -118,6 +130,13 @@ case class ExpressionProxy( def proxyEval(input: InternalRow = null): Any = child.eval(input) override def eval(input: InternalRow = null): Any = runtime.getEval(this) + + override def equals(obj: Any): Boolean = obj match { + case other: ExpressionProxy => this.id == other.id + case _ => false + } + + override def hashCode(): Int = this.id.hashCode() } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala index c67dba8f2418d..badcd4fc3fdad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types.IntegerType class SubExprEvaluationRuntimeSuite extends SparkFunSuite { test("Evaluate ExpressionProxy should create cached result") { val runtime = new SubExprEvaluationRuntime(1) - val proxy = ExpressionProxy(Literal(1), runtime) + val proxy = ExpressionProxy(Literal(1), 0, runtime) assert(runtime.cache.size() == 0) proxy.eval() assert(runtime.cache.size() == 1) @@ -33,17 +34,17 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite { val runtime = new SubExprEvaluationRuntime(2) assert(runtime.cache.size() == 0) - val proxy1 = 
ExpressionProxy(Literal(1), runtime) + val proxy1 = ExpressionProxy(Literal(1), 0, runtime) proxy1.eval() assert(runtime.cache.size() == 1) assert(runtime.cache.get(proxy1) == ResultProxy(1)) - val proxy2 = ExpressionProxy(Literal(2), runtime) + val proxy2 = ExpressionProxy(Literal(2), 1, runtime) proxy2.eval() assert(runtime.cache.size() == 2) assert(runtime.cache.get(proxy2) == ResultProxy(2)) - val proxy3 = ExpressionProxy(Literal(3), runtime) + val proxy3 = ExpressionProxy(Literal(3), 2, runtime) proxy3.eval() assert(runtime.cache.size() == 2) assert(runtime.cache.get(proxy3) == ResultProxy(3)) @@ -51,13 +52,13 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite { test("setInput should empty cached result") { val runtime = new SubExprEvaluationRuntime(2) - val proxy1 = ExpressionProxy(Literal(1), runtime) + val proxy1 = ExpressionProxy(Literal(1), 0, runtime) assert(runtime.cache.size() == 0) proxy1.eval() assert(runtime.cache.size() == 1) assert(runtime.cache.get(proxy1) == ResultProxy(1)) - val proxy2 = ExpressionProxy(Literal(2), runtime) + val proxy2 = ExpressionProxy(Literal(2), 1, runtime) proxy2.eval() assert(runtime.cache.size() == 2) assert(runtime.cache.get(proxy2) == ResultProxy(2)) @@ -83,8 +84,8 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite { }) // ( (one * two) * (one * two) ) assert(proxys.size == 2) - val expected = ExpressionProxy(mul2, runtime) - assert(proxys.head == expected) + val expected = ExpressionProxy(mul2, 0, runtime) + assert(proxys.forall(_ == expected)) } test("ExpressionProxy won't be on non deterministic") {