diff --git a/src/libraries/NetCoreAppLibrary.props b/src/libraries/NetCoreAppLibrary.props index 49fc1048b2b1c..2b0f1a0f2e51d 100644 --- a/src/libraries/NetCoreAppLibrary.props +++ b/src/libraries/NetCoreAppLibrary.props @@ -77,6 +77,7 @@ System.IO.MemoryMappedFiles; System.IO.Pipes; System.IO.Pipes.AccessControl; + System.IO.Pipelines; System.IO.UnmanagedMemoryStream; System.Linq; System.Linq.Expressions; @@ -220,7 +221,6 @@ Microsoft.Extensions.Options.DataAnnotations; Microsoft.Extensions.Primitives; System.Diagnostics.EventLog; - System.IO.Pipelines; System.Security.Cryptography.Xml; System.Threading.RateLimiting; diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj index c87749fe98545..7148be9f761d9 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj @@ -7,6 +7,11 @@ + + + + + diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index c672633ad1681..a4517443d1832 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -65,4 +65,12 @@ System.IO.Pipelines.PipeReader + + + + + + + + diff --git a/src/libraries/System.Text.Json/ref/System.Text.Json.cs b/src/libraries/System.Text.Json/ref/System.Text.Json.cs index e16277de47529..aac63df154c35 100644 --- a/src/libraries/System.Text.Json/ref/System.Text.Json.cs +++ b/src/libraries/System.Text.Json/ref/System.Text.Json.cs @@ -307,8 +307,17 @@ public static void Serialize(System.Text.Json.Utf8JsonWriter writer, object? val public static System.Threading.Tasks.Task SerializeAsync(System.IO.Stream utf8Json, object? value, System.Type inputType, System.Text.Json.Serialization.JsonSerializerContext context, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.Diagnostics.CodeAnalysis.RequiresDynamicCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed and might need runtime code generation. Use System.Text.Json source generation for native AOT applications.")] [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed. Use the overload that takes a JsonTypeInfo or JsonSerializerContext, or make sure all of the required types are preserved.")] + public static System.Threading.Tasks.Task SerializeAsync(System.IO.Pipelines.PipeWriter utf8Json, TValue value, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + [System.Diagnostics.CodeAnalysis.RequiresDynamicCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed and might need runtime code generation. Use System.Text.Json source generation for native AOT applications.")] + [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed. Use the overload that takes a JsonTypeInfo or JsonSerializerContext, or make sure all of the required types are preserved.")] public static System.Threading.Tasks.Task SerializeAsync(System.IO.Stream utf8Json, TValue value, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static System.Threading.Tasks.Task SerializeAsync(System.IO.Stream utf8Json, TValue value, System.Text.Json.Serialization.Metadata.JsonTypeInfo jsonTypeInfo, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task SerializeAsync(System.IO.Pipelines.PipeWriter utf8Json, TValue value, System.Text.Json.Serialization.Metadata.JsonTypeInfo jsonTypeInfo, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task SerializeAsync(System.IO.Pipelines.PipeWriter utf8Json, object? value, System.Text.Json.Serialization.Metadata.JsonTypeInfo jsonTypeInfo, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + [System.Diagnostics.CodeAnalysis.RequiresDynamicCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed and might need runtime code generation. Use System.Text.Json source generation for native AOT applications.")] + [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed. Use the overload that takes a JsonTypeInfo or JsonSerializerContext, or make sure all of the required types are preserved.")] + public static System.Threading.Tasks.Task SerializeAsync(System.IO.Pipelines.PipeWriter utf8Json, object? value, System.Type inputType, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task SerializeAsync(System.IO.Pipelines.PipeWriter utf8Json, object? value, System.Type inputType, System.Text.Json.Serialization.JsonSerializerContext context, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static System.Text.Json.JsonDocument SerializeToDocument(object? value, System.Text.Json.Serialization.Metadata.JsonTypeInfo jsonTypeInfo) { throw null; } [System.Diagnostics.CodeAnalysis.RequiresDynamicCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed and might need runtime code generation. Use System.Text.Json source generation for native AOT applications.")] [System.Diagnostics.CodeAnalysis.RequiresUnreferencedCodeAttribute("JSON serialization and deserialization might require types that cannot be statically analyzed. Use the overload that takes a JsonTypeInfo or JsonSerializerContext, or make sure all of the required types are preserved.")] diff --git a/src/libraries/System.Text.Json/ref/System.Text.Json.csproj b/src/libraries/System.Text.Json/ref/System.Text.Json.csproj index 48187a11e58ea..9e41bc3097af0 100644 --- a/src/libraries/System.Text.Json/ref/System.Text.Json.csproj +++ b/src/libraries/System.Text.Json/ref/System.Text.Json.csproj @@ -26,6 +26,7 @@ + diff --git a/src/libraries/System.Text.Json/src/Resources/Strings.resx b/src/libraries/System.Text.Json/src/Resources/Strings.resx index 3654ae7dbd380..e343daa47cf5a 100644 --- a/src/libraries/System.Text.Json/src/Resources/Strings.resx +++ b/src/libraries/System.Text.Json/src/Resources/Strings.resx @@ -1,13 +1,17 @@ - @@ -717,6 +728,12 @@ Indentation size must be between {0} and {1}. + + PipeWriter.FlushAsync was canceled. + + + PipeWriter has been completed, nothing more can be written to it. + New line can be only "\n" or "\r\n". diff --git a/src/libraries/System.Text.Json/src/System.Text.Json.csproj b/src/libraries/System.Text.Json/src/System.Text.Json.csproj index 34fb2d13a6f3a..7b49b89792cd7 100644 --- a/src/libraries/System.Text.Json/src/System.Text.Json.csproj +++ b/src/libraries/System.Text.Json/src/System.Text.Json.csproj @@ -102,6 +102,7 @@ The System.Text.Json library is built-in as part of the shared framework in .NET + @@ -252,6 +253,7 @@ The System.Text.Json library is built-in as part of the shared framework in .NET + @@ -372,6 +374,7 @@ The System.Text.Json library is built-in as part of the shared framework in .NET + diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncSerializationBufferWriterContext.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncSerializationBufferWriterContext.cs new file mode 100644 index 0000000000000..f34bcaf612892 --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncSerializationBufferWriterContext.cs @@ -0,0 +1,85 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.IO; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Text.Json.Serialization +{ + // Common interface to help de-dupe code for different types that can do async serialization (Stream and PipeWriter) + internal interface IAsyncSerializationBufferWriterContext : IDisposable + { + int FlushThreshold { get; } + + ValueTask FlushAsync(CancellationToken cancellationToken); + + public IBufferWriter BufferWriter { get; } + } + + internal readonly struct AsyncSerializationStreamContext : IAsyncSerializationBufferWriterContext + { + private readonly Stream _stream; + private readonly JsonSerializerOptions _options; + private readonly PooledByteBufferWriter _bufferWriter; + + public AsyncSerializationStreamContext(Stream stream, JsonSerializerOptions options) + { + _stream = stream; + _options = options; + _bufferWriter = new PooledByteBufferWriter(_options.DefaultBufferSize); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async ValueTask FlushAsync(CancellationToken cancellationToken) + { + await _bufferWriter.WriteToStreamAsync(_stream, cancellationToken).ConfigureAwait(false); + _bufferWriter.Clear(); + } + + public int FlushThreshold => (int)(_options.DefaultBufferSize * JsonSerializer.FlushThreshold); + + public IBufferWriter BufferWriter => _bufferWriter; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Dispose() + { + _bufferWriter.Dispose(); + } + } + + internal readonly struct AsyncSerializationPipeContext : IAsyncSerializationBufferWriterContext + { + private readonly PipeWriter _pipe; + + public AsyncSerializationPipeContext(PipeWriter pipe) + { + _pipe = pipe; + } + + public int FlushThreshold => (int)((4 * PipeOptions.Default.MinimumSegmentSize) * JsonSerializer.FlushThreshold); + + public IBufferWriter BufferWriter => _pipe; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async ValueTask FlushAsync(CancellationToken cancellationToken) + { + FlushResult result = await _pipe.FlushAsync(cancellationToken).ConfigureAwait(false); + if (result.IsCanceled || result.IsCompleted) + { + if (result.IsCanceled) + { + ThrowHelper.ThrowOperationCanceledException_PipeWriteCanceled(); + } + + ThrowHelper.ThrowOperationCanceledException_PipeWriteCompleted(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Dispose() { } + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Pipe.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Pipe.cs new file mode 100644 index 0000000000000..dbd8debe55f09 --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Pipe.cs @@ -0,0 +1,194 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; +using System.IO.Pipelines; +using System.Text.Json.Serialization; +using System.Text.Json.Serialization.Metadata; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Text.Json +{ + public static partial class JsonSerializer + { + /// + /// Converts the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// The type of the value to serialize. + /// The UTF-8 to write to. + /// The value to convert. + /// Metadata about the type to convert. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + /// + /// is . + /// + public static Task SerializeAsync( + PipeWriter utf8Json, + TValue value, + JsonTypeInfo jsonTypeInfo, + CancellationToken cancellationToken = default) + { + if (utf8Json is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(utf8Json)); + } + if (jsonTypeInfo is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(jsonTypeInfo)); + } + + jsonTypeInfo.EnsureConfigured(); + return jsonTypeInfo.SerializeAsync(utf8Json, value, cancellationToken); + } + + /// + /// Converts the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// The type of the value to serialize. + /// The UTF-8 to write to. + /// The value to convert. + /// Options to control the conversion behavior. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + /// + /// is . + /// + /// + /// There is no compatible + /// for or its serializable members. + /// + [RequiresUnreferencedCode(SerializationUnreferencedCodeMessage)] + [RequiresDynamicCode(SerializationRequiresDynamicCodeMessage)] + public static Task SerializeAsync( + PipeWriter utf8Json, + TValue value, + JsonSerializerOptions? options = null, + CancellationToken cancellationToken = default) + { + if (utf8Json is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(utf8Json)); + } + + JsonTypeInfo jsonTypeInfo = GetTypeInfo(options); + return jsonTypeInfo.SerializeAsync(utf8Json, value, cancellationToken); + } + + /// + /// Converts the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// The UTF-8 to write to. + /// The value to convert. + /// Metadata about the type to convert. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + /// + /// is . + /// + /// + /// does not match the type of . + /// + public static Task SerializeAsync( + PipeWriter utf8Json, + object? value, + JsonTypeInfo jsonTypeInfo, + CancellationToken cancellationToken = default) + { + if (utf8Json is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(utf8Json)); + } + + if (jsonTypeInfo is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(jsonTypeInfo)); + } + + jsonTypeInfo.EnsureConfigured(); + return jsonTypeInfo.SerializeAsObjectAsync(utf8Json, value, cancellationToken); + } + + /// + /// Converts the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// The UTF-8 to write to. + /// The value to convert. + /// The type of the to convert. + /// A metadata provider for serializable types. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + /// + /// is not compatible with . + /// + /// + /// , , or is . + /// + /// + /// There is no compatible + /// for or its serializable members. + /// + public static Task SerializeAsync( + PipeWriter utf8Json, + object? value, + Type inputType, + JsonSerializerContext context, + CancellationToken cancellationToken = default) + { + if (utf8Json is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(utf8Json)); + } + + if (context is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(context)); + } + + ValidateInputType(value, inputType); + JsonTypeInfo jsonTypeInfo = GetTypeInfo(context, inputType); + + return jsonTypeInfo.SerializeAsObjectAsync(utf8Json, value, cancellationToken); + } + + /// + /// Converts the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// The UTF-8 to write to. + /// The value to convert. + /// The type of the to convert. + /// Options to control the conversion behavior. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + /// + /// is not compatible with . + /// + /// + /// or is . + /// + /// + /// There is no compatible + /// for or its serializable members. + /// + [RequiresUnreferencedCode(SerializationUnreferencedCodeMessage)] + [RequiresDynamicCode(SerializationRequiresDynamicCodeMessage)] + public static Task SerializeAsync( + PipeWriter utf8Json, + object? value, + Type inputType, + JsonSerializerOptions? options = null, + CancellationToken cancellationToken = default) + { + if (utf8Json is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(utf8Json)); + } + + ValidateInputType(value, inputType); + JsonTypeInfo jsonTypeInfo = GetTypeInfo(options, inputType); + + return jsonTypeInfo.SerializeAsObjectAsync(utf8Json, value, cancellationToken); + } + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfo.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfo.cs index f8d5bffedc51d..b46a44f10f317 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfo.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfo.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; +using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Text.Json.Reflection; @@ -989,7 +990,10 @@ public JsonPropertyInfo CreateJsonPropertyInfo(Type propertyType, string name) // Untyped, root-level serialization methods internal abstract void SerializeAsObject(Utf8JsonWriter writer, object? rootValue); + internal abstract Task SerializeAsObjectAsync(TSerializationContext serializationContext, object? rootValue, CancellationToken cancellationToken) + where TSerializationContext : struct, IAsyncSerializationBufferWriterContext; internal abstract Task SerializeAsObjectAsync(Stream utf8Json, object? rootValue, CancellationToken cancellationToken); + internal abstract Task SerializeAsObjectAsync(PipeWriter utf8Json, object? rootValue, CancellationToken cancellationToken); internal abstract void SerializeAsObject(Stream utf8Json, object? rootValue); // Untyped, root-level deserialization methods diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs index 08bec6e288981..a5c2bf8745592 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.IO; +using System.IO.Pipelines; using System.Text.Json.Serialization.Converters; using System.Threading; using System.Threading.Tasks; @@ -58,12 +59,28 @@ rootValue is not null && } } - // Root serialization method for async streaming serialization. - internal async Task SerializeAsync( - Stream utf8Json, + internal Task SerializeAsync(Stream utf8Json, T? rootValue, CancellationToken cancellationToken, object? rootValueBoxed = null) + { + return SerializeAsync(new AsyncSerializationStreamContext(utf8Json, Options), rootValue, cancellationToken, rootValueBoxed); + } + + internal Task SerializeAsync(PipeWriter utf8Json, + T? rootValue, + CancellationToken cancellationToken, + object? rootValueBoxed = null) + { + return SerializeAsync(new AsyncSerializationPipeContext(utf8Json), rootValue, cancellationToken, rootValueBoxed); + } + + // Root serialization method for async streaming serialization. + private async Task SerializeAsync( + TSerializationContext serializationContext, + T? rootValue, + CancellationToken cancellationToken, + object? rootValueBoxed = null) where TSerializationContext : struct, IAsyncSerializationBufferWriterContext { Debug.Assert(IsConfigured); Debug.Assert(rootValueBoxed is null || rootValueBoxed is T); @@ -76,24 +93,30 @@ internal async Task SerializeAsync( Debug.Assert(CanUseSerializeHandler); Debug.Assert(Converter is JsonMetadataServicesConverter); - using var bufferWriter = new PooledByteBufferWriter(Options.DefaultBufferSize); - Utf8JsonWriter writer = Utf8JsonWriterCache.RentWriter(Options, bufferWriter); + Utf8JsonWriter writer = Utf8JsonWriterCache.RentWriter(Options, serializationContext.BufferWriter); try { - SerializeHandler(writer, rootValue!); - writer.Flush(); + try + { + SerializeHandler(writer, rootValue!); + writer.Flush(); + } + finally + { + // Record the serialization size in both successful and failed operations, + // since we want to immediately opt out of the fast path if it exceeds the threshold. + OnRootLevelAsyncSerializationCompleted(writer.BytesCommitted + writer.BytesPending); + + Utf8JsonWriterCache.ReturnWriter(writer); + } + + await serializationContext.FlushAsync(cancellationToken).ConfigureAwait(false); } finally { - // Record the serialization size in both successful and failed operations, - // since we want to immediately opt out of the fast path if it exceeds the threshold. - OnRootLevelAsyncSerializationCompleted(writer.BytesCommitted + writer.BytesPending); - - Utf8JsonWriterCache.ReturnWriter(writer); + serializationContext.Dispose(); } - - await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false); } else if ( #if NET @@ -104,7 +127,7 @@ rootValue is not null && Options.TryGetPolymorphicTypeInfoForRootType(rootValue, out JsonTypeInfo? derivedTypeInfo)) { Debug.Assert(typeof(T) == typeof(object)); - await derivedTypeInfo.SerializeAsObjectAsync(utf8Json, rootValue, cancellationToken).ConfigureAwait(false); + await derivedTypeInfo.SerializeAsObjectAsync(serializationContext, rootValue, cancellationToken).ConfigureAwait(false); } else { @@ -117,14 +140,13 @@ rootValue is not null && state.CancellationToken = cancellationToken; - using var bufferWriter = new PooledByteBufferWriter(Options.DefaultBufferSize); - using var writer = new Utf8JsonWriter(bufferWriter, Options.GetWriterOptions()); + var writer = new Utf8JsonWriter(serializationContext.BufferWriter, Options.GetWriterOptions()); try { do { - state.FlushThreshold = (int)(bufferWriter.Capacity * JsonSerializer.FlushThreshold); + state.FlushThreshold = serializationContext.FlushThreshold; try { @@ -139,8 +161,7 @@ rootValue is not null && } else { - await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false); - bufferWriter.Clear(); + await serializationContext.FlushAsync(cancellationToken).ConfigureAwait(false); } } finally @@ -169,6 +190,15 @@ rootValue is not null && } } while (!isFinalBlock); + + if (CanUseSerializeHandler) + { + // On successful serialization, record the serialization size + // to determine potential suitability of the type for + // fast-path serialization in streaming methods. + Debug.Assert(writer.BytesPending == 0); + OnRootLevelAsyncSerializationCompleted(writer.BytesCommitted); + } } catch { @@ -176,14 +206,10 @@ rootValue is not null && await state.DisposePendingDisposablesOnExceptionAsync().ConfigureAwait(false); throw; } - - if (CanUseSerializeHandler) + finally { - // On successful serialization, record the serialization size - // to determine potential suitability of the type for - // fast-path serialization in streaming methods. - Debug.Assert(writer.BytesPending == 0); - OnRootLevelAsyncSerializationCompleted(writer.BytesCommitted); + writer.Dispose(); + serializationContext.Dispose(); } } } @@ -271,9 +297,15 @@ rootValue is not null && internal sealed override void SerializeAsObject(Utf8JsonWriter writer, object? rootValue) => Serialize(writer, JsonSerializer.UnboxOnWrite(rootValue), rootValue); + internal sealed override Task SerializeAsObjectAsync(TSerializationContext serializationContext, object? rootValue, CancellationToken cancellationToken) + => SerializeAsync(serializationContext, JsonSerializer.UnboxOnWrite(rootValue), cancellationToken, rootValue); + internal sealed override Task SerializeAsObjectAsync(Stream utf8Json, object? rootValue, CancellationToken cancellationToken) => SerializeAsync(utf8Json, JsonSerializer.UnboxOnWrite(rootValue), cancellationToken, rootValue); + internal sealed override Task SerializeAsObjectAsync(PipeWriter utf8Json, object? rootValue, CancellationToken cancellationToken) + => SerializeAsync(utf8Json, JsonSerializer.UnboxOnWrite(rootValue), cancellationToken, rootValue); + internal sealed override void SerializeAsObject(Stream utf8Json, object? rootValue) => Serialize(utf8Json, JsonSerializer.UnboxOnWrite(rootValue), rootValue); diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs b/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs index 5647005377e3f..d50f3f261fe06 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs @@ -897,5 +897,17 @@ public static void ThrowArgumentException_JsonPolymorphismOptionsAssociatedWithD { throw new ArgumentException(SR.JsonPolymorphismOptionsAssociatedWithDifferentJsonTypeInfo, paramName: parameterName); } + + [DoesNotReturn] + public static void ThrowOperationCanceledException_PipeWriteCanceled() + { + throw new OperationCanceledException(SR.PipeWriterCanceled); + } + + [DoesNotReturn] + public static void ThrowOperationCanceledException_PipeWriteCompleted() + { + throw new OperationCanceledException(SR.PipeWriterCompleted); + } } } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Writer/Utf8JsonWriterCache.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Writer/Utf8JsonWriterCache.cs index b0d9528031286..bfe92487f01c8 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Writer/Utf8JsonWriterCache.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Writer/Utf8JsonWriterCache.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Buffers; using System.Diagnostics; namespace System.Text.Json @@ -40,7 +41,7 @@ public static Utf8JsonWriter RentWriterAndBuffer(JsonWriterOptions options, int return writer; } - public static Utf8JsonWriter RentWriter(JsonSerializerOptions options, PooledByteBufferWriter bufferWriter) + public static Utf8JsonWriter RentWriter(JsonSerializerOptions options, IBufferWriter bufferWriter) { ThreadLocalState state = t_threadLocalState ??= new(); Utf8JsonWriter writer; diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/CollectionTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/CollectionTests.cs index 3b3d3ac1942d5..73c4ed6f661c9 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/CollectionTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/CollectionTests.cs @@ -24,4 +24,9 @@ public sealed partial class CollectionTestsDynamic_SyncStream : CollectionTests { public CollectionTestsDynamic_SyncStream() : base(JsonSerializerWrapper.SyncStreamSerializer) { } } + + public sealed partial class CollectionTestsDynamic_Pipe : CollectionTests + { + public CollectionTestsDynamic_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ConstructorTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ConstructorTests.cs index 7808e5b109b48..ca5c31afc1b81 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ConstructorTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ConstructorTests.cs @@ -27,4 +27,10 @@ public class ConstructorTests_Span : ConstructorTests public ConstructorTests_Span() : base(JsonSerializerWrapper.SpanSerializer) { } } + + public class ConstructorTests_Pipe : ConstructorTests + { + public ConstructorTests_Pipe() + : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/InvalidTypeTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/InvalidTypeTests.cs index 91ca5c015a64d..d4ea4d2f594c1 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/InvalidTypeTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/InvalidTypeTests.cs @@ -37,6 +37,11 @@ public class InvalidTypeTests_Writer : InvalidTypeTests public InvalidTypeTests_Writer() : base(JsonSerializerWrapper.ReaderWriterSerializer) { } } + public class InvalidTypeTests_Pipe : InvalidTypeTests + { + public InvalidTypeTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } + public abstract class InvalidTypeTests { private JsonSerializerWrapper Serializer { get; } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonCreationHandlingTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonCreationHandlingTests.cs index 5a669bb5b2f90..573638913ae38 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonCreationHandlingTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonCreationHandlingTests.cs @@ -22,4 +22,9 @@ public sealed class JsonCreationHandlingTests_SyncStream : JsonCreationHandlingT { public JsonCreationHandlingTests_SyncStream() : base(JsonSerializerWrapper.SyncStreamSerializer) { } } + + public sealed class JsonCreationHandlingTests_Pipe : JsonCreationHandlingTests + { + public JsonCreationHandlingTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerApiValidation.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerApiValidation.cs index 1057e6a89006c..1ed08df56d62c 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerApiValidation.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerApiValidation.cs @@ -50,6 +50,11 @@ public class JsonSerializerApiValidation_Node : JsonSerializerApiValidation { public JsonSerializerApiValidation_Node() : base(JsonSerializerWrapper.NodeSerializer) { } } + + public class JsonSerializerApiValidation_Pipe : JsonSerializerApiValidation + { + public JsonSerializerApiValidation_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } /// diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerWrapper.Reflection.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerWrapper.Reflection.cs index d3531970a8e76..a0a8167ec328c 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerWrapper.Reflection.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerWrapper.Reflection.cs @@ -1,10 +1,12 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; +using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Text.Json.Nodes; using System.Text.Json.Serialization.Metadata; @@ -32,6 +34,7 @@ protected JsonSerializerWrapper() public static JsonSerializerWrapper DocumentSerializer { get; } = new DocumentSerializerWrapper(); public static JsonSerializerWrapper ElementSerializer { get; } = new ElementSerializerWrapper(); public static JsonSerializerWrapper NodeSerializer { get; } = new NodeSerializerWrapper(); + public static JsonSerializerWrapper AsyncPipeSerializer { get; } = new AsyncPipelinesSerializerWrapper(); private class SpanSerializerWrapper : JsonSerializerWrapper { @@ -881,5 +884,91 @@ private int ReadExactlyFromSource(byte[] buffer, int offset, int count) public override void SetLength(long value) => throw new NotSupportedException(); public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); } + + // TODO: Deserialize to use PipeReader overloads once implemented + private class AsyncPipelinesSerializerWrapper : JsonSerializerWrapper + { + public override JsonSerializerOptions DefaultOptions => JsonSerializerOptions.Default; + public override bool SupportsNullValueOnDeserialize => true; + + public override async Task DeserializeWrapper(string json, JsonSerializerOptions options = null) + { + return await JsonSerializer.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(json)), options); + } + public override async Task DeserializeWrapper(string json, Type type, JsonSerializerOptions options = null) + { + return await JsonSerializer.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(json)), type, options); + } + + public override async Task DeserializeWrapper(string json, JsonTypeInfo jsonTypeInfo) + { + return await JsonSerializer.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(json)), jsonTypeInfo); + } + + public override async Task DeserializeWrapper(string value, JsonTypeInfo jsonTypeInfo) + { + return await JsonSerializer.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(value)), jsonTypeInfo); + } + + public override async Task DeserializeWrapper(string json, Type type, JsonSerializerContext context) + { + return await JsonSerializer.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(json)), type, context); + } + + public override async Task SerializeWrapper(object value, Type inputType, JsonSerializerOptions options = null) + { + Pipe pipe = new Pipe(); + await JsonSerializer.SerializeAsync(pipe.Writer, value, inputType, options); + ReadResult result = await pipe.Reader.ReadAsync(); + + string stringResult = Encoding.UTF8.GetString(result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + return stringResult; + } + + public override async Task SerializeWrapper(T value, JsonSerializerOptions options = null) + { + Pipe pipe = new Pipe(); + await JsonSerializer.SerializeAsync(pipe.Writer, value, options); + ReadResult result = await pipe.Reader.ReadAsync(); + + string stringResult = Encoding.UTF8.GetString(result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + return stringResult; + } + + public override async Task SerializeWrapper(object value, Type inputType, JsonSerializerContext context) + { + Pipe pipe = new Pipe(); + await JsonSerializer.SerializeAsync(pipe.Writer, value, inputType, context); + ReadResult result = await pipe.Reader.ReadAsync(); + + string stringResult = Encoding.UTF8.GetString(result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + return stringResult; + } + + public override async Task SerializeWrapper(T value, JsonTypeInfo jsonTypeInfo) + { + Pipe pipe = new Pipe(); + await JsonSerializer.SerializeAsync(pipe.Writer, value, jsonTypeInfo); + ReadResult result = await pipe.Reader.ReadAsync(); + + string stringResult = Encoding.UTF8.GetString(result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + return stringResult; + } + + public override async Task SerializeWrapper(object value, JsonTypeInfo jsonTypeInfo) + { + Pipe pipe = new Pipe(); + await JsonSerializer.SerializeAsync(pipe.Writer, value, jsonTypeInfo); + ReadResult result = await pipe.Reader.ReadAsync(); + + string stringResult = Encoding.UTF8.GetString(result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + return stringResult; + } + } } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/MetadataTests/MetadataTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/MetadataTests/MetadataTests.cs index 0e40f66a685dc..7d8c2238a085d 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/MetadataTests/MetadataTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/MetadataTests/MetadataTests.cs @@ -47,6 +47,11 @@ public class MetadataTests_Node : MetadataTests public MetadataTests_Node() : base(JsonSerializerWrapper.NodeSerializer) { } } + public class MetadataTests_Pipe : MetadataTests + { + public MetadataTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } + public abstract partial class MetadataTests { protected JsonSerializerWrapper Serializer { get; } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/NumberHandlingTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/NumberHandlingTests.cs index 1dfa1e16b0b88..5613dda779335 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/NumberHandlingTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/NumberHandlingTests.cs @@ -42,6 +42,11 @@ public class NumberHandlingTests_Node : NumberHandlingTests_OverloadSpecific public NumberHandlingTests_Node() : base(JsonSerializerWrapper.NodeSerializer) { } } + public class NumberHandlingTests_Pipe : NumberHandlingTests_OverloadSpecific + { + public NumberHandlingTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } + public abstract class NumberHandlingTests_OverloadSpecific { private JsonSerializerWrapper Serializer { get; } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Pipe.WriteTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Pipe.WriteTests.cs new file mode 100644 index 0000000000000..6bd7a765e3829 --- /dev/null +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Pipe.WriteTests.cs @@ -0,0 +1,427 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Text.Json.Serialization.Metadata; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Text.Json.Serialization.Tests +{ + public partial class PipeTests + { + [Fact] + public async Task WriteNullArgumentFail() + { + await Assert.ThrowsAsync(async () => await JsonSerializer.SerializeAsync((PipeWriter)null, 1)); + await Assert.ThrowsAsync(async () => await JsonSerializer.SerializeAsync((PipeWriter)null, 1, typeof(int))); + } + + [Fact] + public async Task VerifyValueFail() + { + Pipe pipe = new Pipe(); + await Assert.ThrowsAsync(async () => await JsonSerializer.SerializeAsync(pipe.Writer, "", (Type)null)); + } + + [Fact] + public async Task VerifyTypeFail() + { + Pipe pipe = new Pipe(); + await Assert.ThrowsAsync(async () => await JsonSerializer.SerializeAsync(pipe.Writer, 1, typeof(string))); + } + + [Fact] + public async Task CompletedPipeWithExceptionThrowsFromSerialize() + { + Pipe pipe = new Pipe(); + pipe.Reader.Complete(new FormatException()); + + await Assert.ThrowsAsync(() => JsonSerializer.SerializeAsync(pipe.Writer, 1)); + } + + [Fact] + public async Task CompletedPipeThrowsFromSerialize() + { + Pipe pipe = new Pipe(); + pipe.Reader.Complete(); + + await Assert.ThrowsAsync(() => JsonSerializer.SerializeAsync(pipe.Writer, 1)); + } + + [Fact] + public async Task CancelPendingFlushDuringBackpressureThrows() + { + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5)); + await pipe.Writer.WriteAsync("123456789"u8.ToArray()); + Task serializeTask = JsonSerializer.SerializeAsync(pipe.Writer, GetNumbersAsync()); + Assert.False(serializeTask.IsCompleted); + + pipe.Writer.CancelPendingFlush(); + + await Assert.ThrowsAsync(() => serializeTask); + + ReadResult result = await pipe.Reader.ReadAsync(); + + // Technically this check is not needed, but helps confirm behavior, that Pipe had written but was waiting for flush to continue. + // result.Buffer: 123456789[0 + Assert.Equal(11, result.Buffer.Length); + pipe.Reader.AdvanceTo(result.Buffer.End); + + static async IAsyncEnumerable GetNumbersAsync() + { + int i = 0; + while (true) + { + await Task.Delay(10); + yield return i++; + } + } + } + + [Fact] + public async Task BackpressureIsObservedWhenWritingJson() + { + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5)); + await pipe.Writer.WriteAsync("123456789"u8.ToArray()); + Task serializeTask = JsonSerializer.SerializeAsync(pipe.Writer, 1); + Assert.False(serializeTask.IsCompleted); + + ReadResult result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.GetPosition(5)); + + // Still need to read 1 more byte to unblock flush + Assert.False(serializeTask.IsCompleted); + + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.GetPosition(1)); + + await serializeTask; + } + + [Fact] + public async Task CanCancelBackpressuredJsonWrite() + { + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5)); + await pipe.Writer.WriteAsync("123456789"u8.ToArray()); + + CancellationTokenSource cts = new(); + Task serializeTask = JsonSerializer.SerializeAsync(pipe.Writer, 1, cancellationToken: cts.Token); + Assert.False(serializeTask.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAsync(() => serializeTask); + + ReadResult result = await pipe.Reader.ReadAsync(); + // Even though flush was canceled, the bytes are still written to the Pipe + Assert.Equal(10, result.Buffer.Length); + pipe.Reader.AdvanceTo(result.Buffer.End); + } + + [Theory] + [InlineData(32)] + [InlineData(128)] + [InlineData(1024)] + [InlineData(1024 * 16)] // the default JsonSerializerOptions.DefaultBufferSize value + [InlineData(1024 * 1024)] + public async Task ShouldUseFastPathOnSmallPayloads(int defaultBufferSize) + { + var instrumentedResolver = new PocoWithInstrumentedFastPath.Context( + new JsonSerializerOptions + { + DefaultBufferSize = defaultBufferSize, + }); + + // The current implementation uses a heuristic + int smallValueThreshold = defaultBufferSize / 2; + PocoWithInstrumentedFastPath smallValue = PocoWithInstrumentedFastPath.CreateValueWithSerializationSize(smallValueThreshold); + + // We don't care about backpressure in this test + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: defaultBufferSize, resumeWriterThreshold: defaultBufferSize / 2)); + ReadResult result; + + // The first 10 serializations should not call into the fast path + for (int i = 0; i < 10; i++) + { + await JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(0, instrumentedResolver.FastPathInvocationCount); + } + + // Subsequent iterations do call into the fast path + for (int i = 0; i < 10; i++) + { + await JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(i + 1, instrumentedResolver.FastPathInvocationCount); + } + + // Polymorphic serialization should use the fast path + await JsonSerializer.SerializeAsync(pipe.Writer, (object)smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(11, instrumentedResolver.FastPathInvocationCount); + + // Attempt to serialize a value that is deemed large + var largeValue = PocoWithInstrumentedFastPath.CreateValueWithSerializationSize(smallValueThreshold + 1); + await JsonSerializer.SerializeAsync(pipe.Writer, largeValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(12, instrumentedResolver.FastPathInvocationCount); + + // Any subsequent attempts no longer call into the fast path + for (int i = 0; i < 10; i++) + { + await JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(12, instrumentedResolver.FastPathInvocationCount); + } + } + + [Fact] + public async Task FastPathObservesBackpressure() + { + int defaultBufferSize = 4096; + var instrumentedResolver = new PocoWithInstrumentedFastPath.Context( + new JsonSerializerOptions + { + DefaultBufferSize = defaultBufferSize, + }); + + // The current implementation uses a heuristic + int smallValueThreshold = defaultBufferSize / 2; + PocoWithInstrumentedFastPath smallValue = PocoWithInstrumentedFastPath.CreateValueWithSerializationSize(smallValueThreshold); + + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: defaultBufferSize / 2, resumeWriterThreshold: defaultBufferSize / 4)); + ReadResult result; + + // The first 10 serializations should not call into the fast path + for (int i = 0; i < 10; i++) + { + await JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(0, instrumentedResolver.FastPathInvocationCount); + } + + Task serializeTask = JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + Assert.False(serializeTask.IsCompleted); + Assert.Equal(1, instrumentedResolver.FastPathInvocationCount); + + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + await serializeTask; + } + + [Fact] + public async Task CanCancelBackpressuredFastPath() + { + int defaultBufferSize = 4096; + var instrumentedResolver = new PocoWithInstrumentedFastPath.Context( + new JsonSerializerOptions + { + DefaultBufferSize = defaultBufferSize, + }); + + // The current implementation uses a heuristic + int smallValueThreshold = defaultBufferSize / 2; + PocoWithInstrumentedFastPath smallValue = PocoWithInstrumentedFastPath.CreateValueWithSerializationSize(smallValueThreshold); + + Pipe pipe = new Pipe(new PipeOptions(pauseWriterThreshold: defaultBufferSize / 2, resumeWriterThreshold: defaultBufferSize / 4)); + ReadResult result; + + // The first 10 serializations should not call into the fast path + for (int i = 0; i < 10; i++) + { + await JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options); + result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(0, instrumentedResolver.FastPathInvocationCount); + } + + CancellationTokenSource cts = new(); + + Task serializeTask = JsonSerializer.SerializeAsync(pipe.Writer, smallValue, instrumentedResolver.Options, cts.Token); + Assert.False(serializeTask.IsCompleted); + + cts.Cancel(); + await Assert.ThrowsAsync(() => serializeTask); + } + + [Fact] + public async Task BuffersBehaveAsExpected() + { + TestPool pool = new TestPool(2000); + Pipe pipe = new Pipe(new PipeOptions(pool)); + + // Many small writes + for (int i = 0; i < 100; ++i) + { + await JsonSerializer.SerializeAsync(pipe.Writer, "a"); + } + // Should fit into a single 2000 byte buffer + Assert.Equal(1, pool.BufferCount); + ReadResult result = await pipe.Reader.ReadAsync(); + pipe.Reader.AdvanceTo(result.Buffer.End); + Assert.Equal(0, pool.BufferCount); + + // Partially fill Pipe so next write needs a new buffer + await JsonSerializer.SerializeAsync(pipe.Writer, new string('a', 600)); + Assert.Equal(1, pool.BufferCount); + + // Writing strings incurs a 3x buffer size due to max potential transcoding + // 600 + 600*3 > 2000 means a second buffer will be grabbed + await JsonSerializer.SerializeAsync(pipe.Writer, new string('a', 600)); + Assert.Equal(2, pool.BufferCount); + result = await pipe.Reader.ReadAsync(); + Assert.Equal(1204, result.Buffer.Length); + SequencePosition pos = result.Buffer.Start; + int segments = 0; + while (result.Buffer.TryGet(ref pos, out ReadOnlyMemory memory)) + { + segments++; + Assert.Equal(602, memory.Length); + } + Assert.Equal(2, segments); + pipe.Reader.AdvanceTo(result.Buffer.End); + + // Large write + await JsonSerializer.SerializeAsync(pipe.Writer, new string('a', 2000)); + // Write is larger than pools max buffer size so Pipes will provide a buffer from elsewhere. + Assert.Equal(0, pool.BufferCount); + result = await pipe.Reader.ReadAsync(); + Assert.Equal(2002, result.Buffer.Length); + + } + + internal class TestPool : MemoryPool + { + private readonly int _bufferSize; + private int _bufferCount; + + public int BufferCount => _bufferCount; + + public TestPool(int bufferSize) + { + _bufferSize = bufferSize; + } + + public override int MaxBufferSize => _bufferSize; + + public override IMemoryOwner Rent(int minBufferSize = -1) + { + _bufferCount++; + return new MemoryPoolBuffer(_bufferSize, this); + } + + protected override void Dispose(bool disposing) + { } + + private sealed class MemoryPoolBuffer : IMemoryOwner + { + private readonly TestPool _pool; + private byte[]? _array; + private int _length; + + public MemoryPoolBuffer(int size, TestPool pool) + { + _array = ArrayPool.Shared.Rent(size); + _length = size; + _pool = pool; + } + + public Memory Memory + { + get + { + byte[]? array = _array; + + return new Memory(array, 0, _length); + } + } + + public void Dispose() + { + byte[]? array = _array; + if (array != null) + { + _array = null; + ArrayPool.Shared.Return(array); + _pool._bufferCount--; + } + } + } + } + + internal class PocoWithInstrumentedFastPath + { + public static PocoWithInstrumentedFastPath CreateValueWithSerializationSize(int targetSerializationSize) + { + int objectSerializationPaddingSize = """{"Value":""}""".Length; // 12 + return new PocoWithInstrumentedFastPath { Value = new string('a', targetSerializationSize - objectSerializationPaddingSize) }; + } + + public string? Value { get; set; } + + public class Context : JsonSerializerContext, IJsonTypeInfoResolver + { + public int FastPathInvocationCount { get; private set; } + + public Context(JsonSerializerOptions options) : base(options) + { } + + protected override JsonSerializerOptions? GeneratedSerializerOptions => Options; + public override JsonTypeInfo? GetTypeInfo(Type type) => GetTypeInfo(type, Options); + + public JsonTypeInfo? GetTypeInfo(Type type, JsonSerializerOptions options) + { + if (type == typeof(string)) + { + return JsonMetadataServices.CreateValueInfo(options, JsonMetadataServices.StringConverter); + } + + if (type == typeof(object)) + { + return JsonMetadataServices.CreateValueInfo(options, JsonMetadataServices.ObjectConverter); + } + + if (type == typeof(PocoWithInstrumentedFastPath)) + { + return JsonMetadataServices.CreateObjectInfo(options, + new JsonObjectInfoValues + { + PropertyMetadataInitializer = _ => new JsonPropertyInfo[1] + { + JsonMetadataServices.CreatePropertyInfo(options, + new JsonPropertyInfoValues + { + DeclaringType = typeof(PocoWithInstrumentedFastPath), + PropertyName = "Value", + Getter = obj => ((PocoWithInstrumentedFastPath)obj).Value, + }) + }, + + SerializeHandler = (writer, value) => + { + writer.WriteStartObject(); + writer.WriteString("Value", value.Value); + writer.WriteEndObject(); + FastPathInvocationCount++; + } + }); + } + + return null; + } + } + } + } +} diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/PolymorphicTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/PolymorphicTests.cs index 46f2202b1420e..9e14737051a2e 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/PolymorphicTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/PolymorphicTests.cs @@ -55,6 +55,11 @@ public class PolymorphicTests_Node : PolymorphicTests public PolymorphicTests_Node() : base(JsonSerializerWrapper.NodeSerializer) { } } + public class PolymorphicTests_Pipe : PolymorphicTests + { + public PolymorphicTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } + public abstract partial class PolymorphicTests : SerializerTests { public PolymorphicTests(JsonSerializerWrapper serializer) : base(serializer) diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ReferenceHandlerTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ReferenceHandlerTests.cs index 3cfcdbeff8df5..d793d4cbd47e7 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ReferenceHandlerTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/ReferenceHandlerTests.cs @@ -29,4 +29,9 @@ public sealed class ReferenceHandlerTestsDynamic_IgnoreCycles_AsyncStream : Refe { public ReferenceHandlerTestsDynamic_IgnoreCycles_AsyncStream() : base(JsonSerializerWrapper.AsyncStreamSerializer) { } } + + public sealed class ReferenceHandlerTestsDynamic_IgnoreCycles_Pipe : ReferenceHandlerTests_IgnoreCycles + { + public ReferenceHandlerTestsDynamic_IgnoreCycles_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/RequiredKeywordTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/RequiredKeywordTests.cs index 41dd4f61688f1..969f5d17c6063 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/RequiredKeywordTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/RequiredKeywordTests.cs @@ -52,4 +52,9 @@ public class RequiredKeywordTests_Node : RequiredKeywordTests { public RequiredKeywordTests_Node() : base(JsonSerializerWrapper.NodeSerializer) { } } + + public class RequiredKeywordTests_Pipe : RequiredKeywordTests + { + public RequiredKeywordTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/UnmappedMemberHandlingTests.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/UnmappedMemberHandlingTests.cs index 2b532b6e6f44b..5c59342f3c779 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/UnmappedMemberHandlingTests.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/UnmappedMemberHandlingTests.cs @@ -12,4 +12,9 @@ public sealed partial class UnmappedMemberHandlingTests_AsyncStream : UnmappedMe { public UnmappedMemberHandlingTests_AsyncStream() : base(JsonSerializerWrapper.AsyncStreamSerializer) { } } + + public sealed partial class UnmappedMemberHandlingTests_Pipe : UnmappedMemberHandlingTests + { + public UnmappedMemberHandlingTests_Pipe() : base(JsonSerializerWrapper.AsyncPipeSerializer) { } + } } diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/System.Text.Json.Tests.csproj b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/System.Text.Json.Tests.csproj index 9eb2e9a2e63d7..3dc7cc9e50d40 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/System.Text.Json.Tests.csproj +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/System.Text.Json.Tests.csproj @@ -170,6 +170,7 @@ +