Spark: Use clickhouse java client to parse schema #215

Merged · 1 commit · Feb 3, 2023
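
The heart of this change: instead of regex-matching raw ClickHouse type strings, SchemaUtils now hands each column definition to the ClickHouse Java client (ClickHouseColumn.parse) and maps the parsed ClickHouseDataType onto a Catalyst type. A minimal sketch of the client call the new code builds on, using a made-up column name; nested columns and the Nullable flag come back already unwrapped:

import com.clickhouse.client.ClickHouseColumn

// Hypothetical column definition, parsed the same way the new code does.
val col = ClickHouseColumn.parse("`m` Map(String, Nullable(Int64))").get(0)
col.getDataType                          // ClickHouseDataType.Map
col.getNestedColumns.size                // 2: the key column and the value column
col.getNestedColumns.get(1).isNullable   // true, from the Nullable(...) wrapper
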
@@ -14,59 +14,59 @@

package org.apache.spark.sql.clickhouse

import scala.util.matching.Regex

import com.clickhouse.client.ClickHouseDataType._
import com.clickhouse.client.{ClickHouseColumn, ClickHouseDataType}
import org.apache.spark.sql.types._
import xenon.clickhouse.exception.CHClientException

object SchemaUtils {

// format: off
private[clickhouse] val arrayTypePattern: Regex = """^Array\((.+)\)$""".r
private[clickhouse] val mapTypePattern: Regex = """^Map\((\w+),\s*(.+)\)$""".r
private[clickhouse] val dateTypePattern: Regex = """^Date$""".r
private[clickhouse] val dateTimeTypePattern: Regex = """^DateTime(64)?(\((.*)\))?$""".r
private[clickhouse] val decimalTypePattern: Regex = """^Decimal\((\d+),\s*(\d+)\)$""".r
private[clickhouse] val decimalTypePattern2: Regex = """^Decimal(32|64|128|256)\((\d+)\)$""".r
private[clickhouse] val enumTypePattern: Regex = """^Enum(8|16)$""".r
private[clickhouse] val fixedStringTypePattern: Regex = """^FixedString\((\d+)\)$""".r
private[clickhouse] val nullableTypePattern: Regex = """^Nullable\((.*)\)""".r
private[clickhouse] val lowCardinalityTypePattern: Regex = """^LowCardinality\((.*)\)""".r
// format: on

def fromClickHouseType(chType: String): (DataType, Boolean) = {
val (unwrappedChType, nullable) = unwrapNullable(unwrapLowCardinalityTypePattern(chType))
val catalystType = unwrappedChType match {
case "String" | "UUID" | fixedStringTypePattern() | enumTypePattern(_) => StringType
case "Bool" => BooleanType
case "Int8" => ByteType
case "UInt8" | "Int16" => ShortType
case "UInt16" | "Int32" => IntegerType
case "UInt32" | "Int64" | "UInt64" | "IPv4" => LongType
case "Int128" | "Int256" | "UInt256" =>
throw CHClientException(s"unsupported type: $chType") // not support
case "Float32" => FloatType
case "Float64" => DoubleType
case dateTypePattern() => DateType
case dateTimeTypePattern(_, _, _) => TimestampType
case decimalTypePattern(precision, scale) => DecimalType(precision.toInt, scale.toInt)
case decimalTypePattern2(w, scale) => w match {
case "32" => DecimalType(9, scale.toInt)
case "64" => DecimalType(18, scale.toInt)
case "128" => DecimalType(38, scale.toInt)
case "256" => DecimalType(76, scale.toInt) // throw exception, spark support precision up to 38
}
case arrayTypePattern(nestedChType) =>
val (_type, _nullable) = fromClickHouseType(nestedChType)
ArrayType(_type, _nullable)
case mapTypePattern(keyChType, valueChType) =>
val (_keyType, _keyNullable) = fromClickHouseType(keyChType)
require(!_keyNullable, s"Illegal type: $keyChType, the key type of Map should not be nullable")
val (_valueType, _valueNullable) = fromClickHouseType(valueChType)
MapType(_keyType, _valueType, _valueNullable)
case _ => throw CHClientException(s"Unsupported type: $chType")
def fromClickHouseType(chColumn: ClickHouseColumn): (DataType, Boolean) = {
val catalystType = chColumn.getDataType match {
case Nothing => NullType
case Bool => BooleanType
case String | FixedString | JSON | UUID | Enum | Enum8 | Enum16 | IPv4 | IPv6 => StringType
case Int8 => ByteType
case UInt8 | Int16 => ShortType
case UInt16 | Int32 => IntegerType
case UInt32 | Int64 | UInt64 => LongType
case Int128 | UInt128 | Int256 | UInt256 => DecimalType(38, 0)
case Float32 => FloatType
case Float64 => DoubleType
case Date | Date32 => DateType
case DateTime | DateTime32 | DateTime64 => TimestampType
case ClickHouseDataType.Decimal if chColumn.getScale <= 38 =>
DecimalType(chColumn.getPrecision, chColumn.getScale)
case Decimal32 => DecimalType(9, chColumn.getScale)
case Decimal64 => DecimalType(18, chColumn.getScale)
case Decimal128 => DecimalType(38, chColumn.getScale)
case IntervalYear => YearMonthIntervalType(YearMonthIntervalType.YEAR)
case IntervalMonth => YearMonthIntervalType(YearMonthIntervalType.MONTH)
case IntervalDay => DayTimeIntervalType(DayTimeIntervalType.DAY)
case IntervalHour => DayTimeIntervalType(DayTimeIntervalType.HOUR)
case IntervalMinute => DayTimeIntervalType(DayTimeIntervalType.MINUTE)
case IntervalSecond => DayTimeIntervalType(DayTimeIntervalType.SECOND)
case Array =>
val elementChCols = chColumn.getNestedColumns
assert(elementChCols.size == 1)
val (elementType, elementNullable) = fromClickHouseType(elementChCols.get(0))
ArrayType(elementType, elementNullable)
case Map =>
val kvChCols = chColumn.getNestedColumns
assert(kvChCols.size == 2)
val (keyChType, valueChType) = (kvChCols.get(0), kvChCols.get(1))
val (keyType, keyNullable) = fromClickHouseType(keyChType)
require(
!keyNullable,
s"Illegal type: ${keyChType.getOriginalTypeName}, the key type of Map should not be nullable"
)
val (valueType, valueNullable) = fromClickHouseType(valueChType)
MapType(keyType, valueType, valueNullable)
case Object | Nested | Tuple | Point | Polygon | MultiPolygon | Ring | IntervalQuarter | IntervalWeek |
Decimal256 | AggregateFunction | SimpleAggregateFunction =>
throw CHClientException(s"Unsupported type: ${chColumn.getOriginalTypeName}")
}
(catalystType, nullable)
(catalystType, chColumn.isNullable)
}

def toClickHouseType(catalystType: DataType): String =
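
As a hedged illustration of the new numeric mapping (not part of the diff): Decimal64(s) maps to DecimalType(18, s), and the 128/256-bit integer types, which have no exact Spark counterpart, are widened to DecimalType(38, 0). The column names below are arbitrary examples:

import com.clickhouse.client.ClickHouseColumn
import org.apache.spark.sql.clickhouse.SchemaUtils

val amount = ClickHouseColumn.parse("`amount` Decimal64(4)").get(0)
SchemaUtils.fromClickHouseType(amount)   // (DecimalType(18,4), false)

val id = ClickHouseColumn.parse("`id` UInt256").get(0)
SchemaUtils.fromClickHouseType(id)       // (DecimalType(38,0), false)
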
@@ -92,11 +92,12 @@ object SchemaUtils {
}

def fromClickHouseSchema(chSchema: Seq[(String, String)]): StructType = {
val structFields = chSchema
.map { case (name, maybeNullableType) =>
val (catalyst, nullable) = fromClickHouseType(maybeNullableType)
StructField(name, catalyst, nullable)
}
val structFields = chSchema.map { case (name, maybeNullableType) =>
val chCols = ClickHouseColumn.parse(s"`$name` $maybeNullableType")
assert(chCols.size == 1)
val (sparkType, nullable) = fromClickHouseType(chCols.get(0))
StructField(name, sparkType, nullable)
}
StructType(structFields)
}
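
fromClickHouseSchema now formats each (name, type) pair as a back-quoted column definition and lets ClickHouseColumn.parse do the parsing, so Nullable wrappers and nested types are handled by the client rather than by the old regexes. A small usage sketch with assumed inputs; the expected fields follow from the mapping above:

import org.apache.spark.sql.clickhouse.SchemaUtils

// Expected, per the match above:
//   id   -> LongType (UInt32 is widened),                nullable = false
//   name -> StringType,                                  nullable = true
//   tags -> ArrayType(StringType, containsNull = false), nullable = false
val schema = SchemaUtils.fromClickHouseSchema(Seq(
  "id"   -> "UInt32",
  "name" -> "Nullable(String)",
  "tags" -> "Array(String)"
))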

@@ -111,16 +112,4 @@
if (nullable) wrapNullable(chType) else chType

private[clickhouse] def wrapNullable(chType: String): String = s"Nullable($chType)"

private[clickhouse] def unwrapNullable(maybeNullableType: String): (String, Boolean) =
maybeNullableType match {
case nullableTypePattern(typeName) => (typeName, true)
case _ => (maybeNullableType, false)
}

private[clickhouse] def unwrapLowCardinalityTypePattern(maybeLowCardinalityType: String): String =
maybeLowCardinalityType match {
case lowCardinalityTypePattern(typeName) => typeName
case _ => maybeLowCardinalityType
}
}
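
With the client doing the parsing, the string-level unwrapNullable and unwrapLowCardinalityTypePattern helpers become dead code and are removed; the new fromClickHouseType reads nullability straight off the parsed column, and only the Spark-to-ClickHouse direction (wrapNullable) stays string-based. A tiny check of that assumption:

import com.clickhouse.client.ClickHouseColumn

ClickHouseColumn.parse("`c` String").get(0).isNullable            // false
ClickHouseColumn.parse("`c` Nullable(String)").get(0).isNullable  // true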