Skip to content

Commit

Permalink
feat(SystemTextJson): Compression
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 31, 2024
1 parent 58f24ea commit ad64abe
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `SystemTextJson.Compression`: Conditional compression as per `Box.Compression` [#126](https://github.com/jet/FsCodec/pull/126)

### Changed
### Removed
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "8.0.100",
"version": "9.0.100",
"rollForward": "latestMajor"
}
}
14 changes: 7 additions & 7 deletions src/FsCodec.Box/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ module private EncodedMaybeCompressed =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo(output)
decompressor.CopyTo output
output.ToArray()
let private brotliDecompress (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo(output)
decompressor.CopyTo output
output.ToArray()
let decode struct (encoding, data): ReadOnlyMemory<byte> =
match encoding with
| Encoding.Deflate -> inflate data |> ReadOnlyMemory
| Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory
| Encoding.Direct | _ -> data
| Encoding.Deflate -> inflate data |> ReadOnlyMemory
| Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory
| Encoding.Direct | _ -> data

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)
Expand All @@ -62,7 +62,7 @@ type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant,
// so preventing or delaying that is of critical significance
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }
/// Encode the data without attempting to compress, regardless of size
Expand Down Expand Up @@ -95,7 +95,7 @@ type Compression private () =
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Utf8ToEncodedDirect, Func<_, _> Compression.EncodedToUtf8)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&rt;</c> Event Bodies to render and/or consume from Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
Expand Down
139 changes: 139 additions & 0 deletions src/FsCodec.SystemTextJson/Compression.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
namespace FsCodec.SystemTextJson

open FsCodec
open FsCodec.SystemTextJson.Interop
open System
open System.Runtime.CompilerServices
open System.Runtime.InteropServices
open System.Text.Json

/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value identifying the encoding scheme.
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used.
type EncodedBody = (struct(int * JsonElement))

module private EncodedMaybeCompressed =

module Encoding =
let [<Literal>] Direct = 0 // Assumed for all values not listed here
let [<Literal>] Deflate = 1 // Deprecated encoding produced by Equinox.Cosmos/CosmosStore < v 4.1.0; no longer produced
let [<Literal>] Brotli = 2 // Default encoding

(* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

// Equinox.Cosmos / Equinox.CosmosStore Deflate logic was as below:
// let private deflate (uncompressedBytes: byte[]) =
// let output = new MemoryStream()
// let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
// compressor.Write(uncompressedBytes)
// compressor.Flush() // Could `Close`, but not required
// output.ToArray()
let private inflate (compressedBytes: byte[]) =
let input = new System.IO.MemoryStream(compressedBytes)
let decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let private brotliDecompress (data: byte[]): byte[] =
let s = new System.IO.MemoryStream(data, writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let decodeJsonElement struct (encoding, data: JsonElement): JsonElement =
match encoding, data.ValueKind with
| (Encoding.Direct | Encoding.Deflate), JsonValueKind.String -> data.GetBytesFromBase64() |> inflate |> InteropHelpers.Utf8ToJsonElement
| Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> brotliDecompress |> InteropHelpers.Utf8ToJsonElement
| _ -> data
let decodeUtf8 struct (encoding, data: JsonElement): ReadOnlyMemory<byte> =
match encoding, data.ValueKind with
| (Encoding.Direct | Encoding.Deflate), JsonValueKind.String -> data.GetBytesFromBase64() |> inflate |> ReadOnlyMemory<byte>
| Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> brotliDecompress |> ReadOnlyMemory<byte>
| _ -> InteropHelpers.JsonElementToUtf8 data

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

let encodeUncompressed (raw: JsonElement): EncodedBody = Encoding.Direct, raw
let private blobToStringElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement
let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write eventBody.Span
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output
let tryCompress minSize minGain (raw: JsonElement): EncodedBody =
let utf8: ReadOnlyMemory<byte> = InteropHelpers.JsonElementToUtf8 raw
if utf8.Length < minSize then encodeUncompressed raw else

let brotli = brotliCompress utf8
if utf8.Length <= int brotli.Length + minGain then encodeUncompressed raw else
Encoding.Brotli, brotli.ToArray() |> blobToStringElement
let encodeUncompressedUtf8 (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw
let tryCompressUtf8 minSize minGain (utf8: ReadOnlyMemory<byte>): EncodedBody =
if utf8.Length < minSize then encodeUncompressedUtf8 utf8 else

let brotli = brotliCompress utf8
if utf8.Length <= int brotli.Length + minGain then encodeUncompressedUtf8 utf8 else
Encoding.Brotli, brotli.ToArray() |> blobToStringElement

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For CosmosStore, every time we touch the tip, the RU impact of the write is significant,
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }

[<Extension; AbstractClass; Sealed>]
type Compression private () =

static member Encode(x: JsonElement): EncodedBody =
EncodedMaybeCompressed.encodeUncompressed x
static member Encode(x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encodeUncompressedUtf8 x
static member EncodeTryCompress(options, x: JsonElement): EncodedBody =
EncodedMaybeCompressed.tryCompress options.minSize options.minGain x
static member EncodeTryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.tryCompressUtf8 options.minSize options.minGain x
static member DecodeToJsonElement(x: EncodedBody): JsonElement =
EncodedMaybeCompressed.decodeJsonElement x
static member DecodeToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
EncodedMaybeCompressed.decodeUtf8 x
static member DecodeToByteArray(x: EncodedBody): byte[] =
Compression.DecodeToUtf8(x).ToArray()

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.</summary>
[<Extension>]
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun x -> Compression.EncodeTryCompress(opts, x)), Func<_, _> Compression.DecodeToUtf8)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.</summary>
[<Extension>]
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun x -> Compression.EncodeTryCompress(opts, x)), Func<_, _> Compression.DecodeToJsonElement)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Encode, Func<_, _> Compression.DecodeToJsonElement)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.DecodeToUtf8, Func<_, _> Compression.Encode)

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.DecodeToByteArray, Func<_, _> Compression.Encode)
1 change: 1 addition & 0 deletions src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<Compile Include="Codec.fs" />
<Compile Include="CodecJsonElement.fs" />
<Compile Include="Interop.fs" />
<Compile Include="Compression.fs" />
<Compile Include="StringIdConverter.fs" />
</ItemGroup>

