Skip to content

Commit

Permalink
Fixes #737 (#738)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregor-rayman committed Sep 10, 2024
1 parent c741f6b commit 6b9753c
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 18 deletions.
129 changes: 112 additions & 17 deletions zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import zio.prelude.NonEmptyMap
import zio.schema._
import zio.schema.annotation._
import zio.schema.codec.DecodeError.ReadError
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO }
import zio.stream.{ ZChannel, ZPipeline }
import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing }

object JsonCodec {

Expand All @@ -47,11 +47,7 @@ object JsonCodec {
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.fromChannel(
ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage))
) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
) >>> splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO
.fromEither(jsonCodec.decodeJson(s))
Expand All @@ -62,14 +58,118 @@ object JsonCodec {
JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None))

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
schemaBasedBinaryCodec[A](JsonCodec.Config.default)

val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = {
val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
val ContextJson = 'j'
val ContextString = 's'
val ContextBoolean = 'b'
val ContextNull = 'u'
val ContextNullAfterFirstL = 'x'
val ContextNumber = 'n'
val ContextEscape = 'e'

ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = 0
var context = ContextJson

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
context match {
case ContextEscape =>
context = 's'
case ContextString =>
c match {
case '\\' => context = ContextEscape
case '"' =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextBoolean =>
if (c == 'e') {
context = ContextJson
valueEnded = true
}
case ContextNull =>
if (c == 'l') {
context = ContextNullAfterFirstL
}
case ContextNullAfterFirstL =>
if (c == 'l') {
context = ContextJson
valueEnded = true
}
case ContextNumber =>
c match {
case '}' | ']' =>
depth -= 1
context = ContextJson
valueEnded = true
case _ if !validNumChars(c) =>
context = ContextJson
valueEnded = true
case _ =>
}
case _ =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
case '"' =>
context = ContextString
case 't' | 'f' =>
context = ContextBoolean
case 'n' =>
context = ContextNull
case x if validNumChars(x) =>
context = ContextNumber
case _ =>
}
}
if (depth > 0 || context != ContextJson || valueEnded)
stringBuilder.append(c)

if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.forall(_.isWhitespace)) chunkBuilder += str
stringBuilder.clear()
}
}
chunkBuilder.result()
}

lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
)

ZPipeline.fromChannel(loop)
}
}

def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
Expand All @@ -80,10 +180,7 @@ object JsonCodec {

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s))
}
Expand All @@ -92,9 +189,7 @@ object JsonCodec {
JsonEncoder.encode(schema, value, cfg)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import zio.test._

object JsonCodecSpec extends ZIOSpecDefault {

case class Person(name: String, age: Int)
val personSchema: Schema[Person] = DeriveSchema.gen[Person]

def spec: Spec[TestEnvironment, Any] =
suite("JsonCodec Spec")(
encoderSuite,
Expand Down Expand Up @@ -436,6 +439,114 @@ object JsonCodecSpec extends ZIOSpecDefault {
"""{"NumberOne":{"value":"foo"}}"""
)
}
),
suite("Streams")(
suite("Streams of integers")(
test("Encodes a stream with multiple integers") {
assertEncodesMany(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5"))
},
test("Decodes a stream with multiple integers separated by newlines") {
assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5"))
},
test("Decodes a stream with multiple integers separated by spaces") {
assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5"))
},
test("Decodes a stream with multiple integers separated by commas and other non JSON number characters") {
assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5"))
}
),
suite("Streams of booleans")(
test("Encodes a stream with multiple booleans") {
assertEncodesMany(Schema[Boolean], List(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse"))
},
test("Decodes a stream with multiple booleans separated by newlines") {
assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse"))
},
test("Decodes a stream with multiple booleans separated by spaces") {
assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false"))
},
test(
"Decodes a stream with multiple booleans separated by commas and other non JSON boolean characters and not separated at all"
) {
assertDecodesMany(
Schema[Boolean],
Chunk(true, true, false, false),
charSequenceToByteChunk("true true, falsefalse")
)
}
),
suite("Streams of strings")(
test("Encodes a stream with multiple strings") {
assertEncodesMany(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\""))
},
test("Decodes a stream with multiple strings separated by newlines") {
assertDecodesMany(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\""))
},
test("Decodes a stream with multiple strings separated by spaces, commas and not separated at all") {
assertDecodesMany(Schema[String], Chunk("a", "b", "c", "d"), charSequenceToByteChunk(""""a" "b","c""d""""))
}
),
suite("Stream of records")(
test("Encodes a stream with multiple records") {
assertEncodesMany(
personSchema,
List(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Decodes a stream with multiple records separated by newlines") {
assertDecodesMany(
personSchema,
Chunk(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Decodes a stream with multiple records, not separated with internalnewlines") {
assertDecodesMany(
personSchema,
Chunk(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}{"name":"Bob",
|"age"
|:2}{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Encodes a stream with no records") {
assertEncodesMany(
personSchema,
List.empty[Person],
charSequenceToByteChunk("")
)
},
test("Decodes a stream with no records") {
assertDecodesMany(
personSchema,
Chunk.empty,
charSequenceToByteChunk("")
)
}
)
)
)

Expand Down Expand Up @@ -1307,7 +1418,7 @@ object JsonCodecSpec extends ZIOSpecDefault {
),
test("decode discriminated case objects with extra fields")(
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &>
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""""))
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""))
),
suite("of case objects")(
test("without annotation")(
Expand Down Expand Up @@ -1505,6 +1616,23 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(stream)(equalTo(chunk))
}

private def assertEncodesMany[A](
schema: Schema[A],
values: Seq[A],
chunk: Chunk[Byte],
cfg: JsonCodec.Config = JsonCodec.Config.default,
print: Boolean = false
) = {
val stream = ZStream
.fromIterable(values)
.via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder)
.runCollect
.tap { chunk =>
printLine(s"${new String(chunk.toArray)}").when(print).ignore
}
assertZIO(stream)(equalTo(chunk))
}

private def assertEncodesJson[A](
schema: Schema[A],
value: A,
Expand Down Expand Up @@ -1551,6 +1679,16 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(result)(equalTo(Chunk(value)))
}

private def assertDecodesMany[A](
schema: Schema[A],
values: Chunk[A],
chunk: Chunk[Byte],
cfg: JsonCodec.Config = JsonCodec.Config.default
) = {
val result = ZStream.fromChunk(chunk).via(JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder).runCollect
assertZIO(result)(equalTo(values))
}

private def assertEncodesThenDecodesFallback[A, B](
schema: Schema.Fallback[A, B],
value: Fallback[A, B]
Expand Down

0 comments on commit 6b9753c

Please sign in to comment.