From b6487d73e70a0ad43213b65430e373fbc5b00200 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 30 Nov 2021 17:51:29 +0800 Subject: [PATCH] [NSE-581] Add trim, left trim, right trim support in expression (#586) * Initial commit * Add expression support: trim, regexp replace * Remove regexp replace support * Correct the code * Fix out of range issue for a seq * Judge whether contains subquery --- .../ColumnarExpressionConverter.scala | 12 ++ .../ColumnarString2TrimOperator.scala | 135 ++++++++++++++++++ .../expression/ColumnarTernaryOperator.scala | 44 +++++- 3 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarString2TrimOperator.scala diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala index 7121a8bec..cf169e066 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarExpressionConverter.scala @@ -153,6 +153,16 @@ object ColumnarExpressionConverter extends Logging { attributeSeq, convertBoundRefToAttrRef = convertBoundRefToAttrRef), expr) + case st: String2TrimExpression => + check_if_no_calculation = false + logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") + val exps = st.children.map { expr => + replaceWithColumnarExpression( + expr, + attributeSeq, + convertBoundRefToAttrRef = convertBoundRefToAttrRef) + } + ColumnarString2TrimOperator.create(exps, expr) case i: If => check_if_no_calculation = false logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.") @@ -351,6 +361,8 @@ object ColumnarExpressionConverter extends Logging { c.children.map(containsSubquery).exists(_ == true) case b: BinaryExpression => containsSubquery(b.left) || containsSubquery(b.right) + case s: String2TrimExpression => + s.children.map(containsSubquery).exists(_ == true) case expr => throw new UnsupportedOperationException( s" --> ${expr.getClass} | ${expr} is not currently supported.") diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarString2TrimOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarString2TrimOperator.scala new file mode 100644 index 000000000..5e2c18c20 --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarString2TrimOperator.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.expression + +import com.google.common.collect.Lists +import org.apache.arrow.gandiva.expression.{TreeBuilder, TreeNode} +import org.apache.arrow.vector.types.pojo.ArrowType + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Expression, StringTrim, StringTrimLeft, StringTrimRight} +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils +import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.util.ArrowUtils + +// StringTrim +class ColumnarStringTrim(srcStr: Expression, trimStr: Option[Expression], original: Expression) + extends StringTrim(srcStr: Expression, trimStr: Option[Expression]) + with ColumnarExpression + with Logging { + + buildCheck() + + def buildCheck(): Unit = { + val supportedTypes = List( + StringType + ) + // It is not supported to specify trimStr. By default, space is trimmed. + if (supportedTypes.indexOf(srcStr.dataType) == -1 || !trimStr.isEmpty) { + throw new UnsupportedOperationException( + s"${srcStr.dataType} is not supported in ColumnarStringTrim.") + } + } + + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { + val (srcStr_node, srcStrType): (TreeNode, ArrowType) = + srcStr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = ArrowUtils.toArrowType(StringType, + SparkSchemaUtils.getLocalTimezoneID()) + val funcNode = + TreeBuilder.makeFunction("btrim", Lists.newArrayList(srcStr_node), resultType) + (funcNode, resultType) + } +} + +// StringTrimLeft +class ColumnarStringTrimLeft(srcStr: Expression, trimStr: Option[Expression], original: Expression) + extends StringTrimLeft(srcStr: Expression, trimStr: Option[Expression]) + with ColumnarExpression + with Logging { + + buildCheck() + + def buildCheck(): Unit = { + val supportedTypes = List( + StringType + ) + if (supportedTypes.indexOf(srcStr.dataType) == -1 || !trimStr.isEmpty) { + throw new UnsupportedOperationException( + s"${srcStr.dataType} is not supported in ColumnarStringTrimLeft.") + } + } + + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { + val (srcStr_node, srcStrType): (TreeNode, ArrowType) = + srcStr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = ArrowUtils.toArrowType(StringType, + SparkSchemaUtils.getLocalTimezoneID()) + val funcNode = + TreeBuilder.makeFunction("ltrim", Lists.newArrayList(srcStr_node), resultType) + (funcNode, resultType) + } +} + +// StringTrimRight +class ColumnarStringTrimRight(child: Expression, trimStr: Option[Expression], original: Expression) + extends StringTrimRight(child: Expression, trimStr: Option[Expression]) + with ColumnarExpression + with Logging { + + buildCheck() + + def buildCheck(): Unit = { + val supportedTypes = List( + StringType + ) + if (supportedTypes.indexOf(child.dataType) == -1 || !trimStr.isEmpty) { + throw new UnsupportedOperationException( + s"${child.dataType} is not supported in ColumnarStringTrimRight.") + } + } + + override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = { + val (child_node, childType): (TreeNode, ArrowType) = + child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = ArrowUtils.toArrowType(StringType, + SparkSchemaUtils.getLocalTimezoneID()) + val funcNode = + TreeBuilder.makeFunction("rtrim", Lists.newArrayList(child_node), resultType) + (funcNode, resultType) + } +} + +object ColumnarString2TrimOperator { + + def create(value: Seq[Expression], + original: Expression): Expression = original match { + case a: StringTrim => + new ColumnarStringTrim(value.lift(0).get, value.lift(1), a) + case a: StringTrimLeft => + new ColumnarStringTrimLeft(value.lift(0).get, value.lift(1), a) + case a: StringTrimRight => + new ColumnarStringTrimRight(value.lift(0).get, value.lift(1), a) + + case other => + throw new UnsupportedOperationException(s"not currently supported: $other.") + } +} \ No newline at end of file diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala index 976ec0f0b..49ca6b07c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarTernaryOperator.scala @@ -72,11 +72,53 @@ class ColumnarSubString(str: Expression, pos: Expression, len: Expression, origi } } +// StringSplit, not functionality ready, need array type support. +class ColumnarStringSplit(child: Expression, regex: Expression, + limit: Expression, original: Expression) + extends StringSplit(child: Expression, + regex: Expression, limit: Expression) + with ColumnarExpression + with Logging { + + buildCheck() + + def buildCheck(): Unit = { + val supportedTypes = List( + StringType + ) + if (supportedTypes.indexOf(child.dataType) == -1) { + throw new UnsupportedOperationException( + s"${child.dataType} is not supported in ColumnarStringSplit.") + } + } + + override def doColumnarCodeGen(args: java.lang.Object) + : (TreeNode, ArrowType) = { + val (child_node, childType): (TreeNode, ArrowType) = + child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (regex_node, regexType): (TreeNode, ArrowType) = + regex.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (limit_node, limitType): (TreeNode, ArrowType) = + limit.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + + val resultType = new ArrowType.Bool() + val funcNode = + TreeBuilder.makeFunction( + "split_part", Lists.newArrayList(child_node, regex_node, + limit_node), resultType) + (funcNode, resultType) + } +} + object ColumnarTernaryOperator { - def create(str: Expression, pos: Expression, len: Expression, original: Expression): Expression = original match { + def create(str: Expression, pos: Expression, len: Expression, + original: Expression): Expression = original match { case ss: Substring => new ColumnarSubString(str, pos, len, ss) + // Currently not supported. +// case a: StringSplit => +// new ColumnarStringSplit(str, a.regex, a.limit, a) case other => throw new UnsupportedOperationException(s"not currently supported: $other.") }