Skip to content

Commit

Permalink
Add base Encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 3, 2025
1 parent a4d8e15 commit 505c91f
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 118 deletions.
2 changes: 1 addition & 1 deletion src/FsCodec.Box/ByteArray.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ type ByteArray private () =
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> ByteArray.ReadOnlyMemoryToBytes, Func<_, _> ByteArray.BytesToReadOnlyMemory)
FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native
75 changes: 10 additions & 65 deletions src/FsCodec.Box/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,21 @@ open System.Runtime.InteropServices
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used
type EncodedBody = (struct(int * ReadOnlyMemory<byte>))

module private EncodedMaybeCompressed =

module Encoding =
let [<Literal>] Direct = 0 // Assumed for all values not listed here
let [<Literal>] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced
let [<Literal>] Brotli = 2 // Default encoding as of 3.0.0-rc.13

(* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

// In versions pre 3.0.0-rc.13, the compression was implemented as follows; NOTE: use of Flush vs Close saves space but is unconventional
// let private deflate (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
// let output = new System.IO.MemoryStream()
// let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
// compressor.Write(eventBody.Span)
// compressor.Flush() // NOTE: using Flush in lieu of close means the result is not padded, which can hinder interop
// output
let private inflate (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let private brotliDecompress (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let decode struct (encoding, data): ReadOnlyMemory<byte> =
match encoding with
| Encoding.Deflate -> inflate data |> ReadOnlyMemory
| Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory
| Encoding.Direct | _ -> data

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(eventBody.Span)
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output
let encodeUncompressed (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, raw
let encode minSize minGain (raw: ReadOnlyMemory<byte>): EncodedBody =
if raw.Length < minSize then encodeUncompressed raw
else match brotliCompress raw with
| tmp when raw.Length > int tmp.Length + minGain -> Encoding.Brotli, tmp.ToArray() |> ReadOnlyMemory
| _ -> encodeUncompressed raw

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant,
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }
/// Encode the data without attempting to compress, regardless of size
static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 }

[<Extension; AbstractClass; Sealed>]
[<Extension; AbstractClass; Sealed; Obsolete "Please use FsCodec.Encoding instead">]
type Compression private () =

static member Utf8ToEncodedDirect(x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encodeUncompressed x
FsCodec.Encoding.FromBlob x
static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encode options.minSize options.minGain x
FsCodec.Encoding.FromBlobTryCompress({ minSize = options.minSize; minGain = options.minGain }, x)
static member EncodedToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
EncodedMaybeCompressed.decode x
FsCodec.Encoding.DecodeToBlob x
static member EncodedToByteArray(x: EncodedBody): byte[] =
Compression.EncodedToUtf8(x).ToArray()
FsCodec.Encoding.DecodeToBlob(x).ToArray()

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
Expand All @@ -87,22 +31,23 @@ type Compression private () =
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun d -> Compression.Utf8ToEncodedTryCompress(opts, d)), Func<_, _> Compression.EncodedToUtf8)
let opts: FsCodec.CompressionOptions = { minSize = opts.minSize; minGain = opts.minGain }
FsCodec.Core.EventCodec.mapBodies (fun d -> Encoding.FromBlobTryCompress(opts, d)) Encoding.DecodeToBlob native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Utf8ToEncodedDirect, Func<_, _> Compression.EncodedToUtf8)
Encoding.EncodeUncompressed native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToUtf8, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoding.ToBlobCodec native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToByteArray, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoding.ToBlobArrayCodec native
6 changes: 3 additions & 3 deletions src/FsCodec.Box/FsCodec.Box.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec/FsCodec.fsproj" />
<!-- TODO if taking a dependency on 3.1, the impl should switch to EventCodec.mapBodies, and EventCodec.Map should be Obsoleted -->
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.0.0, 4.0.0)" />
<ProjectReference Include="../FsCodec/FsCodec.fsproj" />
<!-- <ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec/FsCodec.fsproj" />-->
<!-- <PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.1.0, 4.0.0)" />-->
</ItemGroup>

</Project>
45 changes: 15 additions & 30 deletions src/FsCodec.SystemTextJson/Encoding.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ type EncodedBody = (struct(int * JsonElement))

module private Impl =

module Encoding =
let [<Literal>] Direct = 0 // Assumed for all values not listed here
let [<Literal>] Deflate = 1 // Deprecated encoding produced by Equinox.Cosmos/CosmosStore < v 4.1.0; no longer produced
let [<Literal>] Brotli = 2 // Default encoding

(* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

// Equinox.Cosmos / Equinox.CosmosStore Deflate logic was as below:
Expand Down Expand Up @@ -73,56 +68,46 @@ module private Impl =
if utf8.Length <= int brotli.Length + minGain then encodeUncompressedUtf8 utf8 else
Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For CosmosStore, every time we touch the tip, the RU impact of the write is significant,
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }

[<Extension; AbstractClass; Sealed>]
type Encoding private () =

