Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Support any operator as smj build side in wscg and divide two smjwscg…
Browse files Browse the repository at this point in the history
… if they connect in build side

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
  • Loading branch information
xuechendi committed Jan 10, 2021
1 parent 567ada1 commit 5d4df86
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ case class ColumnarConditionProjectExec(
Seq(child.executeColumnar())
}

override def getBuildPlans: Seq[SparkPlan] = child match {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
c.getBuildPlans
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ case class ColumnarBroadcastHashJoinExec(
case _ =>
Seq(streamedPlan.executeColumnar())
}
override def getBuildPlans: Seq[SparkPlan] = streamedPlan match {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
childPlans :+ this
childPlans :+ (this, null)
case _ =>
Seq(this)
Seq((this, null))
}

override def getStreamedLeafPlan: SparkPlan = streamedPlan match {
Expand Down Expand Up @@ -146,13 +146,14 @@ case class ColumnarBroadcastHashJoinExec(

override def supportColumnarCodegen: Boolean = true

val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true))

def getKernelFunction: TreeNode = {

val buildInputAttributes: List[Attribute] = buildPlan.output.toList
val streamInputAttributes: List[Attribute] = streamedPlan.output.toList
val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true))
ColumnarConditionedProbeJoin.prepareKernelFunction(
buildKeyExprs,
streamedKeyExprs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ case class ColumnarShuffledHashJoinExec(
case _ =>
Seq(streamedPlan.executeColumnar())
}
override def getBuildPlans: Seq[SparkPlan] = streamedPlan match {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
childPlans :+ this
childPlans :+ (this, null)
case _ =>
Seq(this)
Seq((this, null))
}

override def getStreamedLeafPlan: SparkPlan = streamedPlan match {
Expand All @@ -145,6 +145,9 @@ case class ColumnarShuffledHashJoinExec(

override def supportColumnarCodegen: Boolean = true

val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true))
def getKernelFunction(_type: Int = 0): TreeNode = {

val buildInputAttributes = buildPlan.output.toList
Expand All @@ -153,9 +156,6 @@ case class ColumnarShuffledHashJoinExec(
// 1. create buildHashRelation RDD ?
// 2. create streamCodeGen and return

val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true))
ColumnarConditionedProbeJoin.prepareKernelFunction(
buildKeyExprs,
streamedKeyExprs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ case class ColumnarSortExec(

override def supportColumnarCodegen: Boolean = true

override def getBuildPlans: Seq[SparkPlan] = child match {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val childPlans = c.getBuildPlans
childPlans :+ this
childPlans :+ (this, null)
case _ =>
Seq(this)
Seq((this, null))
}

override def getStreamedLeafPlan: SparkPlan = child match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,14 @@ case class ColumnarSortMergeJoinExec(
case LeftExistence(_) =>
(rightKeys, leftKeys, right, left)
case _ =>
(leftKeys, rightKeys, left, right)
left match {
case p: ColumnarSortMergeJoinExec =>
(rightKeys, leftKeys, right, left)
case ColumnarConditionProjectExec(_, _, child: ColumnarSortMergeJoinExec) =>
(rightKeys, leftKeys, right, left)
case other =>
(leftKeys, rightKeys, left, right)
}
}

/***************** WSCG related function ******************/
Expand Down Expand Up @@ -242,17 +249,21 @@ case class ColumnarSortMergeJoinExec(
condition)
}

override def getBuildPlans: Seq[SparkPlan] = {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = {

val curBuildPlan = buildPlan match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
val curBuildPlan: Seq[(SparkPlan, SparkPlan)] = buildPlan match {
case s: ColumnarSortExec =>
Seq((s, this))
case c: ColumnarCodegenSupport
if !c.isInstanceOf[ColumnarSortExec] && c.supportColumnarCodegen == true =>
c.getBuildPlans
case _ =>
Seq()
case other =>
/* should be ColumnarInputAdapter or others */
Seq((other, this))
}
streamedPlan match {
case c: ColumnarCodegenSupport if c.isInstanceOf[ColumnarSortExec] =>
curBuildPlan ++ c.getBuildPlans
curBuildPlan ++ Seq((c, this))
case c: ColumnarCodegenSupport if !c.isInstanceOf[ColumnarSortExec] =>
c.getBuildPlans ++ curBuildPlan
case _ =>
Expand Down Expand Up @@ -303,7 +314,6 @@ case class ColumnarSortMergeJoinExec(
}
val triggerBuildSignature = getCodeGenSignature


/***********************************************************/
def getCodeGenSignature: String =
if (resultSchema.size > 0 && !leftKeys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait ColumnarCodegenSupport extends SparkPlan {
*/
def inputRDDs: Seq[RDD[ColumnarBatch]]

def getBuildPlans: Seq[SparkPlan]
def getBuildPlans: Seq[(SparkPlan, SparkPlan)]

def getStreamedLeafPlan: SparkPlan

Expand Down Expand Up @@ -145,7 +145,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
ColumnarCodegenContext(childCtx.inputSchema, childCtx.outputSchema, wholeStageCodeGenNode)
}

override def getBuildPlans: Seq[SparkPlan] = {
override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = {
child.asInstanceOf[ColumnarCodegenSupport].getBuildPlans
}

Expand Down Expand Up @@ -177,6 +177,45 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
metricsUpdated = true
}

def prepareRelationFunction(
keyAttributes: Seq[Attribute],
outputAttributes: Seq[Attribute]): TreeNode = {
val outputFieldList: List[Field] = outputAttributes.toList.map(attr => {
Field
.nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
})

val keyFieldList: List[Field] = keyAttributes.toList.map(attr => {
val field = Field
.nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
if (outputFieldList.indexOf(field) == -1) {
throw new UnsupportedOperationException(
s"CashedRelation not found ${attr.name}#${attr.exprId.id} in ${outputAttributes}")
}
field
});

val key_args_node = TreeBuilder.makeFunction(
"key_field",
keyFieldList
.map(field => {
TreeBuilder.makeField(field)
})
.asJava,
new ArrowType.Int(32, true) /*dummy ret type, won't be used*/ )

val cachedRelationFuncName = "CachedRelation"
val cached_relation_func = TreeBuilder.makeFunction(
cachedRelationFuncName,
Lists.newArrayList(key_args_node),
new ArrowType.Int(32, true) /*dummy ret type, won't be used*/ )

TreeBuilder.makeFunction(
"standalone",
Lists.newArrayList(cached_relation_func),
new ArrowType.Int(32, true))
}

/**
* Return built cpp library's signature
*/
Expand Down Expand Up @@ -240,7 +279,8 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
var idx = 0
var curRDD = inputRDDs()(0)
while (idx < buildPlans.length) {
val curPlan = buildPlans(idx).asInstanceOf[ColumnarCodegenSupport]
val curPlan = buildPlans(idx)._1
val parentPlan = buildPlans(idx)._2

curRDD = curPlan match {
case p: ColumnarBroadcastHashJoinExec =>
Expand All @@ -261,7 +301,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
serializableObjectHolder += hashRelationObject
val depIter =
new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter)
val ctx = curPlan.dependentPlanCtx
val ctx = curPlan.asInstanceOf[ColumnarCodegenSupport].dependentPlanCtx
val expression =
TreeBuilder.makeExpression(
ctx.root,
Expand Down Expand Up @@ -294,7 +334,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
val buildPlan = p.getBuildPlan
curRDD.zipPartitions(buildPlan.executeColumnar()) { (iter, depIter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
val ctx = curPlan.dependentPlanCtx
val ctx = curPlan.asInstanceOf[ColumnarCodegenSupport].dependentPlanCtx
val expression =
TreeBuilder.makeExpression(
ctx.root,
Expand All @@ -319,22 +359,41 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
dependentKernelIterators += hashRelationKernel.finishByIterator()
iter
}
case p: ColumnarSortExec =>
curRDD.zipPartitions(p.executeColumnar()) { (iter, depIter) =>
case other =>
/* we should cache result from this operator */
curRDD.zipPartitions(other.executeColumnar()) { (iter, depIter) =>
ExecutorManager.tryTaskSet(numaBindingInfo)
// Chendi: We have an assumption here
// when curPlan is ColumnarSortExec,
// curRDD should be the other ColumnarSortExec
val ctx = curPlan.dependentPlanCtx
val curOutput = other match {
case p: ColumnarSortMergeJoinExec => p.output_skip_alias
case p: ColumnarBroadcastHashJoinExec => p.output_skip_alias
case p: ColumnarShuffledHashJoinExec => p.output_skip_alias
case p => p.output
}
val inputSchema = ConverterUtils.toArrowSchema(curOutput)
val outputSchema = ConverterUtils.toArrowSchema(curOutput)
if (!parentPlan.isInstanceOf[ColumnarSortMergeJoinExec]) {
if (parentPlan == null)
throw new UnsupportedOperationException(
s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin, while this parent Plan is null")
else
throw new UnsupportedOperationException(
s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin, while this parent Plan is ${parentPlan.getClass}")
}
val parent = parentPlan.asInstanceOf[ColumnarSortMergeJoinExec]
val keyAttributes =
if (other.equals(parent.buildPlan))
parent.buildKeys.map(ConverterUtils.getAttrFromExpr(_))
else parent.streamedKeys.map(ConverterUtils.getAttrFromExpr(_))
val cachedFunction = prepareRelationFunction(keyAttributes, curOutput)
val expression =
TreeBuilder.makeExpression(
ctx.root,
cachedFunction,
Field.nullable("result", new ArrowType.Int(32, true)))
val cachedRelationKernel = new ExpressionEvaluator()
cachedRelationKernel.build(
ctx.inputSchema,
inputSchema,
Lists.newArrayList(expression),
ctx.outputSchema,
outputSchema,
true)
while (depIter.hasNext) {
val dep_cb = depIter.next()
Expand All @@ -353,8 +412,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
build_elapse += System.nanoTime() - beforeEval
iter
}
case _ =>
throw new UnsupportedOperationException
}

idx += 1
Expand Down Expand Up @@ -413,9 +470,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
val output = ConverterUtils.fromArrowRecordBatch(resCtx.outputSchema, output_rb)
ConverterUtils.releaseArrowRecordBatch(output_rb)
eval_elapse += System.nanoTime() - beforeEval
new ColumnarBatch(
output.map(v => v.asInstanceOf[ColumnVector]).toArray,
outputNumRows)
new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]), outputNumRows)
}
}

Expand Down Expand Up @@ -452,9 +507,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
val output = ConverterUtils.fromArrowRecordBatch(resCtx.outputSchema, output_rb)
ConverterUtils.releaseArrowRecordBatch(output_rb)
eval_elapse += System.nanoTime() - beforeEval
new ColumnarBatch(
output.map(v => v.asInstanceOf[ColumnVector]).toArray,
outputNumRows)
new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]), outputNumRows)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,7 @@ class ColumnarSorter(
sort_elapse += System.nanoTime() - beforeSort
total_elapse += System.nanoTime() - beforeSort
}
if (has_next)
has_next = sort_iterator.hasNext()

if (has_next == false) {
//TODO(): should try to close sorter
//close()
}

return has_next

sort_iterator.hasNext()
}

override def next(): ColumnarBatch = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,19 @@ case class ColumnarCollapseCodegenStages(
plan match {
case p if !supportCodegen(p) =>
new ColumnarInputAdapter(insertWholeStageCodegen(p))
/*case j: ColumnarSortMergeJoinExec if !j.streamedPlan.isInstanceOf[ColumnarSortExec] =>
// we don't support any ColumnarSortMergeJoin whose child is not ColumnarSort
new ColumnarInputAdapter(insertWholeStageCodegen(j))*/
case j: ColumnarSortMergeJoinExec
if j.buildPlan.isInstanceOf[ColumnarSortMergeJoinExec] || (j.buildPlan
.isInstanceOf[ColumnarConditionProjectExec] && j.buildPlan
.children(0)
.isInstanceOf[ColumnarSortMergeJoinExec]) =>
// we don't support any ColumnarSortMergeJoin whose both children are ColumnarSortMergeJoin
j.withNewChildren(j.children.map(c => {
if (c.equals(j.buildPlan)) {
new ColumnarInputAdapter(insertWholeStageCodegen(c))
} else {
insertInputAdapter(c)
}
}))
case j: ColumnarSortExec =>
j.withNewChildren(
j.children.map(child => new ColumnarInputAdapter(insertWholeStageCodegen(child))))
Expand All @@ -226,10 +236,6 @@ case class ColumnarCollapseCodegenStages(
case plan
if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
//case plan: ColumnarCodegenSupport if supportCodegen(plan) =>
/*case j: ColumnarSortMergeJoinExec if !j.streamedPlan.isInstanceOf[ColumnarSortExec] =>
// we don't support any ColumnarSortMergeJoin whose child is not ColumnarSort
j.withNewChildren(j.children.map(insertWholeStageCodegen))*/
case plan: ColumnarCodegenSupport if supportCodegen(plan) && existsJoins(plan) =>
ColumnarWholeStageCodegenExec(insertInputAdapter(plan))(
codegenStageCounter.incrementAndGet())
Expand Down
Loading

0 comments on commit 5d4df86

Please sign in to comment.