Skip to content

Commit

Permalink
[DOP-16967] Allow writing ArrayType(TimestampType) from Spark to Clic…
Browse files Browse the repository at this point in the history
…khouse
  • Loading branch information
dolfinus committed Oct 2, 2024
1 parent 5d72653 commit 646aeb8
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 23 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
## 0.0.2 (2024-10-02)

* Allow writing `ArrayType(TimestampType())` Spark column as Clickhouse's `Array(DateTime64(6))`.
* Allow writing `ArrayType(ShortType())` Spark column as Clickhouse's `Array(Int16)`.

## 0.0.1 (2024-10-01)

First release! 🎉

This version includes custom Clickhouse dialect for Apache Spark 3.5.x, with following enhancements:
* support for writing Spark's `ArrayType` to Clickhouse. Currently [only a few types](https://github.com/ClickHouse/clickhouse-java/issues/1754) are supported, like `ArrayType(StringType)`, `ArrayType(ByteType)`, `ArrayType(LongType)`, `ArrayType(FloatType)`. Unfortunately, reading Arrays from Clickhouse to Spark is not fully supported for now.
* fixed issue where writing Spark's `TimestampType` led to creating Clickhouse table with `DateTime64(0)` instead of `DateTime64(6)`, resulting in precision loss (fractions of seconds were dropped).
* fixed issue where writing Spark's `BooleanType` led to creating Clickhouse table with `UInt64` column instead of `Bool`.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ plugins {
group = "io.github.mtsongithub.doetl"
// TODO: crossbuild for Scala 2.13
archivesBaseName = "spark-dialect-extension_2.12"
version = "0.0.1"
version = "0.0.2"

