From dcd95c80c768132e7bcefc8422fcb6641716d1b2 Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Mon, 12 Jun 2023 16:50:12 +0300 Subject: [PATCH 1/5] added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue) --- .../main/scala/zio/schema/codec/AvroCodec.scala | 15 +++++++++++++-- .../scala-2/zio/schema/codec/AvroCodecSpec.scala | 15 +++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala index 97a499e7d..532ebc734 100644 --- a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala +++ b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala @@ -25,8 +25,13 @@ import zio.{ Chunk, Unsafe, ZIO } object AvroCodec { - implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = - new BinaryCodec[A] { + trait ExtendedBinaryCodec[A] extends BinaryCodec[A] { + def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record + def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] + } + + implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): ExtendedBinaryCodec[A] = + new ExtendedBinaryCodec[A] { val avroSchema: SchemaAvro = AvroSchemaCodec.encodeToApacheAvro(schema).getOrElse(throw new Exception("Avro schema could not be generated.")) @@ -59,6 +64,12 @@ object AvroCodec { decode(chunk).map(Chunk(_)) ) } + + override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record = + encodeValue(value, schema).asInstanceOf[GenericData.Record] + + override def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] = + decodeValue(value, schema) } private def decodeValue[A](raw: Any, schema: Schema[A]): Either[DecodeError, A] = schema match { diff --git a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala index 5790916c8..033aa7369 100644 --- a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala +++ b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala @@ -1,5 +1,7 @@ package zio.schema.codec +import org.apache.avro.generic.GenericData + import java.math.BigInteger import java.time.{ DayOfWeek, @@ -19,7 +21,6 @@ import java.time.{ ZonedDateTime } import java.util.UUID - import zio._ import zio.schema.{ DeriveSchema, Schema } import zio.stream.ZStream @@ -138,7 +139,8 @@ object AvroCodecSpec extends ZIOSpecDefault { sequenceDecoderSpec, genericRecordDecoderSpec, enumDecoderSpec, - streamEncodingDecodingSpec + streamEncodingDecodingSpec, + genericRecordEncodeDecodeSpec ) private val primitiveEncoderSpec = suite("Avro Codec - Encoder primitive spec")( @@ -687,4 +689,13 @@ object AvroCodecSpec extends ZIOSpecDefault { }) + private val genericRecordEncodeDecodeSpec = suite("AvroCodec - encode/decode Generic Record")( + test("Encode/Decode") { + val codec = AvroCodec.schemaBasedBinaryCodec[Record] + val generic: GenericData.Record = codec.encodeGenericRecord(Record("John", 42)) + val result = codec.decodeGenericRecord(generic) + assertTrue(result == Right(Record("John", 42))) + } + ) + } From 441a42c297dd13e480ff9c44677dd3878ec67a69 Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Mon, 12 Jun 2023 17:02:18 +0300 Subject: [PATCH 2/5] fixed imports --- .../src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala index 033aa7369..db4f48129 100644 --- a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala +++ b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala @@ -1,7 +1,5 @@ package zio.schema.codec -import org.apache.avro.generic.GenericData - import java.math.BigInteger import java.time.{ DayOfWeek, @@ -21,6 +19,9 @@ import java.time.{ ZonedDateTime } import java.util.UUID + +import org.apache.avro.generic.GenericData + import zio._ import zio.schema.{ DeriveSchema, Schema } import zio.stream.ZStream From 8122cbccd3f8688b71c13c6de76e8e6445af0e0e Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Mon, 12 Jun 2023 16:50:12 +0300 Subject: [PATCH 3/5] added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue) --- .../main/scala/zio/schema/codec/AvroCodec.scala | 15 +++++++++++++-- .../scala-2/zio/schema/codec/AvroCodecSpec.scala | 15 +++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala index 97a499e7d..532ebc734 100644 --- a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala +++ b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala @@ -25,8 +25,13 @@ import zio.{ Chunk, Unsafe, ZIO } object AvroCodec { - implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = - new BinaryCodec[A] { + trait ExtendedBinaryCodec[A] extends BinaryCodec[A] { + def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record + def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] + } + + implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): ExtendedBinaryCodec[A] = + new ExtendedBinaryCodec[A] { val avroSchema: SchemaAvro = AvroSchemaCodec.encodeToApacheAvro(schema).getOrElse(throw new Exception("Avro schema could not be generated.")) @@ -59,6 +64,12 @@ object AvroCodec { decode(chunk).map(Chunk(_)) ) } + + override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record = + encodeValue(value, schema).asInstanceOf[GenericData.Record] + + override def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] = + decodeValue(value, schema) } private def decodeValue[A](raw: Any, schema: Schema[A]): Either[DecodeError, A] = schema match { diff --git a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala index 5790916c8..033aa7369 100644 --- a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala +++ b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala @@ -1,5 +1,7 @@ package zio.schema.codec +import org.apache.avro.generic.GenericData + import java.math.BigInteger import java.time.{ DayOfWeek, @@ -19,7 +21,6 @@ import java.time.{ ZonedDateTime } import java.util.UUID - import zio._ import zio.schema.{ DeriveSchema, Schema } import zio.stream.ZStream @@ -138,7 +139,8 @@ object AvroCodecSpec extends ZIOSpecDefault { sequenceDecoderSpec, genericRecordDecoderSpec, enumDecoderSpec, - streamEncodingDecodingSpec + streamEncodingDecodingSpec, + genericRecordEncodeDecodeSpec ) private val primitiveEncoderSpec = suite("Avro Codec - Encoder primitive spec")( @@ -687,4 +689,13 @@ object AvroCodecSpec extends ZIOSpecDefault { }) + private val genericRecordEncodeDecodeSpec = suite("AvroCodec - encode/decode Generic Record")( + test("Encode/Decode") { + val codec = AvroCodec.schemaBasedBinaryCodec[Record] + val generic: GenericData.Record = codec.encodeGenericRecord(Record("John", 42)) + val result = codec.decodeGenericRecord(generic) + assertTrue(result == Right(Record("John", 42))) + } + ) + } From 453b1ab4a562fe4d8f8ea7b42ed07b13bd8fe7fa Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Mon, 12 Jun 2023 17:02:18 +0300 Subject: [PATCH 4/5] fixed imports --- .../src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala index 033aa7369..db4f48129 100644 --- a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala +++ b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala @@ -1,7 +1,5 @@ package zio.schema.codec -import org.apache.avro.generic.GenericData - import java.math.BigInteger import java.time.{ DayOfWeek, @@ -21,6 +19,9 @@ import java.time.{ ZonedDateTime } import java.util.UUID + +import org.apache.avro.generic.GenericData + import zio._ import zio.schema.{ DeriveSchema, Schema } import zio.stream.ZStream From 1c12ab2b62827f16348018c0ea91acc3a3a1a89d Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Tue, 22 Aug 2023 10:19:28 +0300 Subject: [PATCH 5/5] fix: fixed README --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 243bf44ee..2e33a70fe 100644 --- a/README.md +++ b/README.md @@ -30,13 +30,13 @@ _ZIO Schema_ is used by a growing number of ZIO libraries, including _ZIO Flow_, In order to use this library, we need to add the following lines in our `build.sbt` file: ```scala -libraryDependencies += "dev.zio" %% "zio-schema" % "0.4.11" -libraryDependencies += "dev.zio" %% "zio-schema-bson" % "0.4.11" -libraryDependencies += "dev.zio" %% "zio-schema-json" % "0.4.11" -libraryDependencies += "dev.zio" %% "zio-schema-protobuf" % "0.4.11" +libraryDependencies += "dev.zio" %% "zio-schema" % "0.4.13" +libraryDependencies += "dev.zio" %% "zio-schema-bson" % "0.4.13" +libraryDependencies += "dev.zio" %% "zio-schema-json" % "0.4.13" +libraryDependencies += "dev.zio" %% "zio-schema-protobuf" % "0.4.13" // Required for automatic generic derivation of schemas -libraryDependencies += "dev.zio" %% "zio-schema-derivation" % "0.4.11", +libraryDependencies += "dev.zio" %% "zio-schema-derivation" % "0.4.13", libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided" ```