Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-1019] [NSE-1020] Support more date formats and be aware of local time zone in handling unix timestamp #1021

Merged
merged 11 commits into from
Jul 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ package com.intel.oap.expression
import java.util.Collections

import com.google.common.collect.Lists
import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented
import org.apache.arrow.gandiva.expression.TreeBuilder
import org.apache.arrow.gandiva.expression.TreeNode
import org.apache.arrow.vector.types.{DateUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.sql.catalyst.expressions.CheckOverflow
import org.apache.spark.sql.catalyst.expressions.CurrentDate
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp
import org.apache.spark.sql.catalyst.expressions.DateDiff
Expand Down Expand Up @@ -444,12 +441,19 @@ object ColumnarDateTimeExpressions {
}

/**
* Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail.
*/
* Converts time string with given pattern to Unix timestamp (in seconds), returns null if fail.
* The input is the date/time for local timezone (can be configured in spark) and the result is
* the timestamp for UTC. So we need consider timezone difference.
* */
class ColumnarUnixTimestamp(left: Expression, right: Expression)
extends UnixTimestamp(left, right) with
ColumnarExpression {

val yearMonthDayFormat = "yyyy-MM-dd"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After double check, castDATE_nullsafe currently still cannot handle no separator date format, i.e., yyyyMMdd, as before. Let's enable it if there is a requirement from users.

val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss"
val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss"
var formatLiteral: String = null

buildCheck()

def buildCheck(): Unit = {
Expand All @@ -458,37 +462,67 @@ object ColumnarDateTimeExpressions {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarUnixTimestamp.")
}
// The format is only applicable for StringType left input.
if (left.dataType == StringType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (format.length > 10) {
this.formatLiteral = literal.value.toString.trim
// Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.
if (!this.formatLiteral.equals(yearMonthDayFormat) &&
!this.formatLiteral.equals(yearMonthDayTimeFormat) &&
!this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) {
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarUnixTimestamp.")
s"$formatLiteral is not supported in ColumnarUnixTimestamp.")
}
case _ =>
throw new UnsupportedOperationException("Only literal format is" +
" supported for ColumnarUnixTimestamp!")
}
}
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val outType = CodeGeneration.getResultType(dataType)
val milliType = new ArrowType.Date(DateUnit.MILLISECOND)
val dateNode = if (left.dataType == TimestampType) {
val milliNode = ConverterUtils.convertTimestampToMicro(leftNode, leftType)._1
TreeBuilder.makeFunction(
"unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType))
} else if (left.dataType == StringType) {
// Convert from UTF8 to Date[Millis].
val dateNode = TreeBuilder.makeFunction(
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType)
val intNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(dateNode), outType)
// Convert from milliseconds to seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
if (this.formatLiteral.equals(yearMonthDayFormat)) {
// Convert from UTF8 to Date[Millis].
val dateNode = TreeBuilder.makeFunction(
"castDATE_nullsafe", Lists.newArrayList(leftNode), milliType)
val intNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(dateNode), outType)
// Convert from milliseconds to seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(intNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) {
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying",
Lists.newArrayList(leftNode), timestampType)
val castNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(timestampNode), outType)
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(castNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) {
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep",
Lists.newArrayList(leftNode), timestampType)
// The result is in milliseconds.
val castNode = TreeBuilder.makeFunction("castBIGINT",
Lists.newArrayList(timestampNode), outType)
// Convert to the timestamp in seconds.
TreeBuilder.makeFunction("divide", Lists.newArrayList(
ConverterUtils.subtractTimestampOffset(castNode),
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType)
} else {
throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!")
}
} else {
// Convert from Date[Day] to seconds.
TreeBuilder.makeFunction(
Expand Down Expand Up @@ -533,7 +567,7 @@ object ColumnarDateTimeExpressions {
if (left.dataType == StringType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
val format = literal.value.toString.trim
// TODO: support other format.
if (!format.equals("yyyy-MM-dd")) {
throw new UnsupportedOperationException(
Expand All @@ -553,7 +587,7 @@ object ColumnarDateTimeExpressions {

right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
val format = literal.value.toString.trim
if (format.equals("yyyy-MM-dd")) {
val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check",
Lists.newArrayList(leftNode), intermediateType)
Expand All @@ -572,10 +606,19 @@ object ColumnarDateTimeExpressions {
copy(leftChild = newLeft, rightChild = newRight)
}

/**
* The result is the date/time for local timezone (can be configured in spark). The input is
* the timestamp for UTC. So we need consider timezone difference.
*/
class ColumnarFromUnixTime(left: Expression, right: Expression)
extends FromUnixTime(left, right) with
ColumnarExpression {

var formatLiteral: String = null
val yearMonthDayFormat = "yyyy-MM-dd"
val yearMonthDayNoSepFormat = "yyyyMMdd"
val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss"

buildCheck()

def buildCheck(): Unit = {
Expand All @@ -587,45 +630,68 @@ object ColumnarDateTimeExpressions {
if (left.dataType == LongType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) {
this.formatLiteral = literal.value.toString.trim
if (!formatLiteral.equals(yearMonthDayFormat) &&
!formatLiteral.equals(yearMonthDayNoSepFormat) &&
!formatLiteral.equals(yearMonthDayTimeFormat)) {
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarFromUnixTime.")
}
case _ =>
throw new UnsupportedOperationException("Only literal format is supported!")
}
}
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
//val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val (leftNode, _) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val outType = CodeGeneration.getResultType(dataType)
val date32LeftNode = if (left.dataType == LongType) {
// cast unix seconds to date64()
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true))
val date64Node = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND))
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY))
if (this.formatLiteral.equals(yearMonthDayFormat) ||
this.formatLiteral.equals(yearMonthDayNoSepFormat)) {
val date32LeftNode = if (left.dataType == LongType) {
// cast unix seconds to date64()
val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true))
val date64Node = TreeBuilder.makeFunction("castDATE",
Lists.newArrayList(ConverterUtils.addTimestampOffset(milliNode)),
new ArrowType.Date(DateUnit.MILLISECOND))
TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node),
new ArrowType.Date(DateUnit.DAY))
} else {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarFromUnixTime.")
}
var formatLength = 0L
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString.trim
if (format.equals(yearMonthDayFormat)) {
formatLength = 10L
} else if (format.equals(yearMonthDayNoSepFormat)) {
formatLength = 8L
}
}
val dateNode = TreeBuilder.makeFunction(
"castVARCHAR", Lists.newArrayList(date32LeftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType)
(dateNode, outType)
} else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) {
// Only millisecond based input is expected in following functions, but the raw input
// is second based. So we make the below conversion.
val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(
leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))),
new ArrowType.Int(64, true))
val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP",
Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType)
// The largest length for yyyy-MM-dd HH:mm:ss.
val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The length (19) is set for the output string length. It's a fixed value for 'yyyy-MM-dd HH:mm:ss' format.