repositories {
mavenCentral()
Expand Down
16 changes: 11 additions & 5 deletions docs/data_type_mappings.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ Primitive types:
| `Decimal64(N)` | `DecimalType(M, N)` | `Decimal64(M, N)` | `Decimal64(M, N)` |
| `Decimal128(N)` | `DecimalType(M, N)` | `Decimal128(M, N)` | `Decimal128(M, N)` |
| `Decimal256(N)` | unsupported | unsupported | unsupported |
| `DateTime` | `TimestampType` | `DateTime` | `DateTime` |
| `Date` | `DateType` | `Date` | `Date` |
| `DateTime` | `TimestampType` | `DateTime` | `DateTime` |
| `DateTime64(6)` | `TimestampType` | `DateTime64(6)` | `DateTime64(6) (Spark's default is DateTime32)` |


Expand All @@ -34,10 +35,15 @@ Primitive types:
|------------------------|--------------------------------|-------------------------|--------------------------|
| `Array(String)` | `ArrayType(StringType)` | `Array(String)` | `Array(String)` |
| unsupported | `ArrayType(ByteType)` | `Array(Int8)` | `Array(Int8)` |
| unsupported | `ArrayType(ShortType)` | unsupported | unsupported |
| unsupported | `ArrayType(ShortType)` | `Array(Int16)` | `Array(Int16)` |
| unsupported | `ArrayType(IntegerType)` | `Array(Int32)` | `Array(Int32)` |
| unsupported | `ArrayType(LongType)` | `Array(Int64)` | `Array(Int64)` |
| `Array(Decimal(M, N))` | `ArrayType(DecimalType(M, N))` | `Array(Decimal(M, N))` | `Array(Decimal(M, N))` |
| unsupported | `ArrayType(TimestampType)` | unsupported | unsupported |
| unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` |
| unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` |
| unsupported | `ArrayType(DoubleType)` | unsupported | unsupported |
| unsupported | `ArrayType(DoubleType)` | `Array(Float64)` | `Array(Float64)` |
| unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` |
| unsupported | `ArrayType(TimestampType)` | `Array(DateTime64(6))` | `Array(DateTime64(6))` |

Reading issues are caused by Clickhouse JDBC implementation:
* https://github.com/ClickHouse/clickhouse-java/issues/1754
* https://github.com/ClickHouse/clickhouse-java/issues/1409
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ private object ClickhouseDialectExtension extends JdbcDialect {

private val logger = LoggerFactory.getLogger(getClass)

private val arrayTypePattern: Regex = """^Array\((.*)\)$""".r
private val nullableTypePattern: Regex = """^Nullable\((.*)\)$""".r
private val arrayTypePattern: Regex = """(?i)^Array\((.*)\)$""".r
private val nullableTypePattern: Regex = """(?i)^Nullable\((.*)\)$""".r
private val dateTypePattern: Regex = """(?i)^Date$""".r
private val dateTimeTypePattern: Regex = """(?i)^DateTime(64)?(\((.*)\))?$""".r
private val dateTimeTypePattern: Regex = """(?i)^DateTime(\d+)?(?:\((\d+)\))?$""".r
private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r
private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r

Expand Down Expand Up @@ -94,7 +94,7 @@ private object ClickhouseDialectExtension extends JdbcDialect {
case dateTypePattern() =>
logger.debug(s"Custom mapping applied: DateType for '${_typeName}'")
Some(DateType)
case dateTimeTypePattern() =>
case dateTimeTypePattern(_, _) =>
logger.debug(s"Custom mapping applied: TimestampType for '${_typeName}'")
Some(TimestampType)
case decimalTypePattern(precision, scale) =>
Expand Down Expand Up @@ -158,12 +158,15 @@ private object ClickhouseDialectExtension extends JdbcDialect {
Some(JdbcType("Bool", Types.BOOLEAN))
case ShortType =>
logger.debug("Custom mapping applied: Int16 for 'ShortType'")
Some(JdbcType("Int16", Types.SMALLINT))
// Using literal `Int16` fails on Spark 3.x - Spark converts type names to lowercase,
// but Clickhouse type names are case-sensitive. See https://issues.apache.org/jira/browse/SPARK-46612
// Using SMALLINT as alias for Int16, which is case-insensitive.
Some(JdbcType("SMALLINT", Types.SMALLINT))
case TimestampType =>
logger.debug("Custom mapping applied: Datetime64(6) for 'TimestampType'")
Some(JdbcType("Datetime64(6)", Types.TIMESTAMP))
case ArrayType(et, _) =>
logger.debug("Custom mapping applied: Array[T_1] for ArrayType(T_0)")
logger.debug("Custom mapping applied: Array[T] for ArrayType(T)")
getJDBCType(et)
.orElse(JdbcUtils.getCommonJDBCType(et))
.map(jdbcType => JdbcType(s"Array(${jdbcType.databaseTypeDefinition})", Types.ARRAY))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse

import io.github.mtsongithub.doetl.sparkdialectextensions.SharedSparkSession
import org.mockito.Mockito._
import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.mockito.Mockito._
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.{TableDrivenPropertyChecks, TableFor3}
import org.scalatest.prop.TableDrivenPropertyChecks

class ClickhouseDialectTest
extends AnyFunSuite
Expand Down Expand Up @@ -444,7 +445,7 @@ class ClickhouseDialectTest
statement.close()
}

val testReadArrayCases: TableFor3[String, String, DataType] = Table(
val testReadArrayCases = Table(
("columnDefinition", "insertedData", "expectedType"),
(
"charArrayColumn Array(String)",
Expand Down Expand Up @@ -474,6 +475,69 @@ class ClickhouseDialectTest
}
}

// Test matrix for ClickHouse array column types that can be written but cannot be
// read back through the JDBC driver (reading fails with a ClassCastException inside
// clickhouse-jdbc). Columns: DDL fragment for the test table, literal row to insert,
// the Spark type the inferred schema should map to, and the expected error substring.
val testReadArrayUnsupportedCases = Table(
  ("columnDefinition", "insertedData", "expectedType", "errorMessage"),
  // https://github.com/ClickHouse/clickhouse-java/issues/1754
  (
    "byteArrayColumn Array(Int8)",
    "([1, 2, 3, 4, 5])",
    ArrayType(ByteType, containsNull = false),
    "class [B cannot be cast to class [Ljava.lang.Object;"),
  (
    "shortArrayColumn Array(Int16)",
    "([1, 2, 3, 4, 5])",
    ArrayType(ShortType, containsNull = false),
    "class [S cannot be cast to class [Ljava.lang.Object;"),
  (
    "intArrayColumn Array(Int32)",
    "([1, 2, 3, 4, 5])",
    ArrayType(IntegerType, containsNull = false),
    "class [I cannot be cast to class [Ljava.lang.Object;"),
  (
    "longArrayColumn Array(Int64)",
    "([1, 2, 3, 4, 5])",
    ArrayType(LongType, containsNull = false),
    "class [J cannot be cast to class [Ljava.lang.Object"),
  // https://github.com/ClickHouse/clickhouse-java/issues/1409
  (
    "dateArrayColumn Array(Date)",
    "(['2024-01-01', '2024-01-02', '2024-01-03'])",
    ArrayType(DateType, containsNull = false),
    "class [Ljava.time.LocalDate; cannot be cast to class [Ljava.sql.Date;"),
  (
    "datetimeArrayColumn Array(DateTime64(6))",
    // fixed: third literal was missing its time component ('2024-01-03.2222222'),
    // which does not match the 'yyyy-MM-ddTHH:mm:ss.SSSSSS' form of its siblings
    "(['2024-01-01T00:00:00.000000', '2024-01-02T11:11:11.111111', '2024-01-03T22:22:22.222222'])",
    ArrayType(TimestampType, containsNull = false),
    "class [Ljava.time.LocalDateTime; cannot be cast to class [Ljava.sql.Timestamp;"))

// Generates one test per unsupported-array case: writing the value via plain SQL
// succeeds and the JDBC schema is inferred correctly, but collecting the rows
// fails inside the ClickHouse JDBC driver with a ClassCastException.
forAll(testReadArrayUnsupportedCases) {
(
columnDefinition: String,
insertedData: String,
expectedType: DataType,
errorMessage: String) =>
test(s"cannot read ClickHouse Array for ${columnDefinition} column") {
// fixture helpers (defined elsewhere in this suite) create the table and
// insert the literal row through a raw SQL statement, bypassing Spark write
setupTable(columnDefinition)
insertTestData(Seq(insertedData))

// schema is detected properly
val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.load()

assert(df.schema.fields.head.dataType === expectedType)

// but read is failing
// collect() forces materialization, which triggers the driver-side cast error
val exception = intercept[SparkException] {
df.collect()
}
// check the exception message
exception.getMessage should include(errorMessage)
}
}

val testWriteArrayCases = Table(
("columnName", "insertedData", "expectedType", "expectedClickhouseType"),
(
Expand All @@ -486,6 +550,11 @@ class ClickhouseDialectTest
Seq(Row(Array(1.toByte, 2.toByte, 3.toByte, 4.toByte, 5.toByte))),
ArrayType(ByteType, containsNull = false),
"Array(Int8)"),
(
"shortArrayColumn",
Seq(Row(Array(1.toShort, 2.toShort, 3.toShort, 4.toShort, 5.toShort))),
ArrayType(ShortType, containsNull = false),
"Array(Int16)"),
(
"intArrayColumn",
Seq(Row(Array(1, 2, 3, 4, 5))),
Expand All @@ -501,6 +570,22 @@ class ClickhouseDialectTest
Seq(Row(Array(1.0f, 2.0f, 3.0f, 4.0f, 5.0f))),
ArrayType(FloatType, containsNull = false),
"Array(Float32)"),
(
"doubleArrayColumn",
Seq(Row(Array(1.0d, 2.0d, 3.0d, 4.0d, 5.0d))),
ArrayType(DoubleType, containsNull = false),
"Array(Float64)"),
(
"decimalArrayColumn",
Seq(
Row(Array(
new java.math.BigDecimal("1.23"),
new java.math.BigDecimal("2.34"),
new java.math.BigDecimal("3.45"),
new java.math.BigDecimal("4.56"),
new java.math.BigDecimal("5.67")))),
ArrayType(DecimalType(9, 2), containsNull = false),
"Array(Decimal(9, 2))"),
(
"dateArrayColumn",
Seq(
Expand All @@ -512,16 +597,14 @@ class ClickhouseDialectTest
ArrayType(DateType, containsNull = false),
"Array(Date)"),
(
"decimalArrayColumn",
"datetimeArrayColumn",
Seq(
Row(Array(
new java.math.BigDecimal("1.23"),
new java.math.BigDecimal("2.34"),
new java.math.BigDecimal("3.45"),
new java.math.BigDecimal("4.56"),
new java.math.BigDecimal("5.67")))),
ArrayType(DecimalType(9, 2), containsNull = false),
"Array(Decimal(9, 2))"))
java.sql.Timestamp.valueOf("2022-01-01 00:00:00.000000"),
java.sql.Timestamp.valueOf("2022-01-02 11:11:11.111111"),
java.sql.Timestamp.valueOf("2022-01-03 22:22:22.222222")))),
ArrayType(TimestampType, containsNull = false),
"Array(DateTime64(6))"))

forAll(testWriteArrayCases) {
(
Expand Down

0 comments on commit 646aeb8

Please sign in to comment.