[DOP-18631] - add partial support for ArrayType (#8)
* [DOP-16990] - implement Array(T) mapping & test

* [DOP-16990] - add logic for writing ArrayType(...) to Clickhouse Array(...)

* [DOP-18631] - add logic for writing ArrayType(...) to Clickhouse Array(...)

* Apply code quality checks (auto-formatted)

* [DOP-18631] - update test

* Apply code quality checks (auto-formatted)

* [DOP-18631] - update documentation

---------

Co-authored-by: GitHub Actions Bot <actions@github.com>
maxim-lixakov and actions-user authored Aug 5, 2024
1 parent 2320a27 commit 22e74eb
Showing 5 changed files with 499 additions and 31 deletions.
71 changes: 59 additions & 12 deletions docs/data_type_mappings.md
@@ -4,19 +4,66 @@ This documentation outlines the customized mappings that the Spark Dialect Extension

#### Customized Type Mappings with Spark Dialect Extension

-| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
-|----------------------------|--------------------------------|-------------------------------|-----------------------------|
-| `Int8` | `ByteType` | `Int8` | `Int8` |
-| `Int16` | `ShortType` | `Int16` | `Int16` |
-| `Datetime64(6)` | `TimestampType` | `Datetime64(6)` | `Datetime64(6)` |
-| `Bool` | `BooleanType` | `Bool` | `Bool` |
+| ClickHouse Type (Read) | Spark Type           | ClickHouse Type (Write) | ClickHouse Type (Create) |
+|------------------------|----------------------|-------------------------|--------------------------|
+| `Bool`                 | `BooleanType`        | `Bool`                  | `Bool`                   |
+| `Int8`                 | `ByteType`           | `Int8`                  | `Int8`                   |
+| `Int16`                | `ShortType`          | `Int16`                 | `Int16`                  |
+| `Int32`                | `IntegerType`        | `Int32`                 | `Int32`                  |
+| `Int64`                | `LongType`           | `Int64`                 | `Int64`                  |
+| `UInt8`                | `ShortType`          | `UInt8`                 | `UInt8`                  |
+| `UInt16`               | `IntegerType`        | `UInt16`                | `UInt16`                 |
+| `UInt32`               | `LongType`           | `Int64`                 | `Int64`                  |
+| `UInt64`               | `DecimalType(20, 0)` | `Decimal(20, 0)`        | `Decimal(20, 0)`         |
+| `Float32`              | `FloatType`          | `Float32`               | `Float32`                |
+| `Float64`              | `DoubleType`         | `Float64`               | `Float64`                |
+| `Decimal(M, N)`        | `DecimalType(M, N)`  | `Decimal(M, N)`         | `Decimal(M, N)`          |
+| `Decimal32(N)`         | `DecimalType(M, N)`  | `Decimal32(M, N)`       | `Decimal32(M, N)`        |
+| `Decimal64(N)`         | `DecimalType(M, N)`  | `Decimal64(M, N)`       | `Decimal64(M, N)`        |
+| `Decimal128(N)`        | `DecimalType(M, N)`  | `Decimal128(M, N)`      | `Decimal128(M, N)`       |
+| `Decimal256(N)`        | unsupported          | unsupported             | unsupported              |
+| `DateTime`             | `TimestampType`      | `DateTime`              | `DateTime`               |
+| `Datetime64(6)`        | `TimestampType`      | `Datetime64(6)`         | `Datetime64(6)`          |
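
For illustration, a minimal sketch of reading through these mappings (it assumes the extension's dialect is registered, e.g. via Spark's `JdbcDialects.registerDialect`, that a ClickHouse JDBC driver is on the classpath, and uses placeholder connection options):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Placeholder connection options; adjust the URL and table to your environment.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:clickhouse://localhost:8123/default")
  .option("dbtable", "sample_table")
  .load()

// With the dialect extension active, Int8 is read as ByteType, Int16 as ShortType,
// Bool as BooleanType and Datetime64(6) as TimestampType, as listed above.
df.printSchema()
```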


`Array(T)` -> `ArrayType(T)`:

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|--------------------------------|-------------------------|--------------------------|
| `Array(String)` | `ArrayType(StringType)` | `Array(String)` | `Array(String)` |
| unsupported | `ArrayType(ByteType)` | `Array(Int8)` | `Array(Int8)` |
| unsupported | `ArrayType(ShortType)` | unsupported | unsupported |
| unsupported | `ArrayType(LongType)` | `Array(Int64)` | `Array(Int64)` |
| `Array(Decimal(M, N))` | `ArrayType(DecimalType(M, N))` | `Array(Decimal(M, N))` | `Array(Decimal(M, N))` |
| unsupported | `ArrayType(TimestampType)` | unsupported | unsupported |
| unsupported            | `ArrayType(DateType)`          | `Array(Date)`           | `Array(Date)`            |
| unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` |
| unsupported | `ArrayType(DoubleType)` | unsupported | unsupported |


#### Default Type Mappings without Spark Dialect Extension

-| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
-|----------------------------|--------------------------------|-------------------------------|-----------------------------|
-| `Int8` | `IntegerType` | `Int32` | `Int32` |
-| `Int16` | `IntegerType` | `Int32` | `Int32` |
-| `Datetime64(6)` | `TimestampType` | `Datetime64(6)` | `DateTime32` |
-| `Bool` | `BooleanType` | `Bool` | `UInt64` |
+| ClickHouse Type (Read) | Spark Type           | ClickHouse Type (Write) | ClickHouse Type (Create) |
+|------------------------|----------------------|-------------------------|--------------------------|
+| `Bool`                 | `BooleanType`        | `Bool`                  | `UInt64`                 |
+| `Int8`                 | `IntegerType`        | `Int32`                 | `Int32`                  |
+| `Int16`                | `IntegerType`        | `Int32`                 | `Int32`                  |
+| `Int32`                | `IntegerType`        | `Int32`                 | `Int32`                  |
+| `Int64`                | `LongType`           | `Int64`                 | `Int64`                  |
+| `UInt8`                | `IntegerType`        | `UInt8`                 | `UInt8`                  |
+| `UInt16`               | `IntegerType`        | `UInt16`                | `UInt16`                 |
+| `UInt32`               | `DecimalType(20, 0)` | `Decimal(20, 0)`        | `Decimal(20, 0)`         |
+| `UInt64`               | `DecimalType(20, 0)` | `Decimal(20, 0)`        | `Decimal(20, 0)`         |
+| `Float32`              | `FloatType`          | `Float32`               | `Float32`                |
+| `Float64`              | `DoubleType`         | `Float64`               | `Float64`                |
+| `Decimal(M, N)`        | `DecimalType(M, N)`  | `Decimal(M, N)`         | `Decimal(M, N)`          |
+| `Decimal32(N)`         | `DecimalType(M, N)`  | `Decimal32(M, N)`       | `Decimal32(M, N)`        |
+| `Decimal64(N)`         | `DecimalType(M, N)`  | `Decimal64(M, N)`       | `Decimal64(M, N)`        |
+| `Decimal128(N)`        | `DecimalType(M, N)`  | `Decimal128(M, N)`      | `Decimal128(M, N)`       |
+| `Decimal256(N)`        | unsupported          | unsupported             | unsupported              |
+| `DateTime`             | `TimestampType`      | `DateTime`              | `DateTime`               |
+| `Datetime64(6)`        | `TimestampType`      | `Datetime64(6)`         | `DateTime32`             |

`Array(T)` -> `ArrayType(T)`:

**unsupported**
@@ -1,8 +1,8 @@
// SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
// SPDX-License-Identifier: Apache-2.0
package ru.mts.doetl.sparkdialectextensions

import scala.util.matching.Regex
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory
import java.sql.Types
@@ -11,41 +11,137 @@ private object ClickhouseDialectExtension extends JdbcDialect {

  private val logger = LoggerFactory.getLogger(getClass)

  private val arrayTypePattern: Regex = """^Array\((.*)\)$""".r
  private val nullableTypePattern: Regex = """^Nullable\((.*)\)$""".r
  private val dateTypePattern: Regex = """(?i)^Date$""".r
  private val dateTimeTypePattern: Regex = """(?i)^DateTime(64)?(\((.*)\))?$""".r
  private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r
  private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r
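
  // For example: "Nullable(Int32)" matches nullableTypePattern and captures "Int32";
  // "Decimal(9, 2)" matches decimalTypePattern and captures ("9", "2");
  // "Decimal64(4)" matches decimalTypePattern2 and captures ("64", "4").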

override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:clickhouse")
}

  /**
-  * A mock method to demonstrate the retrieval of the Catalyst type based on JDBC metadata.
+  * Maps a ClickHouse SQL type to the corresponding Catalyst type based on JDBC metadata.
   *
   * @param sqlType
   *   SQL type as integer
   * @param typeName
   *   Name of the SQL type
   * @param size
-  *   Size of the type (not used in mock)
+  *   Size of the type
   * @param md
-  *   MetadataBuilder for further metadata handling (not used in mock)
+  *   MetadataBuilder for further metadata handling
   * @return
-  *   Always returns None in this mock
+  *   The corresponding Catalyst data type, or None if there is no custom mapping
   */
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
-      md: MetadataBuilder): Option[DataType] = (sqlType, typeName) match {
-    case (Types.TINYINT, "Int8") =>
-      logger.debug("Custom mapping applied: ByteType for 'Int8'")
-      Some(ByteType)
-    case (Types.SMALLINT, "Int16") =>
-      logger.debug("Custom mapping applied: ShortType for 'Int16'")
-      Some(ShortType)
-    case _ =>
-      logger.debug(
-        s"No custom JDBC type mapping for sqlType: $sqlType, typeName: $typeName, default driver mapping is used")
-      None
+      md: MetadataBuilder): Option[DataType] = {
+    val scale = md.build.getLong("scale").toInt
+    sqlType match {
+      case Types.ARRAY =>
+        unwrapNullable(typeName) match {
+          case (_, arrayTypePattern(nestType)) =>
+            // Due to https://github.com/ClickHouse/clickhouse-java/issues/1754, Spark is not
+            // able to read Arrays of any type except Decimal(...) and String.
+            toCatalystType(Types.ARRAY, nestType, size, scale, md).map {
+              case (nullable, dataType) => ArrayType(dataType, nullable)
+            }
+          case _ => None
+        }
+      case _ => toCatalystType(sqlType, typeName, size, scale, md).map(_._2)
+    }
+  }
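
  // Worked example (illustrative): a column reported as typeName = "Array(Nullable(Int64))"
  // with sqlType = Types.ARRAY first has nestType = "Nullable(Int64)" extracted by
  // arrayTypePattern; toCatalystType then unwraps Nullable to (true, LongType), so the
  // resulting Catalyst type is ArrayType(LongType, containsNull = true).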

  private def toCatalystType(
      sqlType: Int,
      typeName: String,
      precision: Int,
      scale: Int,
      md: MetadataBuilder): Option[(Boolean, DataType)] = {
    val (nullable, _typeName) = unwrapNullable(typeName)
    val dataType = _typeName match {
      case "String" =>
        logger.debug(s"Custom mapping applied: StringType for '${_typeName}'")
        Some(StringType)
      case "Int8" =>
        logger.debug(s"Custom mapping applied: ByteType for 'Int8'")
        Some(ByteType)
      case "UInt8" | "Int16" =>
        logger.debug(s"Custom mapping applied: ShortType for '${_typeName}'")
        Some(ShortType)
      case "UInt16" | "Int32" =>
        logger.debug(s"Custom mapping applied: IntegerType for '${_typeName}'")
        Some(IntegerType)
      case "UInt32" | "Int64" =>
        logger.debug(s"Custom mapping applied: LongType for '${_typeName}'")
        Some(LongType)
      case "Int128" | "Int256" | "UInt256" =>
        logger.debug(s"Type '${_typeName}' is not supported")
        None
      case "Float32" =>
        logger.debug(s"Custom mapping applied: FloatType for 'Float32'")
        Some(FloatType)
      case "Float64" =>
        logger.debug(s"Custom mapping applied: DoubleType for 'Float64'")
        Some(DoubleType)
      case dateTypePattern() =>
        logger.debug(s"Custom mapping applied: DateType for '${_typeName}'")
        Some(DateType)
      case dateTimeTypePattern() =>
        logger.debug(s"Custom mapping applied: TimestampType for '${_typeName}'")
        Some(TimestampType)
      case decimalTypePattern(precision, scale) =>
        logger.debug(
          s"Custom mapping applied: DecimalType($precision, $scale) for '${_typeName}'")
        Some(DecimalType(precision.toInt, scale.toInt))
      case decimalTypePattern2(w, scale) =>
        w match {
          case "32" =>
            logger.debug(s"Custom mapping applied: DecimalType(9, $scale) for 'Decimal$w'")
            Some(DecimalType(9, scale.toInt))
          case "64" =>
            logger.debug(s"Custom mapping applied: DecimalType(18, $scale) for 'Decimal$w'")
            Some(DecimalType(18, scale.toInt))
          case "128" =>
            logger.debug(s"Custom mapping applied: DecimalType(38, $scale) for 'Decimal$w'")
            Some(DecimalType(38, scale.toInt))
          case "256" =>
            logger.debug(s"Custom mapping applied: DecimalType(76, $scale) for 'Decimal$w'")
            // Spark only supports precision up to 38, so constructing DecimalType(76, ...)
            // throws at runtime; Decimal256 is effectively unsupported.
            Some(DecimalType(76, scale.toInt))
        }
      case _ =>
        logger.debug(
          s"No custom mapping for typeName: ${_typeName}, default driver mapping is used")
        None
    }
    dataType.map((nullable, _))
  }

  /**
   * Unwraps nullable types to determine if the type is nullable and to retrieve the base type.
   * This logic is copied from the Housepower project.
   *
   * @see
   *   https://github.com/housepower/ClickHouse-Native-JDBC
   * @param maybeNullableTypeName
   *   The type name that may include Nullable.
   * @return
   *   A tuple where the first element indicates if the type is nullable, and the second element
   *   is the base type.
   */
  private def unwrapNullable(maybeNullableTypeName: String): (Boolean, String) =
    maybeNullableTypeName match {
      case nullableTypePattern(typeName) => (true, typeName)
      case _ => (false, maybeNullableTypeName)
    }
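
  // For example: unwrapNullable("Nullable(Int32)") returns (true, "Int32"),
  // while unwrapNullable("Int32") returns (false, "Int32").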

  /**
   * Retrieves the JDBC / SQL type for a given datatype, logging usage of the dialect
   * extension.
@@ -64,6 +160,11 @@ private object ClickhouseDialectExtension extends JdbcDialect {
    case TimestampType =>
      logger.debug("Custom mapping applied: Datetime64(6) for 'TimestampType'")
      Some(JdbcType("Datetime64(6)", Types.TIMESTAMP))
    case ArrayType(et, _) =>
      logger.debug(s"Custom mapping applied: Array(${et.simpleString}) for 'ArrayType'")
      getJDBCType(et)
        .orElse(JdbcUtils.getCommonJDBCType(et))
        .map(jdbcType => JdbcType(s"Array(${jdbcType.databaseTypeDefinition})", Types.ARRAY))
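      // The ArrayType case above resolves the element type through this dialect first and then
      // falls back to Spark's common JDBC mappings; per the mapping table, ArrayType(LongType)
      // becomes Array(Int64) and ArrayType(StringType) becomes Array(String).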
    case _ =>
      logger.debug(
        s"No custom JDBC type mapping for DataType: ${dt.simpleString}, default driver mapping is used")