Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-581] Add trim, left trim, right trim support in expression (#586)
Browse files Browse the repository at this point in the history
* Initial commit

* Add expression support: trim, regexp replace

* Remove regexp replace support

* Correct the code

* Fix out of range issue for a seq

* Judge whether contains subquery
  • Loading branch information
PHILO-HE authored Nov 30, 2021
1 parent 32e3783 commit b6487d7
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ object ColumnarExpressionConverter extends Logging {
attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef),
expr)
case st: String2TrimExpression =>
check_if_no_calculation = false
logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
val exps = st.children.map { expr =>
replaceWithColumnarExpression(
expr,
attributeSeq,
convertBoundRefToAttrRef = convertBoundRefToAttrRef)
}
ColumnarString2TrimOperator.create(exps, expr)
case i: If =>
check_if_no_calculation = false
logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
Expand Down Expand Up @@ -351,6 +361,8 @@ object ColumnarExpressionConverter extends Logging {
c.children.map(containsSubquery).exists(_ == true)
case b: BinaryExpression =>
containsSubquery(b.left) || containsSubquery(b.right)
case s: String2TrimExpression =>
s.children.map(containsSubquery).exists(_ == true)
case expr =>
throw new UnsupportedOperationException(
s" --> ${expr.getClass} | ${expr} is not currently supported.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.expression

import com.google.common.collect.Lists
import org.apache.arrow.gandiva.expression.{TreeBuilder, TreeNode}
import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Expression, StringTrim, StringTrimLeft, StringTrimRight}
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.util.ArrowUtils

// StringTrim
class ColumnarStringTrim(srcStr: Expression, trimStr: Option[Expression], original: Expression)
extends StringTrim(srcStr: Expression, trimStr: Option[Expression])
with ColumnarExpression
with Logging {

buildCheck()

def buildCheck(): Unit = {
val supportedTypes = List(
StringType
)
// It is not supported to specify trimStr. By default, space is trimmed.
if (supportedTypes.indexOf(srcStr.dataType) == -1 || !trimStr.isEmpty) {
throw new UnsupportedOperationException(
s"${srcStr.dataType} is not supported in ColumnarStringTrim.")
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (srcStr_node, srcStrType): (TreeNode, ArrowType) =
srcStr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)

val resultType = ArrowUtils.toArrowType(StringType,
SparkSchemaUtils.getLocalTimezoneID())
val funcNode =
TreeBuilder.makeFunction("btrim", Lists.newArrayList(srcStr_node), resultType)
(funcNode, resultType)
}
}

// StringTrimLeft
class ColumnarStringTrimLeft(srcStr: Expression, trimStr: Option[Expression], original: Expression)
extends StringTrimLeft(srcStr: Expression, trimStr: Option[Expression])
with ColumnarExpression
with Logging {

buildCheck()

def buildCheck(): Unit = {
val supportedTypes = List(
StringType
)
if (supportedTypes.indexOf(srcStr.dataType) == -1 || !trimStr.isEmpty) {
throw new UnsupportedOperationException(
s"${srcStr.dataType} is not supported in ColumnarStringTrimLeft.")
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (srcStr_node, srcStrType): (TreeNode, ArrowType) =
srcStr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)

val resultType = ArrowUtils.toArrowType(StringType,
SparkSchemaUtils.getLocalTimezoneID())
val funcNode =
TreeBuilder.makeFunction("ltrim", Lists.newArrayList(srcStr_node), resultType)
(funcNode, resultType)
}
}

// StringTrimRight
class ColumnarStringTrimRight(child: Expression, trimStr: Option[Expression], original: Expression)
extends StringTrimRight(child: Expression, trimStr: Option[Expression])
with ColumnarExpression
with Logging {

buildCheck()

def buildCheck(): Unit = {
val supportedTypes = List(
StringType
)
if (supportedTypes.indexOf(child.dataType) == -1 || !trimStr.isEmpty) {
throw new UnsupportedOperationException(
s"${child.dataType} is not supported in ColumnarStringTrimRight.")
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)

val resultType = ArrowUtils.toArrowType(StringType,
SparkSchemaUtils.getLocalTimezoneID())
val funcNode =
TreeBuilder.makeFunction("rtrim", Lists.newArrayList(child_node), resultType)
(funcNode, resultType)
}
}

object ColumnarString2TrimOperator {

def create(value: Seq[Expression],
original: Expression): Expression = original match {
case a: StringTrim =>
new ColumnarStringTrim(value.lift(0).get, value.lift(1), a)
case a: StringTrimLeft =>
new ColumnarStringTrimLeft(value.lift(0).get, value.lift(1), a)
case a: StringTrimRight =>
new ColumnarStringTrimRight(value.lift(0).get, value.lift(1), a)

case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,53 @@ class ColumnarSubString(str: Expression, pos: Expression, len: Expression, origi
}
}

// StringSplit, not functionality ready, need array type support.
class ColumnarStringSplit(child: Expression, regex: Expression,
limit: Expression, original: Expression)
extends StringSplit(child: Expression,
regex: Expression, limit: Expression)
with ColumnarExpression
with Logging {

buildCheck()

def buildCheck(): Unit = {
val supportedTypes = List(
StringType
)
if (supportedTypes.indexOf(child.dataType) == -1) {
throw new UnsupportedOperationException(
s"${child.dataType} is not supported in ColumnarStringSplit.")
}
}

override def doColumnarCodeGen(args: java.lang.Object)
: (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (regex_node, regexType): (TreeNode, ArrowType) =
regex.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (limit_node, limitType): (TreeNode, ArrowType) =
limit.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)

val resultType = new ArrowType.Bool()
val funcNode =
TreeBuilder.makeFunction(
"split_part", Lists.newArrayList(child_node, regex_node,
limit_node), resultType)
(funcNode, resultType)
}
}

object ColumnarTernaryOperator {

def create(str: Expression, pos: Expression, len: Expression, original: Expression): Expression = original match {
def create(str: Expression, pos: Expression, len: Expression,
original: Expression): Expression = original match {
case ss: Substring =>
new ColumnarSubString(str, pos, len, ss)
// Currently not supported.
// case a: StringSplit =>
// new ColumnarStringSplit(str, a.regex, a.limit, a)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
Expand Down

0 comments on commit b6487d7

Please sign in to comment.