This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

[NSE-394] Support ColumnarArrowEvalPython operator (#395)
* Support feeding ColumnarBatch to Python UDFs

Support Arrow UDFs; ColumnarArrowEvalPythonExec is basically runnable.

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Fix a ColumnarProject issue; the UDF path is now runnable

Still need to revisit retain/close handling for non-project input data.

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Add a configuration to disable the columnar Arrow UDF path

spark.oap.sql.columnar.arrowudf = false

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Fix an issue with multiple payloads on the non-project path and add metrics

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
xuechendi authored Jul 8, 2021
1 parent b584b08 commit 9c42e25
Showing 7 changed files with 454 additions and 10 deletions.
Changed file 1 of 7 (ColumnarGuardRule):

@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf

@@ -60,10 +61,14 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
   val enableColumnarShuffledHashJoin = columnarConf.enableColumnarShuffledHashJoin
   val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
   val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin
+  val enableColumnarArrowUDF = columnarConf.enableColumnarArrowUDF

   private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
     try {
       val columnarPlan = plan match {
+        case plan: ArrowEvalPythonExec =>
+          if (!enableColumnarArrowUDF) return false
+          ColumnarArrowEvalPythonExec(plan.udfs, plan.resultAttrs, plan.child, plan.evalType)
         case plan: BatchScanExec =>
           if (!enableColumnarBatchScan) return false
           new ColumnarBatchScanExec(plan.output, plan.scan)
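The guard's contract is build-or-bail: it eagerly constructs the columnar operator and treats any exception as "keep the row-based plan". A condensed sketch of that pattern, simplified from the hunk above (the assumption that the ColumnarArrowEvalPythonExec constructor performs its own support checks and throws is mine):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}

// Build-or-bail: construct the columnar operator eagerly; any exception
// (or a disabled flag) means the row-based ArrowEvalPythonExec is kept.
def tryConvertToColumnar(plan: SparkPlan, enableColumnarArrowUDF: Boolean): Boolean =
  try {
    plan match {
      case p: ArrowEvalPythonExec if enableColumnarArrowUDF =>
        // Assumed: the constructor validates UDFs/schemas and throws if unsupported.
        ColumnarArrowEvalPythonExec(p.udfs, p.resultAttrs, p.child, p.evalType)
        true
      case _ => false
    }
  } catch {
    case _: Throwable => false
  }
```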
Changed file 2 of 7 (ColumnarPreOverrides):

@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf

@@ -60,6 +61,9 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
       }
       logDebug(s"Columnar Processing for ${actualPlan.getClass} is under RowGuard.")
       actualPlan.withNewChildren(actualPlan.children.map(replaceWithColumnarPlan))
+    case plan: ArrowEvalPythonExec =>
+      val columnarChild = replaceWithColumnarPlan(plan.child)
+      ColumnarArrowEvalPythonExec(plan.udfs, plan.resultAttrs, columnarChild, plan.evalType)
     case plan: BatchScanExec =>
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       new ColumnarBatchScanExec(plan.output, plan.scan)
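Once both rules fire, a query stage that evaluates a pandas/Arrow UDF should plan as ColumnarArrowEvalPythonExec rather than ArrowEvalPythonExec. A small diagnostic helper for the Scala side (my naming; `df` is assumed to be a DataFrame that already applies such a UDF, registered from PySpark):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec

// True when the physical plan evaluates the Python UDF on the columnar path.
def usesColumnarArrowUDF(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.collect {
    case e: ColumnarArrowEvalPythonExec => e
  }.nonEmpty
```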
Changed file 3 of 7 (ColumnarPluginConfig):

@@ -100,6 +100,10 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
   val enableColumnarBroadcastJoin: Boolean =
     conf.getConfString("spark.oap.sql.columnar.broadcastJoin", "true").toBoolean && enableCpu

+  // enable or disable columnar arrow udf
+  val enableColumnarArrowUDF: Boolean =
+    conf.getConfString("spark.oap.sql.columnar.arrowudf", "true").toBoolean && enableCpu
+
   // enable or disable columnar wholestagecodegen
   val enableColumnarWholeStageCodegen: Boolean =
     conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu
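The flag defaults to true, so the columnar path is on out of the box; to force the vanilla row-based ArrowEvalPythonExec, flip it off with the standard Spark conf API:

```scala
// Disable the columnar Arrow UDF operator for this session (default: "true").
spark.conf.set("spark.oap.sql.columnar.arrowudf", "false")
```

The same can be done at submit time with --conf spark.oap.sql.columnar.arrowudf=false. Note that enableCpu must also hold, since the flag is ANDed with it.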
Changed file 4 of 7 (ColumnarProjection):

@@ -125,7 +125,8 @@ class ColumnarProjection (
       ConverterUtils.releaseArrowRecordBatch(inputRecordBatch)
       outputVectors.toList
     } else {
-      List[ArrowWritableColumnVector]()
+      val inputRecordBatch: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
+      ArrowWritableColumnVector.loadColumns(numRows, outputArrowSchema, inputRecordBatch).toList
     }
   }
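The old path returned an empty vector list when there was nothing to project, which starved the Python runner of input; the fix round-trips the input vectors through an ArrowRecordBatch into fresh ArrowWritableColumnVectors. The commit message flags retain/close on this path as unfinished; one plausible tightening (my sketch, reusing only the helpers visible in the diff) is to release the intermediate batch once the columns are loaded:

```scala
// Sketch: pass-through projection with explicit cleanup of the intermediate
// record batch. Assumes the helpers shown in the diff; whether loadColumns
// retains the underlying buffers is this repo's contract, not verified here.
val inputRecordBatch: ArrowRecordBatch =
  ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
try {
  ArrowWritableColumnVector.loadColumns(numRows, outputArrowSchema, inputRecordBatch).toList
} finally {
  ConverterUtils.releaseArrowRecordBatch(inputRecordBatch)
}
```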
Changed file 5 of 7 (ConverterUtils):

@@ -91,16 +91,23 @@ object ConverterUtils extends Logging {
     val vectors = (0 until columnarBatch.numCols)
       .map(i => columnarBatch.column(i).asInstanceOf[ArrowWritableColumnVector])
       .toList
-    if (schema == null) {
-      schema = new Schema(vectors.map(_.getValueVector().getField).asJava)
-      MessageSerializer.serialize(channel, schema, option)
-    }
-    val batch = ConverterUtils
-      .createArrowRecordBatch(columnarBatch.numRows, vectors.map(_.getValueVector))
-    try {
-      MessageSerializer.serialize(channel, batch, option)
-    } finally {
-      batch.close()
+    try {
+      if (schema == null) {
+        schema = new Schema(vectors.map(_.getValueVector().getField).asJava)
+        MessageSerializer.serialize(channel, schema, option)
+      }
+      val batch = ConverterUtils
+        .createArrowRecordBatch(columnarBatch.numRows, vectors.map(_.getValueVector))
+      try {
+        MessageSerializer.serialize(channel, batch, option)
+      } finally {
+        batch.close()
+      }
+    } catch {
+      case e =>
+        System.err.println(s"conversion failed")
+        e.printStackTrace()
+        throw e
     }
   }
 }
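What this helper hand-rolls with MessageSerializer is the Arrow IPC stream layout: one schema message, written lazily before the first batch, followed by a record-batch message per ColumnarBatch. For comparison, the same layout via stock Arrow Java's higher-level writer (standard Arrow APIs, nothing from this repo):

```scala
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

val allocator = new RootAllocator()
val ids = new IntVector("id", allocator)
ids.allocateNew(3)
(0 until 3).foreach(i => ids.set(i, i))
ids.setValueCount(3)

val root = VectorSchemaRoot.of(ids)
root.setRowCount(3)

val out = new ByteArrayOutputStream()
val writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))
writer.start()      // schema message, exactly once
writer.writeBatch() // one record-batch message per batch
writer.end()        // end-of-stream marker
writer.close()
root.close()
allocator.close()
```

writer.start() corresponds to the `schema == null` branch above, and writeBatch() to the serialize/close pairing around each record batch.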
(The diff for the remaining two changed files is not shown.)