static member FromJsonElement(x: JsonElement): EncodedBody =
Impl.encodeUncompressed x
static member FromUtf8(x: ReadOnlyMemory<byte>): EncodedBody =
Impl.encodeUncompressedUtf8 x
static member TryCompress(options, x: JsonElement): EncodedBody =
static member FromJsonElementTryCompress(options, x: JsonElement): EncodedBody =
Impl.tryCompress options.minSize options.minGain x
static member TryCompressUtf8(options, x: ReadOnlyMemory<byte>): EncodedBody =
static member FromUtf8TryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
Impl.tryCompressUtf8 options.minSize options.minGain x
static member ToJsonElement(x: EncodedBody): JsonElement =
static member DecodeToJsonElement(x: EncodedBody): JsonElement =
Impl.decode x
static member ToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
static member DecodeToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
Impl.decodeUtf8 x
static member ToByteArray(x: EncodedBody): byte[] =
Encoding.ToUtf8(x).ToArray()
static member ExpandTo(ms: System.IO.Stream, x: EncodedBody) =
Impl.decode_ (fun el -> JsonSerializer.Serialize(ms, el)) (fun dec -> dec ms) x

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.mapBodies Encoding.FromJsonElement Encoding.ToJsonElement native
FsCodec.Core.EventCodec.mapBodies Encoding.FromJsonElement Encoding.DecodeToJsonElement native

/// <summary>The body will be saved as-is under the following circumstances:<br/>
/// - the <c>shouldCompress</c> predicate is not satisfied for the event in question.<br/>
/// - sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> produced when <c>Encode</c>ing conveys the encoding used, and must be round tripped alongside the body as a required input of a future <c>Decode</c>.</summary>
/// <remarks>NOTE this is intended for interoperability only; a Codec (such as <c>CodecJsonElement</c>) that encodes to <c>JsonElement</c> is strongly recommended unless you don't have a choice.</remarks>
[<Extension>]
static member EncodeTryCompressUtf8<'Event, 'Context>(
static member EncodeUtf8TryCompress<'Event, 'Context>(
native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>,
[<Optional; DefaultParameterValue null>] ?shouldCompress: Func<IEventData<ReadOnlyMemory<byte>>, bool>,
[<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
let encode = shouldCompress |> function
| None -> fun _x (d: ReadOnlyMemory<byte>) -> Encoding.TryCompressUtf8(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompressUtf8(opts, d) else Encoding.FromUtf8 d
FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToUtf8 native
| None -> fun _x (d: ReadOnlyMemory<byte>) -> Encoding.FromUtf8TryCompress(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.FromUtf8TryCompress(opts, d) else Encoding.FromUtf8 d
FsCodec.Core.EventCodec.mapBodies_ encode Encoding.DecodeToUtf8 native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
/// The body will be saved as-is under the following circumstances:<br/>
Expand All @@ -137,18 +122,18 @@ type Encoding private () =
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
let encode = shouldCompress |> function
| None -> fun _x (d: JsonElement) -> Encoding.TryCompress(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompress(opts, d) else Encoding.FromJsonElement d
FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToJsonElement native
| None -> fun _x (d: JsonElement) -> Encoding.FromJsonElementTryCompress(opts, d)
| Some predicate -> fun x d -> if predicate.Invoke x then Encoding.FromJsonElementTryCompress(opts, d) else Encoding.FromJsonElement d
FsCodec.Core.EventCodec.mapBodies_ encode Encoding.DecodeToJsonElement native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.mapBodies Encoding.ToUtf8 Encoding.FromUtf8 native
FsCodec.Core.EventCodec.mapBodies Encoding.DecodeToUtf8 Encoding.FromUtf8 native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
static member ToUtf8ArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.mapBodies Encoding.ToByteArray Encoding.FromUtf8 native
FsCodec.Core.EventCodec.mapBodies (Encoding.DecodeToUtf8 >> _.ToArray()) Encoding.FromUtf8 native
7 changes: 3 additions & 4 deletions src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../FsCodec.Box/FsCodec.Box.fsproj" />
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec.Box/FsCodec.Box.fsproj" />
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec.Box" Version="[3.0.0, 4.0.0)" />
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec.Box/FsCodec.Box.fsproj" />
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec.Box" Version="[3.0.0, 4.0.0)" />
<!-- TODO when updating FsCodec.Box dep to 3.1, the FsCodec direct dep can be removed -->
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.0.4-alpha.0.4, 4.0.0)" />
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.0.4-alpha.0.4, 4.0.0)" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/FsCodec.SystemTextJson/Interop.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ type InteropHelpers private () =
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: FsCodec.IEventCodec<'Event, JsonElement, 'Context>)
: FsCodec.IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> InteropHelpers.JsonElementToUtf8, Func<_, _> InteropHelpers.Utf8ToJsonElement)
FsCodec.Core.EventCodec.mapBodies InteropHelpers.JsonElementToUtf8 InteropHelpers.Utf8ToJsonElement native

/// <summary>Adapts an IEventCodec that's rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to handle <c>JsonElement</c> bodies instead.<br/>
/// NOTE where possible, it's better to use <c>CodecJsonElement</c> in preference to <c>Codec</c> to encode directly in order to avoid this mapping process.</summary>
[<Extension>]
static member ToJsonElementCodec<'Event, 'Context>(native: FsCodec.IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: FsCodec.IEventCodec<'Event, JsonElement, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> InteropHelpers.Utf8ToJsonElement, Func<_, _> InteropHelpers.JsonElementToUtf8)
FsCodec.Core.EventCodec.mapBodies InteropHelpers.Utf8ToJsonElement InteropHelpers.JsonElementToUtf8 native
Loading

0 comments on commit 505c91f

Please sign in to comment.