Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-375] Implement a series of datetime functions (#376)
Browse files Browse the repository at this point in the history
* [NSE-375] Implement a series of datetime functions

Close #375
  • Loading branch information
zhztheplayer authored Jun 27, 2021
1 parent cac302a commit c9f3c0b
Show file tree
Hide file tree
Showing 7 changed files with 953 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.IntervalUnit
import org.apache.arrow.vector.types.DateUnit
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

import scala.collection.mutable.ListBuffer

import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateDiff
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp

/**
* A version of add that supports columnar processing for longs.
*/
Expand Down Expand Up @@ -76,6 +79,10 @@ object ColumnarBinaryExpression {
original match {
case s: DateAddInterval =>
new ColumnarDateAddInterval(left, right, s)
case s: DateDiff =>
new ColumnarDateDiff(left, right)
case a: UnixTimestamp =>
new ColumnarUnixTimestamp(left, right)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ColumnarLiteral(lit: Literal)
def buildCheck(): ArrowType = {
val supportedTypes =
List(StringType, IntegerType, LongType, DoubleType, DateType,
BooleanType, CalendarIntervalType, BinaryType)
BooleanType, CalendarIntervalType, BinaryType, TimestampType)
if (supportedTypes.indexOf(dataType) == -1 && !dataType.isInstanceOf[DecimalType]) {
// Decimal is supported in ColumnarLiteral
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -135,6 +135,13 @@ class ColumnarLiteral(lit: Literal)
case _ =>
(TreeBuilder.makeLiteral(value.asInstanceOf[java.lang.Boolean]), resultType)
}
case t : TimestampType =>
value match {
case null =>
(TreeBuilder.makeNull(resultType), resultType)
case _ =>
(TreeBuilder.makeLiteral(value.asInstanceOf[java.lang.Long]), resultType)
}
case c: CalendarIntervalType =>
value match {
case null =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfMonth
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfWeek
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDayOfYear
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarHour
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarHour
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMicrosToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMillisToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarMinute
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarSecond
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarSecondsToTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixDate
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixMicros
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixMillis
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixSeconds
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp
import org.apache.arrow.vector.types.TimeUnit

import org.apache.spark.sql.catalyst.util.DateTimeConstants
Expand Down Expand Up @@ -441,7 +456,7 @@ class ColumnarCast(
s"${child.dataType} is not supported in castFLOAT8")
}
} else if (dataType == DateType) {
val supported = List(IntegerType, LongType, DateType, TimestampType)
val supported = List(IntegerType, LongType, DateType, TimestampType, StringType)
if (supported.indexOf(child.dataType) == -1) {
throw new UnsupportedOperationException(s"${child.dataType} is not supported in castDATE")
}
Expand Down Expand Up @@ -573,6 +588,11 @@ class ColumnarCast(
val localizedDateNode = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(localizedTimestampNode), toType)
localizedDateNode
case s: StringType =>
val intermediate = new ArrowType.Date(DateUnit.MILLISECOND)
TreeBuilder.makeFunction("castDATE", Lists
.newArrayList(TreeBuilder.makeFunction("castDATE", Lists
.newArrayList(child_node0), intermediate)), toType)
case other => TreeBuilder.makeFunction("castDATE", Lists.newArrayList(child_node0),
toType)
}
Expand Down Expand Up @@ -741,11 +761,23 @@ object ColumnarUnaryOperator {
case i: IsNotNull =>
new ColumnarIsNotNull(child, i)
case y: Year =>
new ColumnarYear(child, y)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarYear(child)
} else {
new ColumnarYear(child, y)
}
case m: Month =>
new ColumnarMonth(child, m)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarMonth(child)
} else {
new ColumnarMonth(child, m)
}
case d: DayOfMonth =>
new ColumnarDayOfMonth(child, d)
if (child.dataType.isInstanceOf[TimestampType]) {
new ColumnarDateTimeExpressions.ColumnarDayOfMonth(child)
} else {
new ColumnarDayOfMonth(child, d)
}
case n: Not =>
new ColumnarNot(child, n)
case a: Abs =>
Expand All @@ -768,7 +800,40 @@ object ColumnarUnaryOperator {
child
case a: CheckOverflow =>
new ColumnarCheckOverflow(child, a)
case a: UnixDate =>
new ColumnarUnixDate(child)
case a: UnixSeconds =>
new ColumnarUnixSeconds(child)
case a: UnixMillis =>
new ColumnarUnixMillis(child)
case a: UnixMicros =>
new ColumnarUnixMicros(child)
case a: SecondsToTimestamp =>
new ColumnarSecondsToTimestamp(child)
case a: MillisToTimestamp =>
new ColumnarMillisToTimestamp(child)
case a: MicrosToTimestamp =>
new ColumnarMicrosToTimestamp(child)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
child.dataType match {
case _: DateType => other match {
case a: DayOfYear =>
new ColumnarDayOfYear(new ColumnarCast(child, TimestampType, None, null))
case a: DayOfWeek =>
new ColumnarDayOfWeek(new ColumnarCast(child, TimestampType, None, null))
}
case _: TimestampType => other match {
case a: Hour =>
new ColumnarHour(child)
case a: Minute =>
new ColumnarMinute(child)
case a: Second =>
new ColumnarSecond(child)
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
case _ =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision}

import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils
Expand Down Expand Up @@ -545,6 +546,54 @@ object ConverterUtils extends Logging {
}
}

/**
 * Wraps the given node in a Gandiva `castINT` call so the result is a 32-bit integer.
 *
 * @param inNode the expression tree node to cast
 * @param inType the node's current Arrow type (not inspected; the cast is unconditional)
 * @return the casting node paired with the target 32-bit integer Arrow type
 */
def toInt32(inNode: TreeNode, inType: ArrowType): (TreeNode, ArrowType) = {
  val int32Type = ArrowUtils.toArrowType(IntegerType, null)
  (TreeBuilder.makeFunction("castINT", Lists.newArrayList(inNode), int32Type), int32Type)
}

// use this carefully: the input timestamp node is re-labelled as UTC regardless of the
// timeZoneId argument — no wall-clock adjustment is performed here.
/**
 * Normalizes a timestamp node to microsecond resolution with a UTC zone label.
 *
 * The output keeps the input's time unit semantics but is tagged "UTC" before being
 * converted to microseconds via [[ConverterUtils.convertTimestampToMicro]].
 *
 * @param inNode     the timestamp expression tree node
 * @param inType     the node's Arrow type; must be a timestamp type (enforced by
 *                   `asTimestampType`)
 * @param timeZoneId currently IGNORED — kept only for signature compatibility with
 *                   the other timestamp helpers. NOTE(review): callers passing a zone
 *                   here get UTC semantics anyway; confirm this is intended.
 * @return the converted node paired with its microsecond UTC timestamp type
 */
def toGandivaMicroUTCTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) = {
  // Preserve the input's unit, but stamp the type with the UTC zone id.
  val inTimestampTypeUTC =
    new ArrowType.Timestamp(asTimestampType(inType).getUnit, "UTC")
  ConverterUtils.convertTimestampToMicro(inNode, inTimestampTypeUTC)
}

// use this carefully
/**
 * Shifts a (UTC-based) timestamp node into local wall-clock time for Gandiva.
 *
 * Pipeline: timestamp -> millisecond timestamp -> int64 millis -> add zone offset
 * -> cast back to a zone-less millisecond timestamp.
 *
 * @param inNode     the timestamp expression tree node, assumed to hold UTC instants
 *                   — TODO confirm against callers
 * @param inType     the node's Arrow type
 * @param timeZoneId target zone; falls back to the local session timezone when None
 * @return the localized node paired with its millisecond, zone-less timestamp type
 */
def toGandivaTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) = {
  // Resolve the effective zone: explicit argument wins, else the local session zone.
  val zoneId = timeZoneId.orElse(Some(SparkSchemaUtils.getLocalTimezoneID())).get

  val utcTimestampNodeOriginal = inNode
  // Normalize to millisecond resolution before doing integer arithmetic on it.
  val utcTimestampNodeMilli = ConverterUtils.convertTimestampToMilli(utcTimestampNodeOriginal,
    inType)._1
  // Reinterpret the millisecond timestamp as a plain int64 so we can add the offset.
  val utcTimestampNodeLong = TreeBuilder.makeFunction("castBIGINT",
    Lists.newArrayList(utcTimestampNodeMilli), new ArrowType.Int(64,
      true))
  // Zone offset in milliseconds; presumably getTimeZoneIDOffset returns seconds —
  // TODO confirm, the unit is not visible from here.
  val diff = SparkSchemaUtils.getTimeZoneIDOffset(zoneId) *
      DateTimeConstants.MILLIS_PER_SECOND

  // Apply the offset, then cast the shifted int64 back into a timestamp.
  val localizedTimestampNodeLong = TreeBuilder.makeFunction("add",
    Lists.newArrayList(utcTimestampNodeLong,
      TreeBuilder.makeLiteral(java.lang.Long.valueOf(diff))),
    new ArrowType.Int(64, true))
  // Resulting type carries no zone id (null): it represents local wall-clock time.
  val localized = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
  val localizedTimestampNode = TreeBuilder.makeFunction("castTIMESTAMP",
    Lists.newArrayList(localizedTimestampNodeLong), localized)
  (localizedTimestampNode, localized)
}

/**
 * Intended inverse of [[toGandivaTimestamp]]: convert a Gandiva-local timestamp node
 * back to Spark's timestamp representation. Not implemented yet — always throws.
 *
 * @throws UnsupportedOperationException unconditionally (placeholder)
 */
def toSparkTimestamp(inNode: TreeNode, inType: ArrowType,
    timeZoneId: Option[String] = None): (TreeNode, ArrowType) = {
  throw new UnsupportedOperationException()
}

def powerOfTen(pow: Int): (String, Int, Int) = {
val POWERS_OF_10: Array[(String, Int, Int)] = Array(
("1", 1, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object ArrowUtils {
throw new UnsupportedOperationException(
s"${TimestampType.catalogString} must supply timeZoneId parameter")
} else {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")
}
case _ =>
throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
Expand Down
Loading

0 comments on commit c9f3c0b

Please sign in to comment.