From 1de46390ce8ae169da0d6d2a0c74e18c29e84a7d Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 7 Mar 2022 22:03:56 +0800 Subject: [PATCH 1/2] Improve codegen check for expression This patch adds more codegen checks on expressions. The parent operator will be marked as supporting codegen only if all of its expressions support codegen. Signed-off-by: Yuan Zhou --- .../ColumnarBasicPhysicalOperators.scala | 22 ++++++++++++++++--- .../execution/ColumnarHashAggregateExec.scala | 15 +++++++++++-- .../oap/expression/ColumnarExpression.scala | 5 +++++ .../expression/ColumnarUnaryOperator.scala | 4 ++++ .../ColumnarCollapseCodegenStages.scala | 2 +- .../ext/expression_codegen_visitor.cc | 3 ++- .../ext/whole_stage_codegen_kernel.cc | 3 ++- 7 files changed, 46 insertions(+), 8 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala index 0a7243801..a7683d827 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala @@ -164,9 +164,25 @@ case class ColumnarConditionProjectExec( override def getChild: SparkPlan = child - override def supportColumnarCodegen: Boolean = true - - // override def canEqual(that: Any): Boolean = false + override def supportColumnarCodegen: Boolean = { + if (condition != null) { + val colCondExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(condition) + // support codegen if cond expression and proj expression both supports codegen + if (!colCondExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { + return false + } else { + if (projectList != null) { + for (expr <- projectList) { + val colExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + if
(!colExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { + return false + } + } + } + } + } + true + } def getKernelFunction(childTreeNode: TreeNode): TreeNode = { val (filterNode, projectNode) = diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala index 29f629fa1..9c44ff130 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala @@ -637,9 +637,20 @@ case class ColumnarHashAggregateExec( override def getChild: SparkPlan = child - override def supportColumnarCodegen: Boolean = true + override def supportColumnarCodegen: Boolean = { + for (expr <- aggregateExpressions) { + val internalExpressionList = expr.aggregateFunction.children + for (expr <- internalExpressionList) { + logInfo(s"AA: $expr") + val colExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + if (!colExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { + return false + } + } - // override def canEqual(that: Any): Boolean = false + } + return true + } def getKernelFunction: TreeNode = { ColumnarHashAggregation.prepareKernelFunction( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala index 97f919a89..967271768 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpression.scala @@ -27,6 +27,11 @@ import scala.collection.mutable.ListBuffer trait ColumnarExpression { + def supportColumnarCodegen(args: java.lang.Object): (Boolean) = { + // TODO: disable all codegen 
unless manuall enabled + true + } + def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { throw new UnsupportedOperationException(s"Not support doColumnarCodeGen.") } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala index 91b71185d..5bd893c9d 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarUnaryOperator.scala @@ -319,6 +319,10 @@ class ColumnarFloor(child: Expression, original: Expression) TreeBuilder.makeFunction("floor", Lists.newArrayList(child_node), resultType) (funcNode, resultType) } + + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + false && child.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + } } class ColumnarCeil(child: Expression, original: Expression) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala index 07e69f3d9..1af0211d1 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala @@ -276,7 +276,7 @@ case class ColumnarCollapseCodegenStages( if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) case j: ColumnarHashAggregateExec => - if (!j.child.isInstanceOf[ColumnarHashAggregateExec] && existsJoins(j)) { + if (j.supportColumnarCodegen && !j.child.isInstanceOf[ColumnarHashAggregateExec] && existsJoins(j)) { 
ColumnarWholeStageCodegenExec(j.withNewChildren(j.children.map(insertInputAdapter)))( codegenStageCounter.incrementAndGet()) } else { diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc index 71162d234..b60aaa624 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/expression_codegen_visitor.cc @@ -1149,7 +1149,8 @@ arrow::Status ExpressionCodegenVisitor::Visit(const gandiva::FunctionNode& node) check_str_ = validity; } else { std::cout << "function name: " << func_name << std::endl; - return arrow::Status::NotImplemented(func_name, " is currently not supported."); + return arrow::Status::NotImplemented(func_name, + " is currently not supported in WSCG."); } return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc index 0e0f6a8a7..1a5e27759 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc @@ -217,7 +217,8 @@ class WholeStageCodeGenKernel::Impl { result_field_node_list, result_expr_node_list, out)); } else { - return arrow::Status::NotImplemented("Not supported function name:", func_name); + return arrow::Status::NotImplemented("WSCG Not supported function name:", + func_name); } return arrow::Status::OK(); } From 69e4f09923b472bc6fc31d68b5f5adf83eed750e Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Tue, 8 Mar 2022 06:00:10 +0800 Subject: [PATCH 2/2] adding more checks Signed-off-by: Yuan Zhou --- .../ColumnarBasicPhysicalOperators.scala | 15 +++++++-------- .../oap/execution/ColumnarHashAggregateExec.scala | 1 - .../intel/oap/expression/ColumnarArithmetic.scala 
| 4 ++++ .../oap/expression/ColumnarBinaryOperator.scala | 4 ++++ .../oap/expression/ColumnarCaseWhenOperator.scala | 5 +++++ .../expression/ColumnarDateTimeExpressions.scala | 4 ++++ .../intel/oap/expression/ColumnarIfOperator.scala | 8 ++++++++ .../oap/expression/ColumnarNamedExpressions.scala | 4 ++++ 8 files changed, 36 insertions(+), 9 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala index a7683d827..79e22175c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala @@ -170,14 +170,13 @@ case class ColumnarConditionProjectExec( // support codegen if cond expression and proj expression both supports codegen if (!colCondExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { return false - } else { - if (projectList != null) { - for (expr <- projectList) { - val colExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) - if (!colExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { - return false - } - } + } + } + if (projectList != null) { + for (expr <- projectList) { + val colExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) + if (!colExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { + return false } } } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala index 9c44ff130..bb8965e8c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala +++ 
b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala @@ -641,7 +641,6 @@ case class ColumnarHashAggregateExec( for (expr <- aggregateExpressions) { val internalExpressionList = expr.aggregateFunction.children for (expr <- internalExpressionList) { - logInfo(s"AA: $expr") val colExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr) if (!colExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())) { return false diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarArithmetic.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarArithmetic.scala index db8b4c2f1..bd412145c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarArithmetic.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarArithmetic.scala @@ -246,6 +246,10 @@ class ColumnarMultiply(left: Expression, right: Expression, original: Expression (funcNode, resultType) } } + + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + return left_val.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right_val.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + } } class ColumnarDivide(left: Expression, right: Expression, diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala index d1d88c63b..307df5282 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala @@ -150,6 +150,10 @@ class ColumnarContains(left: Expression, right: Expression, original: Expression TreeBuilder.makeFunction("is_substr", Lists.newArrayList(left_node, right_node), resultType) (funcNode, resultType) } + + override 
def supportColumnarCodegen(args: Object): Boolean = { + false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + } } class ColumnarEqualTo(left: Expression, right: Expression, original: Expression) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarCaseWhenOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarCaseWhenOperator.scala index bbe6cfac8..844f2c149 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarCaseWhenOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarCaseWhenOperator.scala @@ -57,6 +57,11 @@ class ColumnarCaseWhen( }) } + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + val exprs = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue + val exprList = { exprs.filter(expr => !expr.isInstanceOf[Literal]) } + !exprList.map(expr => expr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(Lists.newArrayList())).exists(_ == false) + } override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { logInfo(s"children: ${branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue}") logInfo(s"branches: $branches") diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index 4f4884654..2b631ab9a 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -671,6 +671,10 @@ object ColumnarDateTimeExpressions { "date_diff", Lists.newArrayList(leftNode, rightNode), outType) (funcNode, outType) } + + override def supportColumnarCodegen(args: Object): Boolean = { + false && 
left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + } } class ColumnarMakeDate( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarIfOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarIfOperator.scala index 7fbf22772..828b1a914 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarIfOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarIfOperator.scala @@ -60,6 +60,14 @@ class ColumnarIf(predicate: Expression, trueValue: Expression, val funcNode = TreeBuilder.makeIf(predicate_node, true_node, false_node, trueType) (funcNode, trueType) } + + override def supportColumnarCodegen(args: java.lang.Object): Boolean = { + // return true only when all branches are true + val ret = (predicate.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && + trueValue.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && + falseValue.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args)) + return ret + } } object ColumnarIfOperator { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarNamedExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarNamedExpressions.scala index c1d164a75..0032d59b2 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarNamedExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarNamedExpressions.scala @@ -39,6 +39,10 @@ class ColumnarAlias(child: Expression, name: String)( child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) } + override def supportColumnarCodegen(args: Object): Boolean = { + child.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + } + } class ColumnarAttributeReference(