From 646aeb84cc7288c2ca3a5ca330b301eafb9bcc23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 2 Oct 2024 15:42:22 +0300 Subject: [PATCH] [DOP-16967] Allow writing ArrayType(TimestampType) from Spark to Clickhouse --- CHANGELOG.md | 13 +++ build.gradle | 2 +- docs/data_type_mappings.md | 16 ++- .../spark35/ClickhouseDialectExtension.scala | 15 ++- .../clickhouse/ClickhouseDialectTest.scala | 105 ++++++++++++++++-- 5 files changed, 128 insertions(+), 23 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..bb4ad78 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,13 @@ +## 0.0.2 (2024-10-02) + +* Allow writing `ArrayType(TimestampType())` Spark column as Clickhouse's `Array(DateTime64(6))`. +* Allow writing `ArrayType(ShortType())` Spark column as Clickhouse's `Array(Int16)`. + +## 0.0.1 (2024-10-01) + +First release! 🎉 + +This version includes custom Clickhouse dialect for Apache Spark 3.5.x, with the following enhancements: +* support for writing Spark's `ArrayType` to Clickhouse. Currently [only a few types](https://github.com/ClickHouse/clickhouse-java/issues/1754) are supported, like `ArrayType(StringType)`, `ArrayType(ByteType)`, `ArrayType(LongType)`, `ArrayType(FloatType)`. Unfortunately, reading Arrays from Clickhouse to Spark is not fully supported for now. +* fixed issue when writing Spark's `TimestampType` led to creating Clickhouse table with `DateTime64(0)` instead of `DateTime64(6)`, resulting in a precision loss (fractions of seconds were dropped). +* fixed issue when writing Spark's `BooleanType` led to creating Clickhouse table with `UInt64` column instead of `Bool`. 
diff --git a/build.gradle b/build.gradle index 036f09c..5ad365b 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ plugins { group = "io.github.mtsongithub.doetl" // TODO: crossbuild for Scala 2.13 archivesBaseName = "spark-dialect-extension_2.12" -version = "0.0.1" +version = "0.0.2" repositories { mavenCentral() diff --git a/docs/data_type_mappings.md b/docs/data_type_mappings.md index b94b518..f7ccfc7 100644 --- a/docs/data_type_mappings.md +++ b/docs/data_type_mappings.md @@ -24,7 +24,8 @@ Primitive types: | `Decimal64(N)` | `DecimalType(M, N)` | `Decimal64(M, N)` | `Decimal64(M, N)` | | `Decimal128(N)` | `DecimalType(M, N)` | `Decimal128(M, N)` | `Decimal128(M, N)` | | `Decimal256(N)` | unsupported | unsupported | unsupported | -| `DateTime` | `TimestampType` | `DateTime` | `DateTime` | +| `Date` | `DateType` | `Date` | `Date` | +| `DateTime` | `TimestampType` | `DateTime` | `DateTime` | | `DateTime64(6)` | `TimestampType` | `DateTime64(6)` | `DateTime64(6) (Spark's default is DateTime32)` | @@ -34,10 +35,15 @@ Primitive types: |------------------------|--------------------------------|-------------------------|--------------------------| | `Array(String)` | `ArrayType(StringType)` | `Array(String)` | `Array(String)` | | unsupported | `ArrayType(ByteType)` | `Array(Int8)` | `Array(Int8)` | -| unsupported | `ArrayType(ShortType)` | unsupported | unsupported | +| unsupported | `ArrayType(ShortType)` | `Array(Int16)` | `Array(Int16)` | +| unsupported | `ArrayType(IntegerType)` | `Array(Int32)` | `Array(Int32)` | | unsupported | `ArrayType(LongType)` | `Array(Int64)` | `Array(Int64)` | | `Array(Decimal(M, N))` | `ArrayType(DecimalType(M, N))` | `Array(Decimal(M, N))` | `Array(Decimal(M, N))` | -| unsupported | `ArrayType(TimestampType)` | unsupported | unsupported | -| unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` | | unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` | -| unsupported | `ArrayType(DoubleType)` 
| unsupported | unsupported | +| unsupported | `ArrayType(DoubleType)` | `Array(Float64)` | `Array(Float64)` | +| unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` | +| unsupported | `ArrayType(TimestampType)` | `Array(DateTime64(6))` | `Array(DateTime64(6))` | + +Reading issues are caused by Clickhouse JDBC implementation: +* https://github.com/ClickHouse/clickhouse-java/issues/1754 +* https://github.com/ClickHouse/clickhouse-java/issues/1409 diff --git a/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala b/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala index 0e82c6d..780cb99 100644 --- a/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala +++ b/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala @@ -13,10 +13,10 @@ private object ClickhouseDialectExtension extends JdbcDialect { private val logger = LoggerFactory.getLogger(getClass) - private val arrayTypePattern: Regex = """^Array\((.*)\)$""".r - private val nullableTypePattern: Regex = """^Nullable\((.*)\)$""".r + private val arrayTypePattern: Regex = """(?i)^Array\((.*)\)$""".r + private val nullableTypePattern: Regex = """(?i)^Nullable\((.*)\)$""".r private val dateTypePattern: Regex = """(?i)^Date$""".r - private val dateTimeTypePattern: Regex = """(?i)^DateTime(64)?(\((.*)\))?$""".r + private val dateTimeTypePattern: Regex = """(?i)^DateTime(\d+)?(?:\((\d+)\))?$""".r private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r @@ -94,7 +94,7 @@ private object ClickhouseDialectExtension extends JdbcDialect { case dateTypePattern() => logger.debug(s"Custom mapping applied: DateType for '${_typeName}'") Some(DateType) - case 
dateTimeTypePattern() => + case dateTimeTypePattern(_, _) => logger.debug(s"Custom mapping applied: TimestampType for '${_typeName}'") Some(TimestampType) case decimalTypePattern(precision, scale) => @@ -158,12 +158,15 @@ private object ClickhouseDialectExtension extends JdbcDialect { Some(JdbcType("Bool", Types.BOOLEAN)) case ShortType => logger.debug("Custom mapping applied: Int16 for 'ShortType'") - Some(JdbcType("Int16", Types.SMALLINT)) + // Using literal `Int16` fails on Spark 3.x - Spark converts type names to lowercase, + // but Clickhouse type names are case-sensitive. See https://issues.apache.org/jira/browse/SPARK-46612 + // Using SMALLINT as alias for Int16, which is case-insensitive. + Some(JdbcType("SMALLINT", Types.SMALLINT)) case TimestampType => logger.debug("Custom mapping applied: Datetime64(6) for 'TimestampType'") Some(JdbcType("Datetime64(6)", Types.TIMESTAMP)) case ArrayType(et, _) => - logger.debug("Custom mapping applied: Array[T_1] for ArrayType(T_0)") + logger.debug("Custom mapping applied: Array[T] for ArrayType(T)") getJDBCType(et) .orElse(JdbcUtils.getCommonJDBCType(et)) .map(jdbcType => JdbcType(s"Array(${jdbcType.databaseTypeDefinition})", Types.ARRAY)) diff --git a/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala b/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala index f673fde..7f63ebb 100644 --- a/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala +++ b/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala @@ -1,14 +1,15 @@ package io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse import io.github.mtsongithub.doetl.sparkdialectextensions.SharedSparkSession +import org.mockito.Mockito._ +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession -import 
org.mockito.Mockito._ import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.types._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.prop.{TableDrivenPropertyChecks, TableFor3} +import org.scalatest.prop.TableDrivenPropertyChecks class ClickhouseDialectTest extends AnyFunSuite @@ -444,7 +445,7 @@ class ClickhouseDialectTest statement.close() } - val testReadArrayCases: TableFor3[String, String, DataType] = Table( + val testReadArrayCases = Table( ("columnDefinition", "insertedData", "expectedType"), ( "charArrayColumn Array(String)", @@ -474,6 +475,69 @@ class ClickhouseDialectTest } } + val testReadArrayUnsupportedCases = Table( + ("columnDefinition", "insertedData", "expectedType", "errorMessage"), + // https://github.com/ClickHouse/clickhouse-java/issues/1754 + ( + "byteArrayColumn Array(Int8)", + "([1, 2, 3, 4, 5])", + ArrayType(ByteType, containsNull = false), + "class [B cannot be cast to class [Ljava.lang.Object;"), + ( + "shortArrayColumn Array(Int16)", + "([1, 2, 3, 4, 5])", + ArrayType(ShortType, containsNull = false), + "class [S cannot be cast to class [Ljava.lang.Object;"), + ( + "intArrayColumn Array(Int32)", + "([1, 2, 3, 4, 5])", + ArrayType(IntegerType, containsNull = false), + "class [I cannot be cast to class [Ljava.lang.Object;"), + ( + "longArrayColumn Array(Int64)", + "([1, 2, 3, 4, 5])", + ArrayType(LongType, containsNull = false), + "class [J cannot be cast to class [Ljava.lang.Object"), + // https://github.com/ClickHouse/clickhouse-java/issues/1409 + ( + "dateArrayColumn Array(Date)", + "(['2024-01-01', '2024-01-02', '2024-01-03'])", + ArrayType(DateType, containsNull = false), + "class [Ljava.time.LocalDate; cannot be cast to class [Ljava.sql.Date;"), + ( + "datetimeArrayColumn Array(DateTime64(6))", + "(['2024-01-01T00:00:00.000000', '2024-01-02T11:11:11.111111', '2024-01-03.2222222'])", + ArrayType(TimestampType, containsNull = false), + "class 
[Ljava.time.LocalDateTime; cannot be cast to class [Ljava.sql.Timestamp;")) + + forAll(testReadArrayUnsupportedCases) { + ( + columnDefinition: String, + insertedData: String, + expectedType: DataType, + errorMessage: String) => + test(s"cannot read ClickHouse Array for ${columnDefinition} column") { + setupTable(columnDefinition) + insertTestData(Seq(insertedData)) + + // schema is detected properly + val df = spark.read + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .load() + + assert(df.schema.fields.head.dataType === expectedType) + + // but read is failing + val exception = intercept[SparkException] { + df.collect() + } + // check the exception message + exception.getMessage should include(errorMessage) + } + } + val testWriteArrayCases = Table( ("columnName", "insertedData", "expectedType", "expectedClickhouseType"), ( @@ -486,6 +550,11 @@ class ClickhouseDialectTest Seq(Row(Array(1.toByte, 2.toByte, 3.toByte, 4.toByte, 5.toByte))), ArrayType(ByteType, containsNull = false), "Array(Int8)"), + ( + "shortArrayColumn", + Seq(Row(Array(1.toShort, 2.toShort, 3.toShort, 4.toShort, 5.toShort))), + ArrayType(ShortType, containsNull = false), + "Array(Int16)"), ( "intArrayColumn", Seq(Row(Array(1, 2, 3, 4, 5))), @@ -501,6 +570,22 @@ class ClickhouseDialectTest Seq(Row(Array(1.0f, 2.0f, 3.0f, 4.0f, 5.0f))), ArrayType(FloatType, containsNull = false), "Array(Float32)"), + ( + "doubleArrayColumn", + Seq(Row(Array(1.0d, 2.0d, 3.0d, 4.0d, 5.0d))), + ArrayType(DoubleType, containsNull = false), + "Array(Float64)"), + ( + "decimalArrayColumn", + Seq( + Row(Array( + new java.math.BigDecimal("1.23"), + new java.math.BigDecimal("2.34"), + new java.math.BigDecimal("3.45"), + new java.math.BigDecimal("4.56"), + new java.math.BigDecimal("5.67")))), + ArrayType(DecimalType(9, 2), containsNull = false), + "Array(Decimal(9, 2))"), ( "dateArrayColumn", Seq( @@ -512,16 +597,14 @@ class ClickhouseDialectTest ArrayType(DateType, containsNull = false), 
"Array(Date)"), ( - "decimalArrayColumn", + "datetimeArrayColumn", Seq( Row(Array( - new java.math.BigDecimal("1.23"), - new java.math.BigDecimal("2.34"), - new java.math.BigDecimal("3.45"), - new java.math.BigDecimal("4.56"), - new java.math.BigDecimal("5.67")))), - ArrayType(DecimalType(9, 2), containsNull = false), - "Array(Decimal(9, 2))")) + java.sql.Timestamp.valueOf("2022-01-01 00:00:00.000000"), + java.sql.Timestamp.valueOf("2022-01-02 11:11:11.111111"), + java.sql.Timestamp.valueOf("2022-01-03 22:22:22.222222")))), + ArrayType(TimestampType, containsNull = false), + "Array(DateTime64(6))")) forAll(testWriteArrayCases) { (