From 2e1841b9c806339c466ac9a0aaf06a50e4bd5c04 Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 11 Jul 2022 16:57:11 +0800 Subject: [PATCH 01/11] Trim user-specified format in time expression --- .../expression/ColumnarDateTimeExpressions.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index 3381050ae..e529669a4 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -461,7 +461,7 @@ object ColumnarDateTimeExpressions { if (left.dataType == StringType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim if (format.length > 10) { throw new UnsupportedOperationException( s"$format is not supported in ColumnarUnixTimestamp.") @@ -533,7 +533,7 @@ object ColumnarDateTimeExpressions { if (left.dataType == StringType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim // TODO: support other format. 
if (!format.equals("yyyy-MM-dd")) { throw new UnsupportedOperationException( @@ -553,7 +553,7 @@ object ColumnarDateTimeExpressions { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim if (format.equals("yyyy-MM-dd")) { val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check", Lists.newArrayList(leftNode), intermediateType) @@ -587,7 +587,7 @@ object ColumnarDateTimeExpressions { if (left.dataType == LongType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) { throw new UnsupportedOperationException( s"$format is not supported in ColumnarFromUnixTime.") @@ -615,7 +615,7 @@ object ColumnarDateTimeExpressions { var formatLength = 0L right match { case literal: ColumnarLiteral => - val format = literal.value.toString + val format = literal.value.toString.trim if (format.equals("yyyy-MM-dd")) { formatLength = 10L } else if (format.equals("yyyyMMdd")) { @@ -678,7 +678,8 @@ object ColumnarDateTimeExpressions { } override def supportColumnarCodegen(args: Object): Boolean = { - false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) + false && left.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) && + right.asInstanceOf[ColumnarExpression].supportColumnarCodegen(args) } } From 9751fdcdf5bfb4f56e6c819858aebe362c5683fc Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 11 Jul 2022 19:32:43 +0800 Subject: [PATCH 02/11] Support other formats --- .../ColumnarDateTimeExpressions.scala | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala 
b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index e529669a4..90704826f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -450,6 +450,11 @@ object ColumnarDateTimeExpressions { extends UnixTimestamp(left, right) with ColumnarExpression { + val yearMonthDayFormat = "yyyy-MM-dd" + val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" + val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss" + var format: String + buildCheck() def buildCheck(): Unit = { @@ -458,15 +463,21 @@ object ColumnarDateTimeExpressions { throw new UnsupportedOperationException( s"${left.dataType} is not supported in ColumnarUnixTimestamp.") } + // The format is only applicable for StringType left input. if (left.dataType == StringType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString.trim - if (format.length > 10) { + this.format = literal.value.toString.trim + // Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss. + if (!this.format.equals(yearMonthDayFormat) && + !this.format.equals(yearMonthDayTimeFormat) && + !this.format.equals(yearMonthDayTimeNoSepFormat)) { throw new UnsupportedOperationException( s"$format is not supported in ColumnarUnixTimestamp.") } case _ => + throw new UnsupportedOperationException("Only literal format is" + + " supported for ColumnarUnixTimestamp!") } } } @@ -481,14 +492,24 @@ object ColumnarDateTimeExpressions { TreeBuilder.makeFunction( "unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType)) } else if (left.dataType == StringType) { - // Convert from UTF8 to Date[Millis]. 
- val dateNode = TreeBuilder.makeFunction( - "castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) - val intNode = TreeBuilder.makeFunction("castBIGINT", - Lists.newArrayList(dateNode), outType) - // Convert from milliseconds to seconds. - TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + if (format.equals(yearMonthDayFormat)) { + // Convert from UTF8 to Date[Millis]. + val dateNode = TreeBuilder.makeFunction( + "castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) + val intNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(dateNode), outType) + // Convert from milliseconds to seconds. + TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) + } else if (format.equals(yearMonthDayTimeFormat)) { + TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", + Lists.newArrayList(leftNode), outType) + } else if (format.equals(yearMonthDayTimeNoSepFormat)) { + TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", + Lists.newArrayList(leftNode), outType) + } else { + throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!") + } } else { // Convert from Date[Day] to seconds. 
TreeBuilder.makeFunction( From 11f09775cbffec3b68b73768a8529440a9f83e3c Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 11 Jul 2022 19:36:15 +0800 Subject: [PATCH 03/11] Change arrow branch [will revert at last] --- arrow-data-source/script/build_arrow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index d8ec40128..5c7baaeb0 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR +git clone https://github.com/PHILO-HE/arrow.git --branch unixtime-func $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \ From 478981e473821ea1372e3e2224e929aea04d55e2 Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 11 Jul 2022 19:49:14 +0800 Subject: [PATCH 04/11] Fix issues --- .../ColumnarDateTimeExpressions.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index 90704826f..f2ba72376 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -453,7 +453,7 @@ object ColumnarDateTimeExpressions { val yearMonthDayFormat = "yyyy-MM-dd" val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" val yearMonthDayTimeNoSepFormat = "yyyyMMddHHmmss" - var format: String + var formatLiteral: String = null buildCheck() @@ -467,13 +467,13 @@ object ColumnarDateTimeExpressions { if (left.dataType == StringType) { right 
match { case literal: ColumnarLiteral => - this.format = literal.value.toString.trim + this.formatLiteral = literal.value.toString.trim // Only support yyyy-MM-dd or yyyy-MM-dd HH:mm:ss. - if (!this.format.equals(yearMonthDayFormat) && - !this.format.equals(yearMonthDayTimeFormat) && - !this.format.equals(yearMonthDayTimeNoSepFormat)) { + if (!this.formatLiteral.equals(yearMonthDayFormat) && + !this.formatLiteral.equals(yearMonthDayTimeFormat) && + !this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { throw new UnsupportedOperationException( - s"$format is not supported in ColumnarUnixTimestamp.") + s"$formatLiteral is not supported in ColumnarUnixTimestamp.") } case _ => throw new UnsupportedOperationException("Only literal format is" + @@ -492,7 +492,7 @@ object ColumnarDateTimeExpressions { TreeBuilder.makeFunction( "unix_seconds", Lists.newArrayList(milliNode), CodeGeneration.getResultType(dataType)) } else if (left.dataType == StringType) { - if (format.equals(yearMonthDayFormat)) { + if (this.formatLiteral.equals(yearMonthDayFormat)) { // Convert from UTF8 to Date[Millis]. val dateNode = TreeBuilder.makeFunction( "castDATE_nullsafe", Lists.newArrayList(leftNode), milliType) @@ -501,10 +501,10 @@ object ColumnarDateTimeExpressions { // Convert from milliseconds to seconds. 
TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) - } else if (format.equals(yearMonthDayTimeFormat)) { + } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", Lists.newArrayList(leftNode), outType) - } else if (format.equals(yearMonthDayTimeNoSepFormat)) { + } else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", Lists.newArrayList(leftNode), outType) } else { From 18c8e7ebd06e86e1d7165c15b8bd486f0e14c560 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 12 Jul 2022 11:19:46 +0800 Subject: [PATCH 05/11] Do some converts --- .../ColumnarDateTimeExpressions.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index f2ba72376..66c4bcc62 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -484,7 +484,6 @@ object ColumnarDateTimeExpressions { override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = { val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val outType = CodeGeneration.getResultType(dataType) val milliType = new ArrowType.Date(DateUnit.MILLISECOND) val dateNode = if (left.dataType == TimestampType) { @@ -502,11 +501,23 @@ object ColumnarDateTimeExpressions { TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else if 
(this.formatLiteral.equals(yearMonthDayTimeFormat)) { - TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", - Lists.newArrayList(leftNode), outType) + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying", + Lists.newArrayList(leftNode), timestampType) + val castNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(timestampNode), outType) + TreeBuilder.makeFunction("divide", Lists.newArrayList(castNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { - TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", - Lists.newArrayList(leftNode), outType) + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP_withCarrying_withoutSep", + Lists.newArrayList(leftNode), timestampType) + // The result is in milliseconds. + val castNode = TreeBuilder.makeFunction("castBIGINT", + Lists.newArrayList(timestampNode), outType) + // Convert to the timestamp in seconds. 
+ TreeBuilder.makeFunction("divide", Lists.newArrayList(castNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else { throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!") } From 66d10d3a8a207433b7196903ab9112e45a4b8823 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 12 Jul 2022 16:23:59 +0800 Subject: [PATCH 06/11] Support more format for from_unixtime --- .../ColumnarDateTimeExpressions.scala | 81 ++++++++++++------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index 66c4bcc62..b0473919c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -608,6 +608,11 @@ object ColumnarDateTimeExpressions { extends FromUnixTime(left, right) with ColumnarExpression { + var formatLiteral: String = null + val yearMonthDayFormat = "yyyy-MM-dd" + val yearMonthDayNoSepFormat = "yyyyMMdd" + val yearMonthDayTimeFormat = "yyyy-MM-dd HH:mm:ss" + buildCheck() def buildCheck(): Unit = { @@ -619,45 +624,67 @@ object ColumnarDateTimeExpressions { if (left.dataType == LongType) { right match { case literal: ColumnarLiteral => - val format = literal.value.toString.trim - if (!format.equals("yyyy-MM-dd") && !format.equals("yyyyMMdd")) { + this.formatLiteral = literal.value.toString.trim + if (!formatLiteral.equals(yearMonthDayFormat) && + !formatLiteral.equals(yearMonthDayNoSepFormat) && + !formatLiteral.equals(yearMonthDayTimeFormat)) { throw new UnsupportedOperationException( s"$format is not supported in ColumnarFromUnixTime.") } case _ => + throw new UnsupportedOperationException("Only literal format is supported!") } } } override def doColumnarCodeGen(args: Object): 
(TreeNode, ArrowType) = { - val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) - //val (rightNode, rightType) = right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) + val (leftNode, _) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args) val outType = CodeGeneration.getResultType(dataType) - val date32LeftNode = if (left.dataType == LongType) { - // cast unix seconds to date64() - val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) - val date64Node = TreeBuilder.makeFunction("castDATE", - Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND)) - TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY)) + if (this.formatLiteral.equals(yearMonthDayFormat) || + this.formatLiteral.equals(yearMonthDayNoSepFormat)) { + val date32LeftNode = if (left.dataType == LongType) { + // cast unix seconds to date64() + val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) + val date64Node = TreeBuilder.makeFunction("castDATE", + Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND)) + TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), + new ArrowType.Date(DateUnit.DAY)) + } else { + throw new UnsupportedOperationException( + s"${left.dataType} is not supported in ColumnarFromUnixTime.") + } + var formatLength = 0L + right match { + case literal: ColumnarLiteral => + val format = literal.value.toString.trim + if (format.equals("yyyy-MM-dd")) { + formatLength = 10L + } else if (format.equals("yyyyMMdd")) { + formatLength = 8L + } + } + val dateNode = TreeBuilder.makeFunction( + "castVARCHAR", Lists.newArrayList(date32LeftNode, + TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), 
outType) + (dateNode, outType) + } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { + // Only millisecond based input is expected in following functions, but the raw input + // is second based. So we make the below conversion. + val timestampInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( + leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), + new ArrowType.Int(64, true)) + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", + Lists.newArrayList(timestampInMilliSecNode), timestampType) + // The longest length for yyyy-MM-dd HH:mm:ss. + val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L)) + val resultNode = TreeBuilder.makeFunction("castVARCHAR", + Lists.newArrayList(timestampNode, lenNode), outType) + (resultNode, outType) } else { - throw new UnsupportedOperationException( - s"${left.dataType} is not supported in ColumnarFromUnixTime.") + throw new RuntimeException("Unexpected format is used!") } - var formatLength = 0L - right match { - case literal: ColumnarLiteral => - val format = literal.value.toString.trim - if (format.equals("yyyy-MM-dd")) { - formatLength = 10L - } else if (format.equals("yyyyMMdd")) { - formatLength = 8L - } - } - val dateNode = TreeBuilder.makeFunction( - "castVARCHAR", Lists.newArrayList(date32LeftNode, - TreeBuilder.makeLiteral(java.lang.Long.valueOf(formatLength))), outType) - (dateNode, outType) } } From 62646f3b4a5e9c1ce1e8130af4854fce73964b0d Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 13 Jul 2022 09:35:20 +0800 Subject: [PATCH 07/11] Align with spark's timezone awareness --- .../ColumnarDateTimeExpressions.scala | 22 +++++++++------ .../intel/oap/expression/ConverterUtils.scala | 27 ++++++++++++++++++- .../apache/spark/sql/util/ArrowUtils.scala | 2 +- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git 
a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index b0473919c..e8a47fe37 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -26,7 +26,6 @@ import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.gandiva.expression.TreeNode import org.apache.arrow.vector.types.{DateUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.ArrowType - import org.apache.spark.sql.catalyst.expressions.CheckOverflow import org.apache.spark.sql.catalyst.expressions.CurrentDate import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp @@ -55,6 +54,8 @@ import org.apache.spark.sql.catalyst.expressions.UnixMillis import org.apache.spark.sql.catalyst.expressions.UnixSeconds import org.apache.spark.sql.catalyst.expressions.UnixTimestamp import org.apache.spark.sql.catalyst.expressions.Year +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, TimestampType} import org.apache.spark.sql.util.ArrowUtils @@ -498,7 +499,8 @@ object ColumnarDateTimeExpressions { val intNode = TreeBuilder.makeFunction("castBIGINT", Lists.newArrayList(dateNode), outType) // Convert from milliseconds to seconds. 
- TreeBuilder.makeFunction("divide", Lists.newArrayList(intNode, + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(intNode), TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") @@ -506,7 +508,8 @@ object ColumnarDateTimeExpressions { Lists.newArrayList(leftNode), timestampType) val castNode = TreeBuilder.makeFunction("castBIGINT", Lists.newArrayList(timestampNode), outType) - TreeBuilder.makeFunction("divide", Lists.newArrayList(castNode, + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(castNode), TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else if (this.formatLiteral.equals(yearMonthDayTimeNoSepFormat)) { val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") @@ -516,7 +519,8 @@ object ColumnarDateTimeExpressions { val castNode = TreeBuilder.makeFunction("castBIGINT", Lists.newArrayList(timestampNode), outType) // Convert to the timestamp in seconds. 
- TreeBuilder.makeFunction("divide", Lists.newArrayList(castNode, + TreeBuilder.makeFunction("divide", Lists.newArrayList( + ConverterUtils.subtractTimestampOffset(castNode), TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), outType) } else { throw new RuntimeException("Unexpected format for ColumnarUnixTimestamp!") @@ -647,7 +651,8 @@ object ColumnarDateTimeExpressions { val milliNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList(leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(8 * 8, true)) val date64Node = TreeBuilder.makeFunction("castDATE", - Lists.newArrayList(milliNode), new ArrowType.Date(DateUnit.MILLISECOND)) + Lists.newArrayList(ConverterUtils.addTimestampOffset(milliNode)), + new ArrowType.Date(DateUnit.MILLISECOND)) TreeBuilder.makeFunction("castDATE", Lists.newArrayList(date64Node), new ArrowType.Date(DateUnit.DAY)) } else { @@ -671,12 +676,13 @@ object ColumnarDateTimeExpressions { } else if (this.formatLiteral.equals(yearMonthDayTimeFormat)) { // Only millisecond based input is expected in following functions, but the raw input // is second based. So we make the below conversion. - val timestampInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( + val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(64, true)) - val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC") + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, + SparkSchemaUtils.getLocalTimezoneID()) val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", - Lists.newArrayList(timestampInMilliSecNode), timestampType) + Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType) // The longest length for yyyy-MM-dd HH:mm:ss. 
val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L)) val resultNode = TreeBuilder.makeFunction("castVARCHAR", diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index 6dfbcece5..d84b5d865 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -32,6 +32,7 @@ import org.apache.arrow.gandiva.expression._ import org.apache.arrow.gandiva.expression.ExpressionTree import org.apache.arrow.gandiva.ipc.GandivaTypes import org.apache.arrow.gandiva.ipc.GandivaTypes.ExpressionList +import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel, WriteChannel} @@ -61,7 +62,7 @@ import org.apache.arrow.vector.types.TimeUnit import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision} -import org.apache.spark.sql.catalyst.util.DateTimeConstants +import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils @@ -699,6 +700,30 @@ object ConverterUtils extends Logging { throw new UnsupportedOperationException() } + /** + * Add offset in millisecond for given timestamp node for timezone awareness. + * Used to convert timestamp counted from unix epoch (UTC) to local date/time. 
+ */ + def addTimestampOffset(timestampNode: TreeNode): TreeNode = { + // the offset in millisecond needs to be added for local timestamp configured in spark sql. + val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) + val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) + TreeBuilder.makeFunction("add", Lists.newArrayList(timestampNode, offsetNode), + new ArrowType.Int(64, true)) + } + + /** + * Subtract offset in millisecond for given timestamp node. Used to get timestamp + * counted from unix epoch (UTC) for a local date/time. + */ + def subtractTimestampOffset(timestampNode: TreeNode): TreeNode = { + // the offset in millisecond needs to be added for local timestamp configured in spark sql. + val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) + val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) + TreeBuilder.makeFunction("subtract", Lists.newArrayList(timestampNode, offsetNode), + new ArrowType.Int(64, true)) + } + def powerOfTen(pow: Int): (String, Int, Int) = { val POWERS_OF_10: Array[(String, Int, Int)] = Array( ("1", 1, 0), diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala index bf3a4c184..2213e94fc 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala @@ -52,7 +52,7 @@ object ArrowUtils { throw new UnsupportedOperationException( s"${TimestampType.catalogString} must supply timeZoneId parameter") } else { - new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC") + new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) } case at: ArrayType => ArrowType.List.INSTANCE case _ => From e77e1b7a9a914de3203211316530c5aa6728b002 Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 13 Jul 2022 10:13:18 
+0800 Subject: [PATCH 08/11] Refine the code --- .../oap/expression/ColumnarDateTimeExpressions.scala | 9 ++------- .../com/intel/oap/expression/ConverterUtils.scala | 12 ++++++------ .../scala/org/apache/spark/sql/util/ArrowUtils.scala | 2 +- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index e8a47fe37..c76b25bd0 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -20,13 +20,11 @@ package com.intel.oap.expression import java.util.Collections import com.google.common.collect.Lists -import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp -import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.gandiva.expression.TreeNode import org.apache.arrow.vector.types.{DateUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.spark.sql.catalyst.expressions.CheckOverflow + import org.apache.spark.sql.catalyst.expressions.CurrentDate import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp import org.apache.spark.sql.catalyst.expressions.DateDiff @@ -54,8 +52,6 @@ import org.apache.spark.sql.catalyst.expressions.UnixMillis import org.apache.spark.sql.catalyst.expressions.UnixSeconds import org.apache.spark.sql.catalyst.expressions.UnixTimestamp import org.apache.spark.sql.catalyst.expressions.Year -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ByteType, DataType, DateType, 
IntegerType, LongType, ShortType, StringType, TimestampType} import org.apache.spark.sql.util.ArrowUtils @@ -679,8 +675,7 @@ object ColumnarDateTimeExpressions { val tsInMilliSecNode = TreeBuilder.makeFunction("multiply", Lists.newArrayList( leftNode, TreeBuilder.makeLiteral(java.lang.Long.valueOf(1000L))), new ArrowType.Int(64, true)) - val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, - SparkSchemaUtils.getLocalTimezoneID()) + val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null) val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType) // The longest length for yyyy-MM-dd HH:mm:ss. diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index d84b5d865..273889bb4 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -701,11 +701,11 @@ object ConverterUtils extends Logging { } /** - * Add offset in millisecond for given timestamp node for timezone awareness. - * Used to convert timestamp counted from unix epoch (UTC) to local date/time. + * Add an offset (can be negative) in millisecond for given timestamp node to + * align with spark's timezone awareness. It can be used in converting timestamp + * counted from unix epoch (UTC) to local date/time. */ def addTimestampOffset(timestampNode: TreeNode): TreeNode = { - // the offset in millisecond needs to be added for local timestamp configured in spark sql. 
val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) TreeBuilder.makeFunction("add", Lists.newArrayList(timestampNode, offsetNode), @@ -713,11 +713,11 @@ object ConverterUtils extends Logging { } /** - * Subtract offset in millisecond for given timestamp node. Used to get timestamp - * counted from unix epoch (UTC) for a local date/time. + * Subtract an offset (can be negative) in millisecond for given timestamp node. + * It can be used in getting timestamp counted from unix epoch (UTC) for a given + * local date/time. */ def subtractTimestampOffset(timestampNode: TreeNode): TreeNode = { - // the offset in millisecond needs to be added for local timestamp configured in spark sql. val offset = DateTimeUtils.getTimeZone(SparkSchemaUtils.getLocalTimezoneID()).getOffset(0) val offsetNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(offset)) TreeBuilder.makeFunction("subtract", Lists.newArrayList(timestampNode, offsetNode), diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala index 2213e94fc..bf3a4c184 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala @@ -52,7 +52,7 @@ object ArrowUtils { throw new UnsupportedOperationException( s"${TimestampType.catalogString} must supply timeZoneId parameter") } else { - new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) + new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC") } case at: ArrayType => ArrowType.List.INSTANCE case _ => From 0f418150230734ca8e46fd95a776d873a740b7aa Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 13 Jul 2022 11:13:28 +0800 Subject: [PATCH 09/11] Add some comment --- .../expression/ColumnarDateTimeExpressions.scala | 16 +++++++++++----- 1 
file changed, 11 insertions(+), 5 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala index c76b25bd0..bb428f8cc 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala @@ -441,8 +441,10 @@ object ColumnarDateTimeExpressions { } /** - * Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail. - */ + * Converts time string with given pattern to Unix timestamp (in seconds), returns null if fail. + * The input is the date/time for local timezone (can be configured in spark) and the result is + * the timestamp for UTC. So we need to consider the timezone difference. + * */ class ColumnarUnixTimestamp(left: Expression, right: Expression) extends UnixTimestamp(left, right) with ColumnarExpression { @@ -604,6 +606,10 @@ object ColumnarDateTimeExpressions { copy(leftChild = newLeft, rightChild = newRight) } + /** + * The result is the date/time for local timezone (can be configured in spark). The input is + * the timestamp for UTC. So we need to consider the timezone difference.
+ */ class ColumnarFromUnixTime(left: Expression, right: Expression) extends FromUnixTime(left, right) with ColumnarExpression { @@ -659,9 +665,9 @@ object ColumnarDateTimeExpressions { right match { case literal: ColumnarLiteral => val format = literal.value.toString.trim - if (format.equals("yyyy-MM-dd")) { + if (format.equals(yearMonthDayFormat)) { formatLength = 10L - } else if (format.equals("yyyyMMdd")) { + } else if (format.equals(yearMonthDayNoSepFormat)) { formatLength = 8L } } @@ -678,7 +684,7 @@ object ColumnarDateTimeExpressions { val timestampType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null) val timestampNode = TreeBuilder.makeFunction("castTIMESTAMP", Lists.newArrayList(ConverterUtils.addTimestampOffset(tsInMilliSecNode)), timestampType) - // The longest length for yyyy-MM-dd HH:mm:ss. + // The largest length for yyyy-MM-dd HH:mm:ss. val lenNode = TreeBuilder.makeLiteral(java.lang.Long.valueOf(19L)) val resultNode = TreeBuilder.makeFunction("castVARCHAR", Lists.newArrayList(timestampNode, lenNode), outType) From ec240e586c2fdde0d705dd65b98745a52b362bbd Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 13 Jul 2022 19:21:16 +0800 Subject: [PATCH 10/11] Correct the expected results in a UT --- .../src/test/scala/com/intel/oap/misc/DateTimeSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala index 0f9e41ef4..9778fef9c 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala @@ -782,9 +782,9 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { .isInstanceOf[ColumnarConditionProjectExec]).isDefined) checkAnswer( frame, - Seq(Row(java.lang.Long.valueOf(1248912000L)), - Row(java.lang.Long.valueOf(1248998400L)), - 
Row(java.lang.Long.valueOf(1249084800L)))) + Seq(Row(java.lang.Long.valueOf(1248940800L)), + Row(java.lang.Long.valueOf(1249027200L)), + Row(java.lang.Long.valueOf(1249113600L)))) } } } From 18e21929dc82a74267a14538a1f4ce441afbb214 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 15 Jul 2022 10:07:16 +0800 Subject: [PATCH 11/11] Revert "Change arrow branch [will revert at last]" This reverts commit 11f09775cbffec3b68b73768a8529440a9f83e3c. --- arrow-data-source/script/build_arrow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index 5c7baaeb0..d8ec40128 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/PHILO-HE/arrow.git --branch unixtime-func $ARROW_SOURCE_DIR +git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \