This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-75] Support ColumnarHashAggregate in ColumnarWSCG #76

Merged · 6 commits · Feb 4, 2021 · Changes from all commits
core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala (8 changes: 4 additions & 4 deletions)

@@ -29,8 +29,6 @@ class ColumnarPluginConfig(conf: SparkConf) {
conf.getBoolean("spark.sql.columnar.sort", defaultValue = false)
val enableColumnarNaNCheck: Boolean =
conf.getBoolean("spark.sql.columnar.nanCheck", defaultValue = false)
val enableCodegenHashAggregate: Boolean =
conf.getBoolean("spark.sql.columnar.codegen.hashAggregate", defaultValue = false)
val enableColumnarBroadcastJoin: Boolean =
conf.getBoolean("spark.sql.columnar.sort.broadcastJoin", defaultValue = true)
val enableColumnarWindow: Boolean =
@@ -51,7 +49,9 @@ class ColumnarPluginConfig(conf: SparkConf) {
   val batchSize: Int =
     conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", defaultValue = 10000)
   val enableMetricsTime: Boolean =
-    conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false)
+    conf.getBoolean(
+      "spark.oap.sql.columnar.wholestagecodegen.breakdownTime",
+      defaultValue = false)
   val tmpFile: String =
     conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
   @deprecated val broadcastCacheTimeout: Int =
@@ -111,7 +111,7 @@ object ColumnarPluginConfig {
       ins.batchSize
     }
   }
-  def getEnableMetricsTime: Boolean = synchronized{
+  def getEnableMetricsTime: Boolean = synchronized {
     if (ins == null) {
       false
     } else {
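
Review note: with spark.sql.columnar.codegen.hashAggregate removed, hash aggregation no longer has a separate pre-compiled-jar path, so the remaining knob in this area is the metrics breakdown flag. A minimal sketch of setting the retained options through the standard SparkConf API (the values are illustrative, not defaults introduced by this PR):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // read by ColumnarPluginConfig.enableMetricsTime to surface per-operator WSCG timings
    .set("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", "true")
    // read by ColumnarPluginConfig.batchSize to cap Arrow record batch sizes
    .set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
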
@@ -20,8 +20,14 @@ package com.intel.oap.execution
 import com.intel.oap.ColumnarPluginConfig
 import com.intel.oap.expression._
 import com.intel.oap.vectorized._
+import com.google.common.collect.Lists
 import java.util.concurrent.TimeUnit._

+import org.apache.arrow.gandiva.expression._
+import org.apache.arrow.gandiva.evaluator._
+import org.apache.arrow.vector.types.pojo.ArrowType
+import org.apache.arrow.vector.types.pojo.Field
+import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
@@ -60,22 +66,18 @@ case class ColumnarHashAggregateExec(
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
     extends BaseAggregateExec
-    with BlockingOperatorWithCodegen
+    with ColumnarCodegenSupport
     with AliasAwareOutputPartitioning {

   val sparkConf = sparkContext.getConf
   val numaBindingInfo = ColumnarPluginConfig.getConf(sparkContext.getConf).numaBindingInfo
   override def supportsColumnar = true

-  // Disable code generation
-  override def supportCodegen: Boolean = false

   // Members declared in org.apache.spark.sql.execution.AliasAwareOutputPartitioning
   override protected def outputExpressions: Seq[NamedExpression] = resultExpressions

-  // Members declared in org.apache.spark.sql.execution.CodegenSupport
-  protected def doProduce(ctx: CodegenContext): String = throw new UnsupportedOperationException()
-  def inputRDDs(): Seq[RDD[InternalRow]] = throw new UnsupportedOperationException()

   // Members declared in org.apache.spark.sql.catalyst.plans.QueryPlan
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
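
Review note: this trait swap is what makes the operator visible to columnar WSCG. Judging purely from the overrides added further down in this file, the ColumnarCodegenSupport contract exercised here looks roughly like the sketch below (the real trait lives elsewhere in this repo; member shapes are inferred from usage, not copied from its source):

  import org.apache.arrow.gandiva.expression.TreeNode
  import org.apache.arrow.vector.types.pojo.Schema
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.vectorized.ColumnarBatch

  // Shape inferred from usage in doCodeGen below: childCtx.root,
  // ColumnarCodegenContext(inputSchema, outputSchema, codeGenNode).
  case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode)

  trait ColumnarCodegenSupport {
    def supportColumnarCodegen: Boolean              // can this node join a codegen stage?
    def inputRDDs(): Seq[RDD[ColumnarBatch]]         // columnar, not row-based, inputs
    def getBuildPlans: Seq[(SparkPlan, SparkPlan)]   // build-side plans of the stage
    def getStreamedLeafPlan: SparkPlan               // leaf of the streamed side
    def getChild: SparkPlan
    def updateMetrics(outNumRows: Long, processTime: Long): Unit
    def doCodeGen: ColumnarCodegenContext            // Gandiva tree for this subtree
  }
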
@@ -90,27 +92,30 @@ case class ColumnarHashAggregateExec(
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
     "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation process"),
-    "totalTime" -> SQLMetrics
-      .createTimingMetric(sparkContext, "totaltime_hashagg"))
+    "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_hashagg"))

   val numOutputRows = longMetric("numOutputRows")
   val numOutputBatches = longMetric("numOutputBatches")
   val numInputBatches = longMetric("numInputBatches")
   val aggTime = longMetric("aggTime")
-  val totalTime = longMetric("totalTime")
+  val totalTime = longMetric("processTime")
   numOutputRows.set(0)
   numOutputBatches.set(0)
   numInputBatches.set(0)

   buildCheck()

-  val (listJars, signature): (Seq[String], String) =
-    if (ColumnarPluginConfig
-        .getConf(sparkConf)
-        .enableCodegenHashAggregate && groupingExpressions.nonEmpty) {
-      var signature: String = ""
-      try {
-        signature = ColumnarGroupbyHashAggregation.prebuild(
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
+      ExecutorManager.tryTaskSet(numaBindingInfo)
+      val hasInput = iter.hasNext
+      val res = if (!hasInput) {
+        // This is a grouped aggregate and the input iterator is empty,
+        // so return an empty iterator.
+        Iterator.empty
+      } else {
+        var aggregation = ColumnarAggregation.create(
+          partIndex,
           groupingExpressions,
           child.output,
           aggregateExpressions,
@@ -123,36 +128,22 @@ case class ColumnarHashAggregateExec(
           aggTime,
           totalTime,
           sparkConf)
-      } catch {
-        case e: UnsupportedOperationException
-            if e.getMessage == "Unsupport to generate native expression from replaceable expression." =>
-          logWarning(e.getMessage())
-        case e: Throwable =>
-          throw e
-      }
-      if (signature != "") {
-        if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) {
-          val tempDir = ColumnarPluginConfig.getRandomTempDir
-          val jarFileName =
-            s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar"
-          sparkContext.addJar(jarFileName)
-        }
-        (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")), signature)
-      } else {
-        (List(), "")
+        SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
+          aggregation.close()
+        })
+        new CloseableColumnBatchIterator(aggregation.createIterator(iter))
       }
-    } else {
-      (List(), "")
+      res
     }
-    listJars.foreach(jar => logInfo(s"Uploaded ${jar}"))
   }

   def buildCheck(): Unit = {
     // check datatype
     for (attr <- child.output) {
       try {
         ConverterUtils.checkIfTypeSupported(attr.dataType)
       } catch {
-        case e : UnsupportedOperationException =>
+        case e: UnsupportedOperationException =>
           throw new UnsupportedOperationException(
             s"${attr.dataType} is not supported in ColumnarAggregation")
       }
@@ -174,11 +165,12 @@ case class ColumnarHashAggregateExec(
       val aggregateFunction = expr.aggregateFunction
       aggregateFunction match {
         case Average(_) | Sum(_) | Count(_) | Max(_) | Min(_) =>
-        case StddevSamp(_) => mode match {
-          case Partial | Final =>
-          case other =>
-            throw new UnsupportedOperationException(s"not currently supported: $other.")
-        }
+        case StddevSamp(_) =>
+          mode match {
+            case Partial | Final =>
+            case other =>
+              throw new UnsupportedOperationException(s"not currently supported: $other.")
+          }
         case other =>
           throw new UnsupportedOperationException(s"not currently supported: $other.")
       }
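
Review note: buildCheck now whitelists Average/Sum/Count/Max/Min in any aggregation mode and StddevSamp only in Partial or Final mode; anything else throws UnsupportedOperationException, presumably so the plan can be rejected at validation time rather than at execution. An illustrative pair of queries (hypothetical table t, standard Spark SQL API):

  // accepted: every aggregate is on the whitelist
  spark.sql("SELECT k, sum(v), avg(v), stddev_samp(v) FROM t GROUP BY k")
  // rejected by buildCheck: collect_list is not handled by ColumnarHashAggregateExec
  spark.sql("SELECT k, collect_list(v) FROM t GROUP BY k")
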
@@ -190,72 +182,80 @@ case class ColumnarHashAggregateExec(
}
}

-  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
-      ExecutorManager.tryTaskSet(numaBindingInfo)
-      val hasInput = iter.hasNext
-      val res = if (!hasInput) {
-        // This is a grouped aggregate and the input iterator is empty,
-        // so return an empty iterator.
-        Iterator.empty
-      } else {
-        if (ColumnarPluginConfig
-            .getConf(sparkConf)
-            .enableCodegenHashAggregate && groupingExpressions.nonEmpty) {
-          val execTempDir = ColumnarPluginConfig.getTempFile
-          val jarList = listJars
-            .map(jarUrl => {
-              logInfo(s"HashAggregate Get Codegened library Jar ${jarUrl}")
-              UserAddedJarUtils.fetchJarFromSpark(
-                jarUrl,
-                execTempDir,
-                s"spark-columnar-plugin-codegen-precompile-${signature}.jar",
-                sparkConf)
-              s"${execTempDir}/spark-columnar-plugin-codegen-precompile-${signature}.jar"
-            })
-          val aggregation = ColumnarGroupbyHashAggregation.create(
-            groupingExpressions,
-            child.output,
-            aggregateExpressions,
-            aggregateAttributes,
-            resultExpressions,
-            output,
-            jarList,
-            numInputBatches,
-            numOutputBatches,
-            numOutputRows,
-            aggTime,
-            totalTime,
-            sparkConf)
-          SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
-            aggregation.close()
-          })
-          new CloseableColumnBatchIterator(aggregation.createIterator(iter))
-        } else {
-          var aggregation = ColumnarAggregation.create(
-            partIndex,
-            groupingExpressions,
-            child.output,
-            aggregateExpressions,
-            aggregateAttributes,
-            resultExpressions,
-            output,
-            numInputBatches,
-            numOutputBatches,
-            numOutputRows,
-            aggTime,
-            totalTime,
-            sparkConf)
-          SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
-            aggregation.close()
-          })
-          new CloseableColumnBatchIterator(aggregation.createIterator(iter))
-        }
-      }
-      res
+  /** ColumnarCodegenSupport **/
+  override def inputRDDs(): Seq[RDD[ColumnarBatch]] = child match {
+    case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
+      c.inputRDDs
+    case _ =>
+      Seq(child.executeColumnar())
+  }
+
+  override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match {
+    case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
+      c.getBuildPlans
+    case _ =>
+      Seq()
+  }
+
+  override def getStreamedLeafPlan: SparkPlan = child match {
+    case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
+      c.getStreamedLeafPlan
+    case _ =>
+      this
+  }
+
+  override def updateMetrics(out_num_rows: Long, process_time: Long): Unit = {
+    val numOutputRows = longMetric("numOutputRows")
+    val procTime = longMetric("processTime")
+    procTime.set(process_time / 1000000)
+    numOutputRows += out_num_rows
+  }
+
+  override def getChild: SparkPlan = child
+
+  override def supportColumnarCodegen: Boolean = true
+
+  // override def canEqual(that: Any): Boolean = false
+
+  def getKernelFunction: TreeNode = {
+    ColumnarHashAggregationWithCodegen.prepareKernelFunction(
+      groupingExpressions,
+      child.output,
+      aggregateExpressions,
+      aggregateAttributes,
+      resultExpressions,
+      output,
+      sparkConf)
+  }
+
+  override def doCodeGen: ColumnarCodegenContext = {
+
+    val childCtx = child match {
+      case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
+        c.doCodeGen
+      case _ =>
+        null
+    }
+    val (codeGenNode, inputSchema) = if (childCtx != null) {
+      (
+        TreeBuilder.makeFunction(
+          s"child",
+          Lists.newArrayList(getKernelFunction, childCtx.root),
+          new ArrowType.Int(32, true)),
+        childCtx.inputSchema)
+    } else {
+      (
+        TreeBuilder.makeFunction(
+          s"child",
+          Lists.newArrayList(getKernelFunction),
+          new ArrowType.Int(32, true)),
+        ConverterUtils.toArrowSchema(child.output))
+    }
+    val outputSchema = ConverterUtils.toArrowSchema(output)
+    ColumnarCodegenContext(inputSchema, outputSchema, codeGenNode)
+  }
+
+  /****************************/
override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)

override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
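
Review note: doCodeGen is the core of the WSCG support here. Each operator wraps its own kernel node together with its child's codegen root into a Gandiva function node named "child", so a whole stage folds into a single expression tree that can be compiled in one pass. A minimal sketch of that composition step against the same Arrow Gandiva API (the Int(32) return type is a placeholder exactly as in the PR; the kernel and child nodes are stand-ins):

  import org.apache.arrow.gandiva.expression.{TreeBuilder, TreeNode}
  import org.apache.arrow.vector.types.pojo.ArrowType

  // Chain one operator's kernel with the already-built subtree of its child,
  // mirroring ColumnarHashAggregateExec.doCodeGen above.
  def chain(kernel: TreeNode, childRoot: Option[TreeNode]): TreeNode = {
    val children = new java.util.ArrayList[TreeNode]()
    children.add(kernel)
    // leaf operators (childCtx == null in the PR) contribute no child subtree
    childRoot.foreach(n => children.add(n))
    TreeBuilder.makeFunction("child", children, new ArrowType.Int(32, true))
  }

Applied bottom-up over a stage, the outermost node is what ColumnarCodegenContext carries as root, alongside the stage's Arrow input and output schemas.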