val resultNode = TreeBuilder.makeFunction("castVARCHAR",
Lists.newArrayList(timestampNode, lenNode), outType)
(resultNode, outType)
} else {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarFromUnixTime.")
throw new RuntimeException("Unexpected format is used!")
}
var formatLength = 0L
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (format.equals("yyyy-MM-dd")) {
formatLength = 10L
} else if (format.equals("yyyyMMdd")) {
formatLength = 8L
}
}
val dateNode = TreeBuilder.makeFunction(
"castVARCHAR", Lists.newArrayList(date32LeftNode,
TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType)
(dateNode, outType)
}
}

Expand Down Expand Up @@ -678,7 +744,8 @@ object ColumnarDateTimeExpressions {
}

override def supportColumnarCodegen(args: Object): Boolean = {
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args)
false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) &&
right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.expression.ExpressionTree
import org.apache.arrow.gandiva.ipc.GandivaTypes
import org.apache.arrow.gandiva.ipc.GandivaTypes.ExpressionList
import org.apache.arrow.gandiva.expression.TreeBuilder
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector._
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel, WriteChannel}
Expand Down Expand Up @@ -61,7 +62,7 @@ import org.apache.arrow.vector.types.TimeUnit
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils
Expand Down Expand Up @@ -699,6 +700,30 @@ object ConverterUtils extends Logging {
throw new UnsupportedOperationException()
}

/**
* Add an offset (can be negative) in millisecond for given timestamp node to
* align with spark's timezone awareness. It can be used in converting timestamp
* counted from unix epoch (UTC) to local date/time.
*/
def addTimestampOffset(timestampNode: TreeNode): TreeNode = {
val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0)
val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset))
TreeBuilder.makeFunction("add", Lists.newArrayList(timestampNode, offsetNode),
new ArrowType.Int(64, true))
}

/**
* Subtract an offset (can be negative) in millisecond for given timestamp node.
* It can be used in getting timestamp counted from unix epoch (UTC) for a given
* local date/time.
*/
def subtractTimestampOffset(timestampNode: TreeNode): TreeNode = {
val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0)
val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset))
TreeBuilder.makeFunction("subtract", Lists.newArrayList(timestampNode, offsetNode),
new ArrowType.Int(64, true))
}

def powerOfTen(pow: Int): (String, Int, Int) = {
val POWERS_OF_10: Array[(String, Int, Int)] = Array(
("1", 1, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,9 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
.isInstanceOf[ColumnarConditionProjectExec]).isDefined)
checkAnswer(
frame,
Seq(Row(java.lang.Long.valueOf(1248912000L)),
Row(java.lang.Long.valueOf(1248998400L)),
Row(java.lang.Long.valueOf(1249084800L))))
Seq(Row(java.lang.Long.valueOf(1248940800L)),
Row(java.lang.Long.valueOf(1249027200L)),
Row(java.lang.Long.valueOf(1249113600L))))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expected result has been corrected, which is verified by checking spark's result.

}
}
}