Expand Down
96 changes: 96 additions & 0 deletions tests/FsCodec.SystemTextJson.Tests/CompressionTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
module FsCodec.SystemTextJson.Tests.CompressionTests

open Swensen.Unquote
open System
open System.Text.Json
open Xunit

let inline roundtrip (sut: FsCodec.IEventCodec<'event, 'F, unit>) value =
let encoded = sut.Encode((), value = value)
let loaded = FsCodec.Core.TimelineEvent.Create(-1L, encoded)
sut.Decode loaded

(* Base Fixture Round-trips a String encoded as JsonElement *)

module StringUtf8 =

let eventType = "EventType"
let enc (x: 't): JsonElement = JsonSerializer.SerializeToElement x
let dec (b: JsonElement): 't = JsonSerializer.Deserialize b
let jsonElementCodec<'t> =
let encode e = struct (eventType, enc e)
let decode s (b: JsonElement) = if s = eventType then ValueSome (dec b) else invalidOp "Invalid eventType value"
FsCodec.Codec.Create(encode, decode)

let sut<'t> = jsonElementCodec<'t>

let [<Fact>] roundtrips () =
let value = {| value = "Hello World" |}
let res' = roundtrip sut value
res' =! ValueSome value

module InternalDecoding =

let inputValue = {| value = "Hello World" |}
// A JsonElement that's a JSON Object should be handled as an uncompressed value
let direct = struct (0, JsonSerializer.SerializeToElement inputValue)
// A JsonElement that's a JSON String should be treated as base64'd Deflate data where the Decoding is unspecified
let implicitDeflate = struct (Unchecked.defaultof<int>, JsonSerializer.SerializeToElement "qlYqS8wpTVWyUvJIzcnJVwjPL8pJUaoFAAAA//8=")
let explicitDeflate = struct (1, JsonSerializer.SerializeToElement "qlYqS8wpTVWyUvJIzcnJVwjPL8pJUaoFAAAA//8=")
let explicitBrotli = struct (2, JsonSerializer.SerializeToElement "CwuAeyJ2YWx1ZSI6IkhlbGxvIFdvcmxkIn0D")

let decode useRom =
if useRom then FsCodec.SystemTextJson.Compression.DecodeToByteArray >> JsonSerializer.Deserialize
else FsCodec.SystemTextJson.Compression.DecodeToJsonElement >> JsonSerializer.Deserialize
let [<Theory; InlineData false; InlineData true>] ``Can decode all known representations`` useRom =
test <@ decode useRom direct = inputValue @>
test <@ decode useRom implicitDeflate = inputValue @>
test <@ decode useRom explicitDeflate = inputValue @>
test <@ decode useRom explicitBrotli = inputValue @>

let [<Theory; InlineData false; InlineData true>] ``Defaults to leaving the body alone if unknown`` useRom =
let struct (_, je) = direct
let body = struct (99, je)
let decoded = decode useRom body
test <@ decoded = inputValue @>

type JsonElement with member x.Utf8ByteCount = if x.ValueKind = JsonValueKind.Null then 0 else x.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount

module TryCompress =

let sut = FsCodec.SystemTextJson.Compression.EncodeTryCompress StringUtf8.sut

let compressibleValue = {| value = String('x', 5000) |}

let [<Fact>] roundtrips () =
let res' = roundtrip sut compressibleValue
res' =! ValueSome compressibleValue

let [<Fact>] ``compresses when possible`` () =
let encoded = sut.Encode((), value = compressibleValue)
let struct (_encoding, encodedValue) = encoded.Data
encodedValue.Utf8ByteCount <! JsonSerializer.Serialize(compressibleValue).Length

let [<Fact>] ``produces equivalent JsonElement where compression not possible`` () =
let value = {| value = "NotCompressible" |}
let directResult = StringUtf8.sut.Encode((), value).Data
let failedToCompressResult = sut.Encode((), value = value)
let struct (_encoding, result) = failedToCompressResult.Data
true =! JsonElement.DeepEquals(directResult, result)

module Uncompressed =

let sut = FsCodec.SystemTextJson.Compression.EncodeUncompressed StringUtf8.sut

// Borrow the value we just demonstrated to be compressible
let compressibleValue = TryCompress.compressibleValue

let [<Fact>] roundtrips () =
let res' = roundtrip sut compressibleValue
res' =! ValueSome compressibleValue

let [<Fact>] ``does not compress (despite it being possible to)`` () =
let directResult = StringUtf8.sut.Encode((), compressibleValue).Data
let shouldNotBeCompressedResult = sut.Encode((), value = compressibleValue)
let struct (_encoding, result) = shouldNotBeCompressedResult.Data
result.Utf8ByteCount =! directResult.Utf8ByteCount
23 changes: 23 additions & 0 deletions tests/FsCodec.SystemTextJson.Tests/Examples.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,26 @@ Client ClientB, event 2 meta { principal = "me" } event Removed { name = null }
Unhandled Event: Category Misc, Id x, Index 0, Event: "Dummy"
*)

