From 1b42014c83ea06425f0af92a8c40ed4a265663c5 Mon Sep 17 00:00:00 2001
From: PHILO-HE
Date: Tue, 29 Mar 2022 18:14:41 +0800
Subject: [PATCH] [NSE-772] Code refactor for ColumnarBatchScan (#805)

* Override doCanonicalize in ColumnarBatchScanExec

* Refactor ColumnarBatchScan
---
 .../oap/execution/ColumnarBatchScanExec.scala | 78 +++++++++++++++++++
 .../oap/extension/ColumnarOverrides.scala     | 23 +-----
 .../columnar/ColumnarGuardRule.scala          | 22 +-----
 .../python/ColumnarArrowPythonRunner.scala    |  1 -
 .../oap/execution/ColumnarBatchScanExec.scala | 52 -------------
 .../execution/ColumnarBatchScanExecBase.scala | 30 +++++++
 .../sql/shims/spark311/Spark311Shims.scala    |  1 -
 .../oap/execution/ColumnarBatchScanExec.scala | 53 -------------
 .../execution/ColumnarBatchScanExecBase.scala | 31 ++++++++
 .../sql/shims/spark321/Spark321Shims.scala    |  1 -
 10 files changed, 141 insertions(+), 151 deletions(-)
 create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
 delete mode 100644 shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
 create mode 100644 shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala
 delete mode 100644 shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
 create mode 100644 shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
new file mode 100644
index 000000000..a1c7540a6
--- /dev/null
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.execution
+
+import com.intel.oap.GazellePluginConfig
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Literal, _}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+
+class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
+                            runtimeFilters: Seq[Expression])
+    extends ColumnarBatchScanExecBase(output, scan, runtimeFilters) {
+  val tmpDir: String = GazellePluginConfig.getConf.tmpFile
+  override def supportsColumnar(): Boolean = true
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
+    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
+    "inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val scanTime = longMetric("scanTime")
+    val inputSize = longMetric("inputSize")
+    val inputColumnarRDD =
+      new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
+        true, scanTime, numInputBatches, inputSize, tmpDir)
+    inputColumnarRDD.map { r =>
+      numOutputRows += r.numRows()
+      numOutputBatches += 1
+      r
+    }
+  }
+
+  override def doCanonicalize(): ColumnarBatchScanExec = {
+    if (runtimeFilters == null) {
+      // For Spark 3.1.
+      new ColumnarBatchScanExec(output.map(QueryPlan.normalizeExpressions(_, output)), scan, null)
+    } else {
+      // For Spark 3.2.
+      new ColumnarBatchScanExec(
+        output.map(QueryPlan.normalizeExpressions(_, output)), scan,
+        QueryPlan.normalizePredicates(
+          runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
+          output))
+    }
+  }
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ColumnarBatchScanExec =>
+      (that canEqual this) && super.equals(that)
+    case _ => false
+  }
+}
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 43a39f43c..75ec6316c 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -27,7 +27,6 @@ import com.intel.oap.sql.shims.SparkShimLoader
 import org.apache.spark.{MapOutputStatistics, SparkContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -51,7 +50,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ShufflePartitionUtils
 
 
@@ -90,26 +88,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
     case plan: BatchScanExec =>
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
-      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
-        // This method is a commonly shared implementation for ColumnarBatchScanExec.
-        // We move it outside of shim layer to break the cyclic dependency caused by
-        // ColumnarDataSourceRDD.
-        override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-          val numOutputRows = longMetric("numOutputRows")
-          val numInputBatches = longMetric("numInputBatches")
-          val numOutputBatches = longMetric("numOutputBatches")
-          val scanTime = longMetric("scanTime")
-          val inputSize = longMetric("inputSize")
-          val inputColumnarRDD =
-            new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
-              true, scanTime, numInputBatches, inputSize, tmpDir)
-          inputColumnarRDD.map { r =>
-            numOutputRows += r.numRows()
-            numOutputBatches += 1
-            r
-          }
-        }
-      }
+      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
     case plan: CoalesceExec =>
       ColumnarCoalesceExec(plan.numPartitions, replaceWithColumnarPlan(plan.child))
     case plan: InMemoryTableScanExec =>
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
index 875704492..0ba694720 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
 import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec
 import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class RowGuard(child: SparkPlan) extends SparkPlan {
   def output: Seq[Attribute] = child.output
@@ -81,26 +80,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
       case plan: BatchScanExec =>
         if (!enableColumnarBatchScan) return false
         val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
-        new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
-          // This method is a commonly shared implementation for ColumnarBatchScanExec.
-          // We move it outside of shim layer to break the cyclic dependency caused by
-          // ColumnarDataSourceRDD.
-          override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-            val numOutputRows = longMetric("numOutputRows")
-            val numInputBatches = longMetric("numInputBatches")
-            val numOutputBatches = longMetric("numOutputBatches")
-            val scanTime = longMetric("scanTime")
-            val inputSize = longMetric("inputSize")
-            val inputColumnarRDD =
-              new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
-                true, scanTime, numInputBatches, inputSize, tmpDir)
-            inputColumnarRDD.map { r =>
-              numOutputRows += r.numRows()
-              numOutputBatches += 1
-              r
-            }
-          }
-        }
+        new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
       case plan: FileSourceScanExec =>
         if (plan.supportsColumnar) {
           return false
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/python/ColumnarArrowPythonRunner.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/python/ColumnarArrowPythonRunner.scala
index 0e9c143a6..f245d835c 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/python/ColumnarArrowPythonRunner.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/python/ColumnarArrowPythonRunner.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.BasePythonRunnerChild
 import org.apache.spark.util.Utils
 
 /**
diff --git a/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala b/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
deleted file mode 100644
index e0cfccd9d..000000000
--- a/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.intel.oap.execution
-
-//import com.intel.oap.GazellePluginConfig
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-
-/** For spark 3.1, the runtimeFilters: Seq[Expression] is not introduced in BatchScanExec.
- * This class lacks the implementation for doExecuteColumnar.
- */
-abstract class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
-    runtimeFilters: Seq[Expression])
-  extends BatchScanExec(output, scan) {
-  // tmpDir is used by ParquetReader, which looks useless (may be removed in the future).
-  // Here, "/tmp" is directly used, no need to get it set through configuration.
-  // val tmpDir: String = GazellePluginConfig.getConf.tmpFile
-  val tmpDir: String = "/tmp"
-  override def supportsColumnar(): Boolean = true
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
-    "inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))
-
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ColumnarBatchScanExec =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
-}
diff --git a/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala b/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala
new file mode 100644
index 000000000..19b015b72
--- /dev/null
+++ b/shims/spark311/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.connector.read.Scan
+
+/** For Spark 3.1, the runtimeFilters: Seq[Expression] parameter is not yet introduced in BatchScanExec.
+ */
+abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
+    runtimeFilters: Seq[Expression])
+  extends BatchScanExec(output, scan) {
+
+}
\ No newline at end of file
diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
index 7535c4eeb..46912bb70 100644
--- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
+++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
@@ -16,7 +16,6 @@
 
 package com.intel.oap.sql.shims.spark311
 
-import com.intel.oap.execution.ColumnarBatchScanExec
 import com.intel.oap.spark.sql.ArrowWriteQueue
 import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
 import java.io.File
diff --git a/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala b/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
deleted file mode 100644
index 99c12d394..000000000
--- a/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.intel.oap.execution
-
-//import com.intel.oap.GazellePluginConfig
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-
-/**
- * The runtimeFilters is not actually used in ColumnarBatchScanExec currently.
- * This class lacks the implementation for doExecuteColumnar.
- */
-abstract class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
-    runtimeFilters: Seq[Expression])
-  extends BatchScanExec(output, scan, runtimeFilters) {
-  // tmpDir is used by ParquetReader, which looks useless (may be removed in the future).
-  // Here, "/tmp" is directly used, no need to get it set through configuration.
-  // val tmpDir: String = GazellePluginConfig.getConf.tmpFile
-  val tmpDir: String = "/tmp"
-  override def supportsColumnar(): Boolean = true
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
-    "inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))
-
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ColumnarBatchScanExec =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
-}
diff --git a/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala b/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala
new file mode 100644
index 000000000..64ee16eb2
--- /dev/null
+++ b/shims/spark321/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExecBase.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.connector.read.Scan
+
+/**
+ * This class adapts to Spark 3.2's BatchScanExec, whose constructor accepts runtimeFilters.
+ */
+abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
+    runtimeFilters: Seq[Expression])
+  extends BatchScanExec(output, scan, runtimeFilters) {
+
+}
\ No newline at end of file
diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
index 3fafa8d3f..34f5bd03c 100644
--- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
+++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
@@ -16,7 +16,6 @@
 
 package com.intel.oap.sql.shims.spark321
 
-import com.intel.oap.execution.ColumnarBatchScanExec
 import com.intel.oap.spark.sql.ArrowWriteQueue
 import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
 import java.io.File
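--
Reviewer note (not part of the commit): the refactor keeps only the version-specific
constructor difference in the shim layer (Spark 3.1's BatchScanExec takes (output, scan),
while Spark 3.2's takes (output, scan, runtimeFilters)) and moves the shared
doExecuteColumnar/doCanonicalize logic into a single core class, which tells the two
builds apart at canonicalization time by whether runtimeFilters is null. The sketch
below is a minimal, self-contained Scala model of that layering; every name in it
(the Scan stand-in, BatchScanLike31/32, ColumnarBatchScanBase31/32, ColumnarBatchScan)
is a hypothetical placeholder for illustration, not a real Spark or Gazelle API.

// Minimal model of the shim-layering pattern used by this patch.
object ShimPatternSketch {

  // Stand-in for org.apache.spark.sql.connector.read.Scan.
  trait Scan { def name: String }

  // "Spark 3.1" flavor: the parent constructor has no runtimeFilters
  // parameter, so the per-version base class absorbs the extra argument.
  abstract class BatchScanLike31(val scan: Scan) {
    def describe: String = s"BatchScan(${scan.name})"
  }
  abstract class ColumnarBatchScanBase31(scan: Scan, val runtimeFilters: Seq[String])
      extends BatchScanLike31(scan)

  // "Spark 3.2" flavor: the parent constructor already accepts runtime
  // filters, so the base class simply forwards them.
  abstract class BatchScanLike32(val scan: Scan, val runtimeFilters: Seq[String]) {
    def describe: String = s"BatchScan(${scan.name}, filters=$runtimeFilters)"
  }
  abstract class ColumnarBatchScanBase32(scan: Scan, runtimeFilters: Seq[String])
      extends BatchScanLike32(scan, runtimeFilters)

  // Core layer: one concrete class extends whichever base the shim jar on the
  // classpath provides (only one is present at a time), and holds the shared
  // logic exactly once. Shown here against the 3.2-style base.
  class ColumnarBatchScan(scan: Scan, runtimeFilters: Seq[String])
      extends ColumnarBatchScanBase32(scan, runtimeFilters) {
    // In the real patch this slot holds doExecuteColumnar, doCanonicalize and
    // the metrics map, instead of duplicating them in anonymous subclasses at
    // every call site as ColumnarOverrides and ColumnarGuardRule did before.
    def executeColumnar(): String = s"columnar execution over $describe"
  }

  def main(args: Array[String]): Unit = {
    val parquetScan = new Scan { def name = "parquet" }
    val node = new ColumnarBatchScan(parquetScan, Seq("dynamic-pruning-filter"))
    // Prints: columnar execution over BatchScan(parquet, filters=List(dynamic-pruning-filter))
    println(node.executeColumnar())
  }
}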