Skip to content

Commit

Permalink
parquet (feature): Encode complex objects as JSON columns (#3343)
Browse files Browse the repository at this point in the history
Previously, ParquetWriter used MessagePack to embed complex object
data into a column. For better compatibility with the Parquet ecosystem (e.g.,
DuckDB), JSON is now used instead.
  • Loading branch information
xerial committed Jan 18, 2024
1 parent 1ee3560 commit 3e4be7f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import wvlet.airframe.msgpack.spi.MessageException.*
trait Value {
override def toString = toJson
def toJson: String

/**
* Unlike toJson, toUnquotedString does not quote string/timestamp values.
* @return
*/
def toUnquotedString: String = toJson
def valueType: ValueType

/**
Expand Down Expand Up @@ -139,29 +145,30 @@ object Value {
appendJsonString(b, toRawString)
b.result()
}
protected def toRawString: String
def toRawString: String
}

/**
  * A UTF-8 string value. toString/toUnquotedString/toRawString all return the
  * raw string without JSON quoting; use toJson (inherited) for the quoted form.
  */
case class StringValue(v: String) extends RawValue {
  override def toString: String         = v
  override def toUnquotedString: String = v
  override def toRawString: String      = v
  override def valueType: ValueType     = ValueType.STRING
  override def writeTo(packer: Packer): Unit = {
    packer.packString(v)
  }
}

case class BinaryValue(v: Array[Byte]) extends RawValue {
@transient private var decodedStringCache: String = null

override def valueType: ValueType = ValueType.BINARY
override def toUnquotedString: String = toRawString
override def valueType: ValueType = ValueType.BINARY
override def writeTo(packer: Packer): Unit = {
packer.packBinaryHeader(v.length)
packer.writePayload(v)
}

// Produces Base64 encoded strings
override protected def toRawString: String = {
override def toRawString: String = {
synchronized {
if (decodedStringCache == null) {
decodedStringCache = Base64.getEncoder.encodeToString(v)
Expand Down Expand Up @@ -212,8 +219,10 @@ object Value {
appendJsonString(b, toRawString)
b.result()
}
// Unquoted form used when embedding this timestamp into JSON object keys/values
override def toUnquotedString: String = toRawString
def toRawString: String               = v.toString
// Timestamps travel as msgpack EXTENSION values (no dedicated TIMESTAMP ValueType here)
override def valueType: ValueType = ValueType.EXTENSION // ValueType.TIMESTAMP
override def writeTo(packer: Packer): Unit = {
  packer.packTimestamp(v)
}
Expand Down Expand Up @@ -244,7 +253,13 @@ object Value {
def isEmpty: Boolean = entries.isEmpty
def nonEmpty: Boolean = entries.nonEmpty
/**
  * Renders this map as a JSON object. Keys are emitted via toUnquotedString and
  * then JSON-escaped/quoted, because JSON requires object keys to be quoted
  * UTF-8 strings (a msgpack map key may be any Value type).
  */
override def toJson: String = {
  entries
    .map { case (key, value) =>
      val jsonKey = new StringBuilder()
      appendJsonString(jsonKey, key.toUnquotedString)
      s"${jsonKey.result()}:${value.toJson}"
    }
    .mkString("{", ",", "}")
}
override def valueType: ValueType = ValueType.MAP
override def writeTo(packer: Packer): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package wvlet.airframe.parquet
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter, RecordMaterializer}
import org.apache.parquet.schema.{GroupType, MessageType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

import scala.jdk.CollectionConverters.*

object ParquetRecordReader extends LogSupport {
Expand Down Expand Up @@ -56,7 +57,21 @@ object ParquetRecordReader extends LogSupport {
}
/**
  * Converts a BINARY Parquet column holding MessagePack-encoded data back into a
  * msgpack Value and registers it in the record builder under the field name.
  */
private class MsgPackConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter {
  override def addBinary(value: Binary): Unit = {
    // Decode the payload exactly once, then register it
    val v = ValueCodec.fromMsgPack(value.getBytes)
    holder.add(fieldName, v)
  }
}
/**
  * Converts a BINARY Parquet column annotated with the JSON logical type back
  * into a Scala object. JSON objects and arrays are decoded into msgpack Values
  * so that nested structures are preserved; any other payload is kept as a
  * plain string.
  */
private class JsonConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter {
  override def addBinary(value: Binary): Unit = {
    val jsonStr = value.toStringUsingUTF8
    val obj: Any =
      // Fix: a JSON array starts with '[' — the original checked endsWith("["),
      // which never matches a valid JSON array and routed arrays to the string branch.
      if (jsonStr.startsWith("{") || jsonStr.startsWith("[")) {
        // Map to a msgpack value to handle nested objects/arrays
        ValueCodec.fromJson(jsonStr)
      } else {
        jsonStr
      }
    holder.add(fieldName, obj)
  }
}

Expand Down Expand Up @@ -84,8 +99,10 @@ class ParquetRecordReader[A](
case PrimitiveTypeName.BOOLEAN => new BooleanConverter(f.getName, recordBuilder)
case PrimitiveTypeName.FLOAT => new FloatConverter(f.getName, recordBuilder)
case PrimitiveTypeName.DOUBLE => new DoubleConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType =>
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() =>
new StringConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() =>
new JsonConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY =>
new MsgPackConverter(f.getName, recordBuilder)
case _ => ???
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
*/
package wvlet.airframe.parquet

import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType, Type, Types}
import org.apache.parquet.schema.Types.{MapBuilder, PrimitiveBuilder}
import wvlet.airframe.json.Json
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.surface.Primitive.PrimitiveSurface
import wvlet.airframe.surface.{
Alias,
ArraySurface,
OptionSurface,
Parameter,
Expand All @@ -30,11 +32,12 @@ import wvlet.airframe.surface.{
Surface
}
import wvlet.airframe.ulid.ULID
import wvlet.log.LogSupport

import java.util.UUID
import scala.jdk.CollectionConverters.*

object ParquetSchema {
object ParquetSchema extends LogSupport {

// Convert surface into primitive
private def toParquetPrimitiveTypeName(s: PrimitiveSurface): PrimitiveTypeName = {
Expand Down Expand Up @@ -77,12 +80,18 @@ object ParquetSchema {
toParquetPrimitive(p, rep)
case o: OptionSurface =>
buildParquetType(o.elementSurface, Some(Repetition.OPTIONAL))
case s: Surface if s == Surface.of[MsgPack] =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
case s: Surface if s == Surface.of[Json] =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType())
case s: Surface if classOf[wvlet.airframe.msgpack.spi.Value].isAssignableFrom(s.rawType) =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
case s: Surface if s.isSeq || s.isArray =>
val elementSurface = s.typeArgs(0)
buildParquetType(elementSurface, Some(Repetition.REPEATED))
case m: Surface if m.isMap =>
// Encode Map[_, _] type as Binary and make it optional as Map can be empty
Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
// Encode Map[_, _] type as Json and make it optional as Map can be empty
Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).as(jsonType())
// case m: Surface if m.isMap =>
// val keySurface = m.typeArgs(0)
// val valueSurface = m.typeArgs(1)
Expand All @@ -101,8 +110,8 @@ object ParquetSchema {
}
groupType
case s: Surface =>
// Use MsgPack for other types
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
// Use JSON for other types
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType())
}
}

Expand All @@ -123,8 +132,13 @@ object ParquetSchema {
Primitive.Boolean
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() =>
Primitive.String
case _ =>
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() =>
Primitive.String
case PrimitiveTypeName.BINARY =>
Surface.of[MsgPack]
case _ =>
// Use JSON for other types
Surface.of[Json]
}
} else {
val g = t.asGroupType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package wvlet.airframe.parquet

import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.{MessageType, Type}
import org.apache.parquet.schema.Type.Repetition
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.{JSONCodec, MessageCodec}
import wvlet.airframe.codec.PrimitiveCodec.{
BooleanCodec,
DoubleCodec,
Expand All @@ -29,6 +29,7 @@ import wvlet.airframe.codec.PrimitiveCodec.{
ValueCodec
}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.msgpack.spi.Value.{StringValue, TimestampValue}
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

Expand Down Expand Up @@ -103,12 +104,19 @@ object ParquetWriteCodec extends LogSupport {
recordConsumer.addDouble(DoubleCodec.fromMsgPack(msgpack))
}
}
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType =>
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType() =>
new PrimitiveParquetCodec(codec) {
override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = {
recordConsumer.addBinary(Binary.fromString(StringCodec.fromMsgPack(msgpack)))
}
}
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == jsonType() =>
new PrimitiveParquetCodec(codec) {
override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = {
val json: String = ValueCodec.fromMsgPack(msgpack).toUnquotedString
recordConsumer.addBinary(Binary.fromString(json))
}
}
case _ =>
// For the other primitive type values
new PrimitiveParquetCodec(codec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object ParquetTest extends AirSpec {
p7: Boolean = true,
p8: Boolean = false,
json: Json = """{"id":1,"param":"json param"}""",
jsonValue: JSONValue = JSON.parse("""{"id":1,"param":"json param"}"""),
jsonValue: JSONValue = JSON.parse("""{"id":2,"param":"json param2"}"""),
seqValue: Seq[String] = Seq("s1", "s2"),
mapValue: Map[String, Any] = Map("param1" -> "hello", "feature1" -> true),
ulid: ULID = ULID.newULID,
Expand All @@ -127,6 +127,7 @@ object ParquetTest extends AirSpec {

withResource(Parquet.newReader[MyData](path = file.getPath)) { reader =>
val r1 = reader.read()
r1.json shouldBe d1.json
r1 shouldBe d1
val r2 = reader.read()
r2 shouldBe d2
Expand Down

0 comments on commit 3e4be7f

Please sign in to comment.