From 5d4df86c9ae2fd8504879c49e4e0ce3a617c0774 Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Sat, 9 Jan 2021 14:53:21 +0800 Subject: [PATCH] Support any operator as smj build side in wscg and divide two smjwscg if they connect in build side Signed-off-by: Chendi Xue --- .../ColumnarBasicPhysicalOperators.scala | 2 +- .../ColumnarBroadcastHashJoinExec.scala | 13 +-- .../ColumnarShuffledHashJoinExec.scala | 12 +-- .../oap/execution/ColumnarSortExec.scala | 6 +- .../execution/ColumnarSortMergeJoinExec.scala | 26 +++-- .../ColumnarWholeStageCodegenExec.scala | 97 ++++++++++++++----- .../intel/oap/expression/ColumnarSorter.scala | 11 +-- .../ColumnarCollapseCodegenStages.scala | 20 ++-- .../ext/conditioned_merge_join_kernel.cc | 16 ++- .../codegen/arrow_compute/ext/kernels_ext.cc | 6 +- 10 files changed, 135 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala index 5f4266284..535cb9986 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala @@ -100,7 +100,7 @@ case class ColumnarConditionProjectExec( Seq(child.executeColumnar()) } - override def getBuildPlans: Seq[SparkPlan] = child match { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match { case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => c.getBuildPlans case _ => diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala index 0efd4cbab..29a1a597b 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala @@ -112,12 +112,12 @@ case class ColumnarBroadcastHashJoinExec( case _ => Seq(streamedPlan.executeColumnar()) } - override def getBuildPlans: Seq[SparkPlan] = streamedPlan match { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match { case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => val childPlans = c.getBuildPlans - childPlans :+ this + childPlans :+ (this, null) case _ => - Seq(this) + Seq((this, null)) } override def getStreamedLeafPlan: SparkPlan = streamedPlan match { @@ -146,13 +146,14 @@ case class ColumnarBroadcastHashJoinExec( override def supportColumnarCodegen: Boolean = true + val output_skip_alias = + if (projectList == null || projectList.isEmpty) super.output + else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true)) + def getKernelFunction: TreeNode = { val buildInputAttributes: List[Attribute] = buildPlan.output.toList val streamInputAttributes: List[Attribute] = streamedPlan.output.toList - val output_skip_alias = - if (projectList == null || projectList.isEmpty) super.output - else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true)) ColumnarConditionedProbeJoin.prepareKernelFunction( buildKeyExprs, streamedKeyExprs, diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala index e74b9023b..0d734740d 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala @@ -118,12 +118,12 @@ case class ColumnarShuffledHashJoinExec( case _ => Seq(streamedPlan.executeColumnar()) } - override def getBuildPlans: Seq[SparkPlan] = streamedPlan match { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = streamedPlan match { case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => val childPlans = c.getBuildPlans - childPlans :+ this + childPlans :+ (this, null) case _ => - Seq(this) + Seq((this, null)) } override def getStreamedLeafPlan: SparkPlan = streamedPlan match { @@ -145,6 +145,9 @@ case class ColumnarShuffledHashJoinExec( override def supportColumnarCodegen: Boolean = true + val output_skip_alias = + if (projectList == null || projectList.isEmpty) super.output + else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true)) def getKernelFunction(_type: Int = 0): TreeNode = { val buildInputAttributes = buildPlan.output.toList @@ -153,9 +156,6 @@ case class ColumnarShuffledHashJoinExec( // 1. create buildHashRelation RDD ? // 2. create streamCodeGen and return - val output_skip_alias = - if (projectList == null || projectList.isEmpty) super.output - else projectList.map(expr => ConverterUtils.getAttrFromExpr(expr, true)) ColumnarConditionedProbeJoin.prepareKernelFunction( buildKeyExprs, streamedKeyExprs, diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala index 0cd59f483..3964e6d06 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala @@ -96,12 +96,12 @@ case class ColumnarSortExec( override def supportColumnarCodegen: Boolean = true - override def getBuildPlans: Seq[SparkPlan] = child match { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = child match { case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => val childPlans = c.getBuildPlans - childPlans :+ this + childPlans :+ (this, null) case _ => - Seq(this) + Seq((this, null)) } override def getStreamedLeafPlan: SparkPlan = child match { diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index ddaffb742..385c7e688 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -214,7 +214,14 @@ case class ColumnarSortMergeJoinExec( case LeftExistence(_) => (rightKeys, leftKeys, right, left) case _ => - (leftKeys, rightKeys, left, right) + left match { + case p: ColumnarSortMergeJoinExec => + (rightKeys, leftKeys, right, left) + case ColumnarConditionProjectExec(_, _, child: ColumnarSortMergeJoinExec) => + (rightKeys, leftKeys, right, left) + case other => + (leftKeys, rightKeys, left, right) + } } /***************** WSCG related function ******************/ @@ -242,17 +249,21 @@ case class ColumnarSortMergeJoinExec( condition) } - override def getBuildPlans: Seq[SparkPlan] = { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = { - val curBuildPlan = buildPlan match { - case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true => + val curBuildPlan: Seq[(SparkPlan, SparkPlan)] = buildPlan match { + case s: ColumnarSortExec => + Seq((s, this)) + case c: ColumnarCodegenSupport + if !c.isInstanceOf[ColumnarSortExec] && c.supportColumnarCodegen == true => c.getBuildPlans - case _ => - Seq() + case other => + /* should be ColumnarInputAdapter or others */ + Seq((other, this)) } streamedPlan match { case c: ColumnarCodegenSupport if c.isInstanceOf[ColumnarSortExec] => - curBuildPlan ++ c.getBuildPlans + curBuildPlan ++ Seq((c, this)) case c: ColumnarCodegenSupport if !c.isInstanceOf[ColumnarSortExec] => c.getBuildPlans ++ curBuildPlan case _ => @@ -303,7 +314,6 @@ case class ColumnarSortMergeJoinExec( } val triggerBuildSignature = getCodeGenSignature - /***********************************************************/ def getCodeGenSignature: String = if (resultSchema.size > 0 && !leftKeys diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala index 55992aba9..b73b27fc9 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala @@ -61,7 +61,7 @@ trait ColumnarCodegenSupport extends SparkPlan { */ def inputRDDs: Seq[RDD[ColumnarBatch]] - def getBuildPlans: Seq[SparkPlan] + def getBuildPlans: Seq[(SparkPlan, SparkPlan)] def getStreamedLeafPlan: SparkPlan @@ -145,7 +145,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I ColumnarCodegenContext(childCtx.inputSchema, childCtx.outputSchema, wholeStageCodeGenNode) } - override def getBuildPlans: Seq[SparkPlan] = { + override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = { child.asInstanceOf[ColumnarCodegenSupport].getBuildPlans } @@ -177,6 +177,45 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I metricsUpdated = true } + def prepareRelationFunction( + keyAttributes: Seq[Attribute], + outputAttributes: Seq[Attribute]): TreeNode = { + val outputFieldList: List[Field] = outputAttributes.toList.map(attr => { + Field + .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType)) + }) + + val keyFieldList: List[Field] = keyAttributes.toList.map(attr => { + val field = Field + .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType)) + if (outputFieldList.indexOf(field) == -1) { + throw new UnsupportedOperationException( + s"CashedRelation not found ${attr.name}#${attr.exprId.id} in ${outputAttributes}") + } + field + }); + + val key_args_node = TreeBuilder.makeFunction( + "key_field", + keyFieldList + .map(field => { + TreeBuilder.makeField(field) + }) + .asJava, + new ArrowType.Int(32, true) /*dummy ret type, won't be used*/ ) + + val cachedRelationFuncName = "CachedRelation" + val cached_relation_func = TreeBuilder.makeFunction( + cachedRelationFuncName, + Lists.newArrayList(key_args_node), + new ArrowType.Int(32, true) /*dummy ret type, won't be used*/ ) + + TreeBuilder.makeFunction( + "standalone", + Lists.newArrayList(cached_relation_func), + new ArrowType.Int(32, true)) + } + /** * Return built cpp library's signature */ @@ -240,7 +279,8 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I var idx = 0 var curRDD = inputRDDs()(0) while (idx < buildPlans.length) { - val curPlan = buildPlans(idx).asInstanceOf[ColumnarCodegenSupport] + val curPlan = buildPlans(idx)._1 + val parentPlan = buildPlans(idx)._2 curRDD = curPlan match { case p: ColumnarBroadcastHashJoinExec => @@ -261,7 +301,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I serializableObjectHolder += hashRelationObject val depIter = new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter) - val ctx = curPlan.dependentPlanCtx + val ctx = curPlan.asInstanceOf[ColumnarCodegenSupport].dependentPlanCtx val expression = TreeBuilder.makeExpression( ctx.root, @@ -294,7 +334,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I val buildPlan = p.getBuildPlan curRDD.zipPartitions(buildPlan.executeColumnar()) { (iter, depIter) => ExecutorManager.tryTaskSet(numaBindingInfo) - val ctx = curPlan.dependentPlanCtx + val ctx = curPlan.asInstanceOf[ColumnarCodegenSupport].dependentPlanCtx val expression = TreeBuilder.makeExpression( ctx.root, @@ -319,22 +359,41 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I dependentKernelIterators += hashRelationKernel.finishByIterator() iter } - case p: ColumnarSortExec => - curRDD.zipPartitions(p.executeColumnar()) { (iter, depIter) => + case other => + /* we should cache result from this operator */ + curRDD.zipPartitions(other.executeColumnar()) { (iter, depIter) => ExecutorManager.tryTaskSet(numaBindingInfo) - // Chendi: We have an assumption here - // when curPlan is ColumnarSortExec, - // curRDD should be the other ColumnarSortExec - val ctx = curPlan.dependentPlanCtx + val curOutput = other match { + case p: ColumnarSortMergeJoinExec => p.output_skip_alias + case p: ColumnarBroadcastHashJoinExec => p.output_skip_alias + case p: ColumnarShuffledHashJoinExec => p.output_skip_alias + case p => p.output + } + val inputSchema = ConverterUtils.toArrowSchema(curOutput) + val outputSchema = ConverterUtils.toArrowSchema(curOutput) + if (!parentPlan.isInstanceOf[ColumnarSortMergeJoinExec]) { + if (parentPlan == null) + throw new UnsupportedOperationException( + s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin, while this parent Plan is null") + else + throw new UnsupportedOperationException( + s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin, while this parent Plan is ${parentPlan.getClass}") + } + val parent = parentPlan.asInstanceOf[ColumnarSortMergeJoinExec] + val keyAttributes = + if (other.equals(parent.buildPlan)) + parent.buildKeys.map(ConverterUtils.getAttrFromExpr(_)) + else parent.streamedKeys.map(ConverterUtils.getAttrFromExpr(_)) + val cachedFunction = prepareRelationFunction(keyAttributes, curOutput) val expression = TreeBuilder.makeExpression( - ctx.root, + cachedFunction, Field.nullable("result", new ArrowType.Int(32, true))) val cachedRelationKernel = new ExpressionEvaluator() cachedRelationKernel.build( - ctx.inputSchema, + inputSchema, Lists.newArrayList(expression), - ctx.outputSchema, + outputSchema, true) while (depIter.hasNext) { val dep_cb = depIter.next() @@ -353,8 +412,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I build_elapse += System.nanoTime() - beforeEval iter } - case _ => - throw new UnsupportedOperationException } idx += 1 @@ -413,9 +470,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I val output = ConverterUtils.fromArrowRecordBatch(resCtx.outputSchema, output_rb) ConverterUtils.releaseArrowRecordBatch(output_rb) eval_elapse += System.nanoTime() - beforeEval - new ColumnarBatch( - output.map(v => v.asInstanceOf[ColumnVector]).toArray, - outputNumRows) + new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]), outputNumRows) } } @@ -452,9 +507,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I val output = ConverterUtils.fromArrowRecordBatch(resCtx.outputSchema, output_rb) ConverterUtils.releaseArrowRecordBatch(output_rb) eval_elapse += System.nanoTime() - beforeEval - new ColumnarBatch( - output.map(v => v.asInstanceOf[ColumnVector]).toArray, - outputNumRows) + new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]), outputNumRows) } } } diff --git a/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala b/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala index 0fe9b0389..0b459289a 100644 --- a/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala +++ b/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala @@ -153,16 +153,7 @@ class ColumnarSorter( sort_elapse += System.nanoTime() - beforeSort total_elapse += System.nanoTime() - beforeSort } - if (has_next) - has_next = sort_iterator.hasNext() - - if (has_next == false) { - //TODO(): should try to close sorter - //close() - } - - return has_next - + sort_iterator.hasNext() } override def next(): ColumnarBatch = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala index 5a8edf762..106d62ecc 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala @@ -200,9 +200,19 @@ case class ColumnarCollapseCodegenStages( plan match { case p if !supportCodegen(p) => new ColumnarInputAdapter(insertWholeStageCodegen(p)) - /*case j: ColumnarSortMergeJoinExec if !j.streamedPlan.isInstanceOf[ColumnarSortExec] => - // we don't support any ColumnarSortMergeJoin whose child is not ColumnarSort - new ColumnarInputAdapter(insertWholeStageCodegen(j))*/ + case j: ColumnarSortMergeJoinExec + if j.buildPlan.isInstanceOf[ColumnarSortMergeJoinExec] || (j.buildPlan + .isInstanceOf[ColumnarConditionProjectExec] && j.buildPlan + .children(0) + .isInstanceOf[ColumnarSortMergeJoinExec]) => + // we don't support any ColumnarSortMergeJoin whose both children are ColumnarSortMergeJoin + j.withNewChildren(j.children.map(c => { + if (c.equals(j.buildPlan)) { + new ColumnarInputAdapter(insertWholeStageCodegen(c)) + } else { + insertInputAdapter(c) + } + })) case j: ColumnarSortExec => j.withNewChildren( j.children.map(child => new ColumnarInputAdapter(insertWholeStageCodegen(child)))) @@ -226,10 +236,6 @@ case class ColumnarCollapseCodegenStages( case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) - //case plan: ColumnarCodegenSupport if supportCodegen(plan) => - /*case j: ColumnarSortMergeJoinExec if !j.streamedPlan.isInstanceOf[ColumnarSortExec] => - // we don't support any ColumnarSortMergeJoin whose child is not ColumnarSort - j.withNewChildren(j.children.map(insertWholeStageCodegen))*/ case plan: ColumnarCodegenSupport if supportCodegen(plan) && existsJoins(plan) => ColumnarWholeStageCodegenExec(insertInputAdapter(plan))( codegenStageCounter.incrementAndGet()) diff --git a/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc b/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc index f81da748d..c15f75b68 100644 --- a/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/conditioned_merge_join_kernel.cc @@ -198,9 +198,13 @@ class ConditionedMergeJoinKernel::Impl { function_define_ss << "inline int JoinCompare_" << relation_id_[0] << "() {" << std::endl; + function_define_ss << "if (!sort_relation_" << relation_id_[0] + << "_->CheckRangeBound(0)) return -1;" << std::endl; function_define_ss << "auto idx_0 = sort_relation_" << relation_id_[0] << "_->GetItemIndexWithShift(0);" << std::endl; if (use_relation_for_stream) { + function_define_ss << "if (!sort_relation_" << relation_id_[1] + << "_->CheckRangeBound(0)) return 1;" << std::endl; function_define_ss << "auto idx_1 = sort_relation_" << relation_id_[1] << "_->GetItemIndexWithShift(0);" << std::endl; } @@ -397,11 +401,9 @@ class ConditionedMergeJoinKernel::Impl { codes_ss << "auto " << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "while (" << function_name << "_res < 0) {" << std::endl; - codes_ss << "if ((should_stop_ = !" << build_relation << "->NextNewKey())) break;" - << std::endl; + codes_ss << "if ((!" << build_relation << "->NextNewKey())) break;" << std::endl; codes_ss << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "}" << std::endl; - codes_ss << "if (should_stop_) break;" << std::endl; codes_ss << "if (" << function_name << "_res == 0) {" << std::endl; codes_ss << range_name << " = " << build_relation << "->GetSameKeyRange();" << std::endl; @@ -480,11 +482,9 @@ class ConditionedMergeJoinKernel::Impl { codes_ss << "auto " << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "while (" << function_name << "_res < 0) {" << std::endl; - codes_ss << "if ((should_stop_ = !" << build_relation << "->NextNewKey())) break;" - << std::endl; + codes_ss << "if ((!" << build_relation << "->NextNewKey())) break;" << std::endl; codes_ss << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "}" << std::endl; - codes_ss << "if (should_stop_) break;" << std::endl; codes_ss << "if (" << function_name << "_res == 0) {" << std::endl; if (cond_check) { codes_ss << found_match_name << " = true;" << std::endl; @@ -670,11 +670,9 @@ class ConditionedMergeJoinKernel::Impl { codes_ss << "auto " << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "while (" << function_name << "_res < 0) {" << std::endl; - codes_ss << "if ((should_stop_ = !" << build_relation << "->NextNewKey())) break;" - << std::endl; + codes_ss << "if ((!" << build_relation << "->NextNewKey())) break;" << std::endl; codes_ss << function_name << "_res = " << function_name << "();" << std::endl; codes_ss << "}" << std::endl; - codes_ss << "if (should_stop_) break;" << std::endl; codes_ss << "if (" << function_name << "_res != 0) {" << std::endl; codes_ss << found_match_name << " = false;" << std::endl; codes_ss << "} else {" << std::endl; diff --git a/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc b/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc index 0d79ebbad..9dbd6e8c1 100644 --- a/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc +++ b/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc @@ -1462,8 +1462,10 @@ class CachedRelationKernel::Impl { for (auto field : result_schema_->fields()) { std::shared_ptr col_out; RETURN_NOT_OK(MakeRelationColumn(field->type()->id(), &col_out)); - for (auto arr : cached_[idx]) { - RETURN_NOT_OK(col_out->AppendColumn(arr)); + if (cached_.size() == col_num_) { + for (auto arr : cached_[idx]) { + RETURN_NOT_OK(col_out->AppendColumn(arr)); + } } sort_relation_list.push_back(col_out); idx++;