(* Well known states for Compression regression tests *)

open System
let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write eventBody.Span
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output

/// Equinox.Cosmos / Equinox.CosmosStore Deflate logic was exactly as below, do not tweak:
let private deflate (uncompressedBytes: byte[]) =
let output = new System.IO.MemoryStream()
let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(uncompressedBytes)
compressor.Flush() // Could `Close`, but not required
output.ToArray()
let raw = {| value = "Hello World" |}

[| raw |> System.Text.Json.JsonSerializer.SerializeToUtf8Bytes |> ReadOnlyMemory |> brotliCompress |> _.ToArray()
raw |> System.Text.Json.JsonSerializer.SerializeToUtf8Bytes |> deflate |]
|> Array.map Convert.ToBase64String
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
<DisableImplicitFSharpCoreReference>false</DisableImplicitFSharpCoreReference>
<DefineConstants>SYSTEM_TEXT_JSON</DefineConstants>
</PropertyGroup>
Expand Down Expand Up @@ -43,6 +43,7 @@
<Link>SomeNullHandlingTests.fs</Link>
</Compile>
<Compile Include="StringIdTests.fs" />
<Compile Include="CompressionTests.fs" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion tests/FsCodec.Tests/CompressionTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ module Uncompressed =
let struct (_encoding, result) = encoded.Data
true =! directResult.Span.SequenceEqual(result.Span)


module Decoding =

let raw = struct(0, Text.Encoding.UTF8.GetBytes("Hello World") |> ReadOnlyMemory)
let deflated = struct(1, Convert.FromBase64String("8kjNyclXCM8vykkBAAAA//8=") |> ReadOnlyMemory)
let brotli = struct(2, Convert.FromBase64String("CwWASGVsbG8gV29ybGQ=") |> ReadOnlyMemory)
Expand Down

0 comments on commit ad64abe

Please sign in to comment.