feat: Disable cast string to timestamp by default #337

Merged · 8 commits · Apr 29, 2024
Changes from all commits
7 changes: 7 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -365,6 +365,13 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(

Contributor: Wondering if we should have this configuration at all? If users turn this on they could end up with incorrect data?

+    "spark.comet.cast.stringToTimestamp")
+    .doc(
+      "Comet is not currently fully compatible with Spark when casting from String to Timestamp.")
+    .booleanConf
+    .createWithDefault(false)
+
 }
 
 object ConfigHelpers {
6 changes: 6 additions & 0 deletions docs/source/user-guide/compatibility.md
@@ -38,3 +38,9 @@
 guaranteed to be consistent with Spark.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.
+
+### Cast from String to Timestamp
+
+Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
+issues, and can be enabled by setting `spark.comet.cast.stringToTimestamp=true`. See the
+[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
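
Since the compatibility guide tells users how to opt back in, a minimal usage sketch may help. Assume a Comet-enabled Spark build; only the `spark.comet.cast.stringToTimestamp` key comes from this PR, and the builder settings and query are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: opt back in to Comet's native string-to-timestamp cast,
// accepting the documented incompatibilities with Spark's semantics.
val spark = SparkSession
  .builder()
  .appName("comet-cast-sketch") // hypothetical app name
  .config("spark.comet.cast.stringToTimestamp", "true")
  .getOrCreate()

spark.sql("SELECT CAST('2020-01-01T12:34:56' AS TIMESTAMP) AS ts").show()
```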
17 changes: 16 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -584,7 +585,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
             evalMode.toString
           }
-          castToProto(timeZoneId, dt, childExpr, evalModeStr)
+          val supportedCast = (child.dataType, dt) match {
+            case (DataTypes.StringType, DataTypes.TimestampType)
+                if !CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() =>
+              // https://github.com/apache/datafusion-comet/issues/328
+              withInfo(expr, s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled")
+              false
+            case _ => true
+          }
+          if (supportedCast) {
+            castToProto(timeZoneId, dt, childExpr, evalModeStr)
+          } else {
+            // no need to call withInfo here since it was called when determining
+            // the value for `supportedCast`
+            None
+          }
         } else {
           withInfo(expr, child)
           None
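
The serde change above is Comet's per-expression fallback in action: tagging the expression with `withInfo` records the reason in the extended explain output, and returning `None` leaves the cast to Spark. A hedged sketch of the user-visible effect, assuming a Comet-enabled session named `spark` (the DataFrame code is illustrative, not from the PR):

```scala
import spark.implicits._

// With the new default (flag off), Comet declines to serialize this cast;
// Spark evaluates it natively, so results keep Spark's exact semantics.
val df = Seq("2020-01-01T12:34:56.123456", "T2")
  .toDF("a")
  .selectExpr("CAST(a AS TIMESTAMP) AS ts")

df.collect() // only this cast falls back; supported operators can still run in Comet
```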
34 changes: 31 additions & 3 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -98,12 +98,22 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   ignore("cast string to date") {
-    castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DoubleType)
+    castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType)
   }
 
+  test("cast string to timestamp disabled by default") {
+    val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
+    castFallbackTest(
+      values.toDF("a"),
+      DataTypes.TimestampType,
+      "spark.comet.cast.stringToTimestamp is disabled")
+  }
+
   ignore("cast string to timestamp") {
-    val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8)
-    castTest(values.toDF("a"), DataTypes.DoubleType)
+    withSQLConf((CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true")) {
+      val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8)
+      castTest(values.toDF("a"), DataTypes.TimestampType)
+    }
   }
 
   private def generateFloats(): DataFrame = {
@@ -126,6 +136,24 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Range(0, dataSize).map(_ => generateString(r, chars, maxLen))
   }
 
+  private def castFallbackTest(
+      input: DataFrame,
+      toType: DataType,
+      expectedMessage: String): Unit = {
+    withTempPath { dir =>
+      val data = roundtripParquet(input, dir).coalesce(1)
+      data.createOrReplaceTempView("t")
+
+      withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
+        val df = data.withColumn("converted", col("a").cast(toType))
+        df.collect()
+        val str =
+          new ExtendedExplainInfo().generateExtendedInfo(df.queryExecution.executedPlan)
+        assert(str.contains(expectedMessage))
+      }
+    }
+  }
+
   private def castTest(input: DataFrame, toType: DataType): Unit = {
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)