Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-51] add more datatype fallback logic in columnar operators #72

Merged
merged 1 commit into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,24 @@ case class ColumnarConditionProjectExec(
})
// check expr
if (condExpr != null) {
try {
ConverterUtils.checkIfTypeSupported(condExpr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${condExpr.dataType} is not supported in ColumnarConditionProjector.")
}
ColumnarExpressionConverter.replaceWithColumnarExpression(condExpr)
}
if (projectList != null) {
for (expr <- projectList) {
try {
ConverterUtils.checkIfTypeSupported(expr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${expr.dataType} is not supported in ColumnarConditionProjector.")
}
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ case class ColumnarHashAggregateExec(
listJars.foreach(jar => logInfo(s"Uploaded ${jar}"))

def buildCheck(): Unit = {
// check datatype
// check input datatype
for (attr <- child.output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
Expand All @@ -157,6 +157,16 @@ case class ColumnarHashAggregateExec(
s"${attr.dataType} is not supported in ColumnarAggregation")
}
}
// check output datatype
resultExpressions.foreach(expr => {
try {
ConverterUtils.checkIfTypeSupported(expr.dataType)
} catch {
case e : UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${expr.dataType} is not supported in ColumnarAggregation")
}
})
// check project
for (expr <- aggregateExpressions) {
val internalExpressionList = expr.aggregateFunction.children
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ class ColumnarBitwiseXor(left: Expression, right: Expression, original: Expressi

object ColumnarBinaryArithmetic {

def create(left: Expression, right: Expression, original: Expression): Expression =
original match {
def create(left: Expression, right: Expression, original: Expression): Expression = {
buildCheck(left, right)
original match {
case a: Add =>
new ColumnarAdd(left, right, a)
case s: Subtract =>
Expand All @@ -229,4 +230,16 @@ object ColumnarBinaryArithmetic {
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}

/**
 * Runs `ConverterUtils.checkIfTypeSupported` on the left operand's type and then on the
 * right operand's type.
 *
 * @param left  left-hand operand
 * @param right right-hand operand
 * @throws UnsupportedOperationException naming both operand types when either check
 *                                       raises UnsupportedOperationException
 */
def buildCheck(left: Expression, right: Expression): Unit =
  try {
    // Operands are visited in order, so the right type is only read after the left passes.
    Seq(left, right).foreach(operand => ConverterUtils.checkIfTypeSupported(operand.dataType))
  } catch {
    case _: UnsupportedOperationException =>
      throw new UnsupportedOperationException(
        s"${left.dataType} or ${right.dataType} is not supported in ColumnarBinaryArithmetic")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ class ColumnarShiftRight(left: Expression, right: Expression, original: Expressi

object ColumnarBinaryOperator {

def create(left: Expression, right: Expression, original: Expression): Expression =
def create(left: Expression, right: Expression, original: Expression): Expression = {
buildCheck(left, right)
original match {
case a: And =>
new ColumnarAnd(left, right, a)
Expand Down Expand Up @@ -443,4 +444,16 @@ object ColumnarBinaryOperator {
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}

/**
 * Runs `ConverterUtils.checkIfTypeSupported` on the left operand's type and then on the
 * right operand's type.
 *
 * @param left  left-hand operand
 * @param right right-hand operand
 * @throws UnsupportedOperationException naming both operand types when either check
 *                                       raises UnsupportedOperationException
 */
def buildCheck(left: Expression, right: Expression): Unit =
  try {
    // Left is checked first; the right operand's type is only read if the left one passes.
    for (operand <- Seq(left, right)) {
      ConverterUtils.checkIfTypeSupported(operand.dataType)
    }
  } catch {
    case _: UnsupportedOperationException =>
      throw new UnsupportedOperationException(
        s"${left.dataType} or ${right.dataType} is not supported in ColumnarBinaryOperator")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ class ColumnarCaseWhen(
extends CaseWhen(branches: Seq[(Expression, Expression)] ,elseValue: Option[Expression])
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Runs `ConverterUtils.checkIfTypeSupported` on the type of every branch condition, every
 * branch value and the optional else value, in that order.
 *
 * @throws UnsupportedOperationException naming the type of the first expression whose
 *                                       check raises UnsupportedOperationException
 */
def buildCheck(): Unit = {
  val exprs = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue
  exprs.foreach { expr =>
    try {
      ConverterUtils.checkIfTypeSupported(expr.dataType)
    } catch {
      case _: UnsupportedOperationException =>
        // Report the type that actually failed the check. The previous message
        // interpolated this CaseWhen's own result type, which may be a different type.
        throw new UnsupportedOperationException(
          s"${expr.dataType} is not supported in ColumnarCaseWhen")
    }
  }
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
logInfo(s"children: ${branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue}")
logInfo(s"branches: $branches")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ class ColumnarCoalesce(exps: Seq[Expression], original: Expression)
extends Coalesce(exps: Seq[Expression])
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Runs `ConverterUtils.checkIfTypeSupported` on the type of each coalesce argument in turn.
 *
 * @throws UnsupportedOperationException naming the type of the first argument whose check
 *                                       raises UnsupportedOperationException
 */
def buildCheck(): Unit =
  for (expr <- exps) {
    try ConverterUtils.checkIfTypeSupported(expr.dataType)
    catch {
      case _: UnsupportedOperationException =>
        throw new UnsupportedOperationException(
          s"${expr.dataType} is not supported in ColumnarCoalesce")
    }
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val iter: Iterator[Expression] = exps.iterator
val exp = iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ class ColumnarConcat(exps: Seq[Expression], original: Expression)
extends Concat(exps: Seq[Expression])
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Requires every concat argument to be of StringType.
 *
 * @throws UnsupportedOperationException naming the type of the first non-string argument
 */
def buildCheck(): Unit =
  // The guard is applied lazily per element, so the first offending argument aborts the scan.
  for (expr <- exps if expr.dataType != StringType) {
    throw new UnsupportedOperationException(
      s"${expr.dataType} is not supported in ColumnarConcat")
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val iter: Iterator[Expression] = exps.iterator
val exp = iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ class ColumnarIf(predicate: Expression, trueValue: Expression,
extends If(predicate: Expression, trueValue: Expression, falseValue: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Runs `ConverterUtils.checkIfTypeSupported` on the predicate, true-value and false-value
 * types, in that order.
 *
 * @throws UnsupportedOperationException listing all three types when any of the checks
 *                                       raises UnsupportedOperationException
 */
def buildCheck(): Unit =
  try {
    Seq(predicate, trueValue, falseValue)
      .foreach(operand => ConverterUtils.checkIfTypeSupported(operand.dataType))
  } catch {
    case _: UnsupportedOperationException =>
      throw new UnsupportedOperationException(
        s"${predicate.dataType} or ${trueValue.dataType} or ${falseValue.dataType} " +
          s"is not supported in ColumnarIf")
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (predicate_node, predicateType): (TreeNode, ArrowType) =
predicate.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class ColumnarLiteral(lit: Literal)
val resultType: ArrowType = buildCheck()

def buildCheck(): ArrowType = {
val supportedTypes = List(StringType, IntegerType, LongType, DoubleType, DateType,
BooleanType, CalendarIntervalType, BinaryType)
val supportedTypes =
List(StringType, IntegerType, LongType, DoubleType, DateType,
BooleanType, CalendarIntervalType, BinaryType)
if (supportedTypes.indexOf(dataType) == -1 && !dataType.isInstanceOf[DecimalType]) {
// Decimal is supported in ColumnarLiteral
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class ColumnarSubString(str: Expression, pos: Expression, len: Expression, origi
extends Substring(str: Expression, pos: Expression, len: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Requires the string operand of the substring to be of StringType; `pos` and `len`
 * are not inspected here.
 *
 * @throws UnsupportedOperationException naming the operand type when it is not StringType
 */
def buildCheck(): Unit =
  str.dataType match {
    case StringType => ()
    case _ =>
      throw new UnsupportedOperationException(
        s"${str.dataType} is not supported in ColumnarSubString")
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (str_node, strType): (TreeNode, ArrowType) =
str.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ class ColumnarIsNotNull(child: Expression, original: Expression)
extends IsNotNull(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child whose type is any DecimalType or one of the listed primitive, date/time,
 * string and binary types.
 *
 * @throws UnsupportedOperationException naming the child type when it is not accepted
 */
def buildCheck(): Unit = {
  val accepted = child.dataType match {
    // DecimalType carries precision/scale, so it is matched by class rather than by value.
    case _: DecimalType => true
    case other =>
      Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType,
        TimestampType, BooleanType, StringType, BinaryType).contains(other)
  }
  if (!accepted) {
    throw new UnsupportedOperationException(
      s"${child.dataType} is not supported in ColumnarIsNotNull.")
  }
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -56,6 +69,19 @@ class ColumnarIsNull(child: Expression, original: Expression)
extends IsNotNull(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child whose type is any DecimalType or one of the listed primitive, date/time,
 * string and binary types.
 *
 * @throws UnsupportedOperationException naming the child type when it is not accepted
 */
def buildCheck(): Unit = {
  val childType = child.dataType
  val isDecimal = childType.isInstanceOf[DecimalType]
  val isListed = Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType,
    DateType, TimestampType, BooleanType, StringType, BinaryType).contains(childType)
  if (!(isListed || isDecimal)) {
    throw new UnsupportedOperationException(
      s"${child.dataType} is not supported in ColumnarIsNull.")
  }
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -71,6 +97,17 @@ class ColumnarYear(child: Expression, original: Expression)
extends Year(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child of LongType, StringType or DateType only.
 *
 * @throws UnsupportedOperationException naming the child type when it is none of those
 */
def buildCheck(): Unit =
  child.dataType match {
    case LongType | StringType | DateType => ()
    case _ =>
      throw new UnsupportedOperationException(
        s"${child.dataType} is not supported in ColumnarYear.")
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -91,6 +128,17 @@ class ColumnarNot(child: Expression, original: Expression)
extends Not(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child of BooleanType only.
 *
 * @throws UnsupportedOperationException naming the child type when it is not BooleanType
 */
def buildCheck(): Unit = {
  val isBoolean = child.dataType == BooleanType
  if (!isBoolean) {
    throw new UnsupportedOperationException(
      s"${child.dataType} is not supported in ColumnarNot.")
  }
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -111,11 +159,12 @@ class ColumnarAbs(child: Expression, original: Expression)

// Restricts ColumnarAbs to FloatType and DoubleType; any other type raises
// UnsupportedOperationException with the offending type in the message.
// NOTE(review): this span is diff output and shows both the pre-change lines (which test and
// report `dataType`) and the post-change lines (which test and report `child.dataType`);
// confirm that only the `child.dataType` pair is present in the real source file.
def buildCheck(): Unit = {
val supportedTypes = List(FloatType, DoubleType)
if (supportedTypes.indexOf(dataType) == -1) {
if (supportedTypes.indexOf(child.dataType) == -1) {
throw new UnsupportedOperationException(
s"${dataType} is not supported in ColumnarAbs")
s"${child.dataType} is not supported in ColumnarAbs")
}
}

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -131,6 +180,17 @@ class ColumnarUpper(child: Expression, original: Expression)
extends Upper(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child of StringType only.
 *
 * @throws UnsupportedOperationException naming the child type when it is not StringType
 */
def buildCheck(): Unit =
  child.dataType match {
    case StringType => ()
    case _ =>
      throw new UnsupportedOperationException(
        s"${child.dataType} is not supported in ColumnarUpper")
  }

override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -146,6 +206,17 @@ class ColumnarBitwiseNot(child: Expression, original: Expression)
extends BitwiseNot(child: Expression)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child of IntegerType or LongType only.
 *
 * @throws UnsupportedOperationException naming the child type when it is neither
 */
def buildCheck(): Unit = {
  val isIntegral = Seq(IntegerType, LongType).exists(_ == child.dataType)
  if (!isIntegral) {
    throw new UnsupportedOperationException(
      s"${child.dataType} is not supported in ColumnarBitwiseNot")
  }
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand All @@ -162,6 +233,18 @@ class ColumnarCheckOverflow(child: Expression, original: CheckOverflow)
extends CheckOverflow(child: Expression, original.dataType: DecimalType, original.nullOnOverflow: Boolean)
with ColumnarExpression
with Logging {

buildCheck()

/**
 * Accepts a child whose type is any DecimalType or one of IntegerType, LongType, FloatType,
 * DoubleType and StringType.
 *
 * @throws UnsupportedOperationException naming the child type when it is not accepted
 */
def buildCheck(): Unit = {
  val accepted = child.dataType match {
    case _: DecimalType => true
    case IntegerType | LongType | FloatType | DoubleType | StringType => true
    case _ => false
  }
  if (!accepted) {
    throw new UnsupportedOperationException(
      s"${child.dataType} is not supported in ColumnarCheckOverflow")
  }
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (child_node, childType): (TreeNode, ArrowType) =
child.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
Expand Down