Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-145] Support decimal in columnar window #151

Merged · 6 commits · Mar 11, 2021
Changes from 1 commit
core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala (2 additions, 1 deletion)
@@ -160,11 +160,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
           plan.isSkewJoin)
       case plan: WindowExec =>
         if (!enableColumnarWindow) return false
-        new ColumnarWindowExec(
+        val window = ColumnarWindowExec.create(
           plan.windowExpression,
           plan.partitionSpec,
           plan.orderSpec,
           plan.child)
+        window
       case p =>
         p
     }
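For orientation: create() can throw for window expressions the columnar backend does not support, and the guard leans on that to decide whether to fall back. A minimal sketch of the pattern, with the method and flag names invented rather than copied from ColumnarGuardRule:

// Hedged sketch; windowIsSupported and its signature are invented stand-ins
// for the guard's internals, not actual method names from this repo.
def windowIsSupported(plan: WindowExec, enableColumnarWindow: Boolean): Boolean = {
  if (!enableColumnarWindow) return false
  try {
    // create() throws for unsupported window expressions, which means
    // "keep the row-based WindowExec".
    ColumnarWindowExec.create(
      plan.windowExpression, plan.partitionSpec, plan.orderSpec, plan.child)
    true
  } catch {
    case _: Throwable => false
  }
}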
core/src/main/scala/com/intel/oap/ColumnarPlugin.scala (12 additions, 7 deletions)
@@ -67,13 +67,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
           new ColumnarBatchScanExec(plan.output, plan.scan)
         }
       case plan: ProjectExec =>
-        val columnarPlan = replaceWithColumnarPlan(plan.child)
+        val columnarChild = replaceWithColumnarPlan(plan.child)
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-        if (columnarPlan.isInstanceOf[ColumnarConditionProjectExec]) {
-          val cur_plan = columnarPlan.asInstanceOf[ColumnarConditionProjectExec]
-          ColumnarConditionProjectExec(cur_plan.condition, plan.projectList, cur_plan.child)
-        } else {
-          ColumnarConditionProjectExec(null, plan.projectList, columnarPlan)
+        columnarChild match {
+          case ch: ColumnarConditionProjectExec =>
+            if (ch.projectList == null) {
+              ColumnarConditionProjectExec(ch.condition, plan.projectList, ch.child)
+            } else {
+              ColumnarConditionProjectExec(null, plan.projectList, columnarChild)
+            }
+          case _ =>
+            ColumnarConditionProjectExec(null, plan.projectList, columnarChild)
         }
       case plan: FilterExec =>
         val child = replaceWithColumnarPlan(plan.child)
@@ -234,11 +238,12 @@
         }
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         try {
-          return new ColumnarWindowExec(
+          val window = ColumnarWindowExec.create(
             plan.windowExpression,
             plan.partitionSpec,
             plan.orderSpec,
             coalesceBatchRemoved)
+          return window
         } catch {
           case _: Throwable =>
             logInfo("Columnar Window: Falling back to regular Window...")
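The ProjectExec branch now pattern-matches instead of casting, and fuses into the child only when the child is filter-only (projectList == null); the old code merged unconditionally, which would discard a child's existing projectList. A self-contained toy of the fusion decision, with all types invented for illustration:

// CondProject is an invented stand-in for ColumnarConditionProjectExec,
// holding (condition, projectList, child).
case class CondProject(condition: String, projectList: Seq[String], child: Any)

def fuseProject(projectList: Seq[String], child: Any): CondProject = child match {
  case ch: CondProject if ch.projectList == null =>
    // Child only filters: merge the projection into it, keeping its condition.
    CondProject(ch.condition, projectList, ch.child)
  case other =>
    // Child already projects (or is another node): stack a fresh projection.
    CondProject(null, projectList, other)
}

// fuseProject(Seq("a"), CondProject("x > 0", null, "scan"))
//   == CondProject("x > 0", Seq("a"), "scan")
// fuseProject(Seq("a"), CondProject("x > 0", Seq("b"), "scan"))
//   == CondProject(null, Seq("a"), CondProject("x > 0", Seq("b"), "scan"))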
ColumnarWindowExec.scala
@@ -27,20 +27,21 @@ import org.apache.arrow.gandiva.expression.TreeBuilder
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, NamedExpression, Rank, SortOrder, WindowExpression, WindowFunction}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Sum}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, MakeDecimal, NamedExpression, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, Sum}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.{ArrayType, DecimalType}
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ExecutorManager

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
+import scala.util.Random

 class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     partitionSpec: Seq[Expression],
@@ -248,3 +249,67 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     override def isComplex: Boolean = false
   }
 }
+
+object ColumnarWindowExec {
+
+  def createWithProjection(
+      windowExpression: Seq[NamedExpression],
+      partitionSpec: Seq[Expression],
+      orderSpec: Seq[SortOrder],
+      child: SparkPlan): SparkPlan = {
+
+    def makeInputProject(ex: Expression, inputProjects: ListBuffer[NamedExpression]): Expression = {
+      ex match {
+        case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects)))
+        case ae: WindowExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects)))
+        case func @ (_: AggregateFunction | _: WindowFunction) =>
+          val params = func.children
+          func.withNewChildren(params.map {
+            case param @ (_: Cast | _: UnscaledValue) =>
+              val aliasName = "__alias_%d__".format(Random.nextLong())
+              val alias = Alias(param, aliasName)()
+              inputProjects.append(alias)
+              alias.toAttribute
+            case other => other
+          })
+        case other => other
+      }
+    }
+
+    def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = {
+      ex match {
+        case we: WindowExpression =>
+          val aliasName = "__alias_%d__".format(Random.nextLong())
+          val alias = Alias(makeInputProject(we, inputProjects), aliasName)()
+          windows.append(alias)
+          alias.toAttribute
+        case _ =>
+          ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects)))
+      }
+    }
+
+    val windows = ListBuffer[NamedExpression]()
+    val inProjectExpressions = ListBuffer[NamedExpression]()
+    val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias])
+      .map { a =>
+        a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions)))
+          .asInstanceOf[NamedExpression]
+      }
+
+    val inputProject = ColumnarConditionProjectExec(null, child.output ++ inProjectExpressions, child)
+
+    val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject)
+
+    val outputProject = ColumnarConditionProjectExec(null, child.output ++ outProjectExpressions, window)
+
+    outputProject
+  }
+
+  def create(
+      windowExpression: Seq[NamedExpression],
+      partitionSpec: Seq[Expression],
+      orderSpec: Seq[SortOrder],
+      child: SparkPlan): SparkPlan = {
+    createWithProjection(windowExpression, partitionSpec, orderSpec, child)
+  }
+}
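The create() factory builds a three-level "projection sandwich" because Spark's DecimalAggregates rule plans a decimal SUM as MakeDecimal(Sum(UnscaledValue(col))): makeInputProject hoists the Cast/UnscaledValue wrappers on aggregate arguments into a projection below the window, while makeOutputProject leaves wrappers such as MakeDecimal to a projection above it, so the window kernel itself only ever sees plain attributes. A self-contained toy of the input-side hoisting, with all expression types invented to mirror the Catalyst shapes:

import scala.collection.mutable.ListBuffer
import scala.util.Random

// Invented stand-ins for the Catalyst nodes involved.
sealed trait Expr
case class Col(name: String) extends Expr
case class UnscaledVal(child: Expr) extends Expr // mirrors UnscaledValue
case class SumFunc(child: Expr) extends Expr     // mirrors the window aggregate
case class AliasOf(child: Expr, name: String) extends Expr

// Mirror of makeInputProject's core move: a cast-like aggregate argument is
// swapped for a fresh attribute, and its definition is collected so the
// input projection can compute it ahead of the window.
def hoist(agg: SumFunc, inputProjects: ListBuffer[AliasOf]): SumFunc =
  agg.child match {
    case uv: UnscaledVal =>
      val alias = AliasOf(uv, "__alias_%d__".format(Random.nextLong()))
      inputProjects += alias    // evaluated by the pre-projection
      SumFunc(Col(alias.name))  // the window now sees a plain column
    case _ => agg
  }

val pre = ListBuffer[AliasOf]()
val rewritten = hoist(SumFunc(UnscaledVal(Col("i_current_price"))), pre)
// rewritten: SumFunc(Col("__alias_N__"))
// pre:       ListBuffer(AliasOf(UnscaledVal(Col("i_current_price")), "__alias_N__"))

One detail worth noting: Random.nextLong() can be negative, so a generated name may read __alias_-42__; only uniqueness matters here.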
core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala (28 additions, 0 deletions)
@@ -88,6 +88,34 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
test("window query") {
runner.runTPCQuery("q67", 1, true)
}

test("window function with non-decimal input") {
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_category_id)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
df.explain()
df.show()
}

test("window function with decimal input") {
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
df.explain()
df.show()
}

test("window function with decimal input 2") {
val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" +
" OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000")
df.explain()
df.show()
}

test("window function with decimal input 3") {
val df = spark.sql("SELECT i_item_sk, i_class_id, AVG(i_current_price)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
df.explain()
df.show()
}
 }

 object TPCDSSuite {
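The new tests cover the decimal-sensitive paths (SUM and AVG go through the MakeDecimal/UnscaledValue rewrite, RANK orders by a decimal column) but only print the plan via explain(). A stricter variant could assert that the columnar node actually appears instead of silently falling back; the plan walk below uses standard Spark APIs, while the node-name check is an assumption about this plugin:

// Hedged sketch of a plan assertion for one of the decimal queries.
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" +
  " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
val planned = df.queryExecution.executedPlan
assert(planned.find(_.getClass.getSimpleName == "ColumnarWindowExec").isDefined,
  "window was not planned columnar")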