From f6adcb7dd8db2ebdeba688838fede7942ce14578 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 11 Aug 2021 15:16:20 -0400 Subject: [PATCH] Revise implementation of IValueTaskSources Better match implementation in RandomAccess --- .../src/System.IO.Pipes.csproj | 2 - .../IO/Pipes/NamedPipeServerStream.Windows.cs | 43 ++- .../PipeStream.ConnectionValueTaskSource.cs | 44 --- .../PipeStream.ReadWriteValueTaskSource.cs | 70 ---- .../IO/Pipes/PipeStream.ValueTaskSource.cs | 301 +++++++++--------- .../src/System/IO/Pipes/PipeStream.Windows.cs | 249 ++++++--------- ...andle.OverlappedValueTaskSource.Windows.cs | 5 +- 7 files changed, 269 insertions(+), 445 deletions(-) delete mode 100644 src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ConnectionValueTaskSource.cs delete mode 100644 src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ReadWriteValueTaskSource.cs diff --git a/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj b/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj index 96808fe848417..3113dd32f9721 100644 --- a/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj +++ b/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj @@ -102,8 +102,6 @@ - - diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs index 563fc99ffb20c..625d2e322f236 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs @@ -2,8 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Security.AccessControl; using System.Security.Principal; @@ -55,7 +54,7 @@ protected override void Dispose(bool disposing) } } - internal override void TryToReuse(PipeValueTaskSource source) + internal override void TryToReuse(PipeValueTaskSource source) { base.TryToReuse(source); @@ -324,53 +323,49 @@ internal ExecuteHelper(PipeStreamImpersonationWorker userCode, SafePipeHandle? h private unsafe ValueTask WaitForConnectionCoreAsync(CancellationToken cancellationToken) { CheckConnectOperationsServerWithHandle(); + Debug.Assert(IsAsync); - if (!IsAsync) - { - throw new InvalidOperationException(SR.InvalidOperation_PipeNotAsync); - } - - var valueTaskSource = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this); + ConnectionValueTaskSource? vts = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this); try { - valueTaskSource.PrepareForOperation(); - if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, valueTaskSource.Overlapped)) + vts.PrepareForOperation(); + if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, vts._overlapped)) { int errorCode = Marshal.GetLastPInvokeError(); - switch (errorCode) { case Interop.Errors.ERROR_IO_PENDING: - valueTaskSource.RegisterForCancellation(cancellationToken); + // Common case: IO was initiated, completion will be handled by callback. + // Register for cancellation now that the operation has been initiated. + vts.RegisterForCancellation(cancellationToken); break; - // If we are here then the pipe is already connected, or there was an error - // so we should unpin and free the overlapped. case Interop.Errors.ERROR_PIPE_CONNECTED: + // If we are here then the pipe is already connected. // IOCompletitionCallback will not be called because we completed synchronously. - valueTaskSource.Dispose(); + vts.Dispose(); if (State == PipeState.Connected) { - throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected); + return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected))); } - valueTaskSource.SetCompletedSynchronously(); - - // We return a cached task instead of TaskCompletionSource's Task allowing the GC to collect it. + State = PipeState.Connected; return ValueTask.CompletedTask; default: - valueTaskSource.Dispose(); - return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); + vts.Dispose(); + return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode))); } } } catch { - valueTaskSource.Dispose(); + vts.Dispose(); throw; } - return new ValueTask(valueTaskSource, valueTaskSource.Version); + // Completion handled by callback. + vts.FinishedScheduling(); + return new ValueTask(vts, vts.Version); } private void CheckConnectOperationsServerWithHandle() diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ConnectionValueTaskSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ConnectionValueTaskSource.cs deleted file mode 100644 index 9330a9c4acf2d..0000000000000 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ConnectionValueTaskSource.cs +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -namespace System.IO.Pipes -{ - public abstract partial class PipeStream : Stream - { - internal sealed class ConnectionValueTaskSource : PipeValueTaskSource - { - private readonly NamedPipeServerStream _serverStream; - - internal ConnectionValueTaskSource(NamedPipeServerStream server) - : base(server) - { - _serverStream = server; - } - - internal override void SetCompletedSynchronously() - { - _serverStream.State = PipeState.Connected; - SetResult(default(VoidResult)); - } - - protected override void AsyncCallback(uint errorCode, uint numBytes) - { - // Special case for when the client has already connected to us. - if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED) - { - errorCode = 0; - } - - base.AsyncCallback(errorCode, numBytes); - } - - protected override void HandleError(int errorCode) => - SetException(Win32Marshal.GetExceptionForWin32Error(errorCode)); - - protected override void HandleUnexpectedCancellation() => - SetException(Error.GetOperationAborted()); - } - - internal struct VoidResult { } - } -} diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ReadWriteValueTaskSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ReadWriteValueTaskSource.cs deleted file mode 100644 index 39b51af451dbc..0000000000000 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ReadWriteValueTaskSource.cs +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -namespace System.IO.Pipes -{ - public abstract partial class PipeStream : Stream - { - internal sealed class ReadWriteValueTaskSource : PipeValueTaskSource - { - private readonly bool _isWrite; - private readonly PipeStream _pipeStream; - - private bool _isMessageComplete; - private int _numBytes; // number of buffer read OR written - - internal ReadWriteValueTaskSource(PipeStream stream, bool isWrite) - : base(stream) - { - _pipeStream = stream; - _isWrite = isWrite; - _isMessageComplete = true; - } - - internal bool IsWrite => _isWrite; - - internal override void SetCompletedSynchronously() - { - if (!_isWrite) - { - _pipeStream.UpdateMessageCompletion(_isMessageComplete); - } - - SetResult(_numBytes); - } - - protected override void AsyncCallback(uint errorCode, uint numBytes) - { - _numBytes = (int)numBytes; - - // Allow async read to finish - if (!_isWrite) - { - switch (errorCode) - { - case Interop.Errors.ERROR_BROKEN_PIPE: - case Interop.Errors.ERROR_PIPE_NOT_CONNECTED: - case Interop.Errors.ERROR_NO_DATA: - errorCode = 0; - break; - } - } - - // For message type buffer. - if (errorCode == Interop.Errors.ERROR_MORE_DATA) - { - errorCode = 0; - _isMessageComplete = false; - } - else - { - _isMessageComplete = true; - } - - base.AsyncCallback(errorCode, numBytes); - } - - protected override void HandleError(int errorCode) => SetException(_pipeStream.WinIOError(errorCode)); - } - } -} diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs index ad910572620ad..9d1430a3a7cd6 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs @@ -3,8 +3,6 @@ using System.Buffers; using System.Diagnostics; -using System.Runtime.ExceptionServices; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks.Sources; @@ -12,51 +10,26 @@ namespace System.IO.Pipes { public abstract partial class PipeStream : Stream { - internal abstract unsafe class PipeValueTaskSource : IValueTaskSource, IValueTaskSource + internal abstract unsafe class PipeValueTaskSource : IValueTaskSource, IValueTaskSource { - private const int NoResult = 0; - private const int ResultSuccess = 1; - private const int ResultError = 2; - private const int RegisteringCancellation = 4; - private const int CompletedCallback = 8; - internal static readonly IOCompletionCallback s_ioCallback = IOCallback; internal readonly PreAllocatedOverlapped _preallocatedOverlapped; - private readonly PipeStream _pipeStream; - private CancellationTokenRegistration _cancellationRegistration; - private int _errorCode; - private NativeOverlapped* _overlapped; - private MemoryHandle _pinnedMemory; - private int _state; - - protected internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly - public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token); - public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags); - void IValueTaskSource.GetResult(short token) => GetResultAndRelease(token); - public TResult GetResult(short token) => GetResultAndRelease(token); - - private TResult GetResultAndRelease(short token) - { - try - { - return _source.GetResult(token); - } - finally - { - // The instance is ready to be reused - _pipeStream.TryToReuse(this); - } - } - - internal short Version => _source.Version; + internal readonly PipeStream _pipeStream; + internal MemoryHandle _memoryHandle; + internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly + internal NativeOverlapped* _overlapped; + internal CancellationTokenRegistration _cancellationRegistration; + /// + /// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed, + /// in which case its value is a packed combination of the error code and number of bytes, or when + /// the read/write call has finished scheduling the async operation. + /// + internal ulong _result; protected PipeValueTaskSource(PipeStream pipeStream) { - Debug.Assert(pipeStream != null, "pipeStream is null"); - _pipeStream = pipeStream; - // Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations) _source.RunContinuationsAsynchronously = true; _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null); } @@ -67,161 +40,199 @@ internal void Dispose() _preallocatedOverlapped.Dispose(); } - internal NativeOverlapped* Overlapped => _overlapped; - internal void PrepareForOperation(ReadOnlyMemory memory = default) { - _state = NoResult; - _pinnedMemory = memory.Pin(); + _result = 0; + _memoryHandle = memory.Pin(); _overlapped = _pipeStream._threadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped); } - internal void RegisterForCancellation(CancellationToken cancellationToken) + public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token); + public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags); + void IValueTaskSource.GetResult(short token) => GetResult(token); + public int GetResult(short token) { - // Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed - if (cancellationToken.CanBeCanceled && _overlapped != null) + try { - // Register the cancellation only if the IO hasn't completed - int state = Interlocked.CompareExchange(ref _state, RegisteringCancellation, NoResult); - if (state == NoResult) - { - // Register the cancellation - _cancellationRegistration = cancellationToken.UnsafeRegister(thisRef => ((PipeValueTaskSource)thisRef!).Cancel(), this); + return _source.GetResult(token); + } + finally + { + // The instance is ready to be reused + _pipeStream.TryToReuse(this); + } + } - // Grab the state for case if IO completed while we were setting the registration. - state = Interlocked.Exchange(ref _state, NoResult); - } - else if (state != CompletedCallback) + internal short Version => _source.Version; + + internal void RegisterForCancellation(CancellationToken cancellationToken) + { + Debug.Assert(_overlapped != null); + if (cancellationToken.CanBeCanceled) + { + try { - // IO already completed and we have grabbed result state. - // Set NoResult to prevent invocation of CompleteCallback(result state) from AsyncCallback(...) - state = Interlocked.Exchange(ref _state, NoResult); + _cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) => + { + PipeValueTaskSource vts = (PipeValueTaskSource)s!; + if (!vts._pipeStream.SafePipeHandle.IsInvalid) + { + try + { + Interop.Kernel32.CancelIoEx(vts._pipeStream.SafePipeHandle, vts._overlapped); + // Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback. + } + catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently + } + }, this); } - - // If we have the result state of completed IO call CompleteCallback(result). - // Otherwise IO not completed. - if ((state & (ResultSuccess | ResultError)) != 0) + catch (OutOfMemoryException) { - CompleteCallback(state); + // Just in case trying to register OOMs, we ignore it in order to + // protect the higher-level calling code that would proceed to unpin + // memory that might be actively used by an in-flight async operation. } } } internal void ReleaseResources() { + // Ensure that any cancellation callback has either completed or will never run, so that + // we don't try to access an overlapped for this operation after it's already been freed. _cancellationRegistration.Dispose(); - // NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory - // (this is why we disposed the registration above) + // Unpin any pinned buffer. + _memoryHandle.Dispose(); + + // Free the overlapped. if (_overlapped != null) { _pipeStream._threadPoolBinding!.FreeNativeOverlapped(_overlapped); _overlapped = null; } - - _pinnedMemory.Dispose(); } - internal abstract void SetCompletedSynchronously(); + // After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation, + // and only after that should we allow for completing the operation, as completion needs to factor in work + // done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's + // responsible for calling Complete and for passing the necessary data between parties. - private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped) + /// Invoked when the async operation finished being scheduled. + internal void FinishedScheduling() { - var valueTaskSource = (PipeValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped); - Debug.Assert(valueTaskSource is not null); - Debug.Assert(valueTaskSource._overlapped == pOverlapped); - - valueTaskSource.AsyncCallback(errorCode, numBytes); + // Set the value to 1. If it was already non-0, then the asynchronous operation already completed but + // didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary + // to make the call. + ulong result = Interlocked.Exchange(ref _result, 1); + if (result != 0) + { + Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF); + } } - protected virtual void AsyncCallback(uint errorCode, uint numBytes) + /// Invoked when the asynchronous operation has completed asynchronously. + private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped) { - int resultState; - if (errorCode == 0) + PipeValueTaskSource? vts = (PipeValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped); + Debug.Assert(vts is not null); + Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match"); + + // Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1 + // to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then + // the call site already finished scheduling the async operation, in which case we're ready to complete. + Debug.Assert(numBytes < int.MaxValue); + if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0) { - resultState = ResultSuccess; - } - else - { - resultState = ResultError; - _errorCode = (int)errorCode; - } - - // Store the result so that other threads can observe it - // and if no other thread is registering cancellation, continue. - // Otherwise CompleteCallback(resultState) will be invoked by RegisterForCancellation(). - if (Interlocked.Exchange(ref _state, resultState) == NoResult) - { - // Now try to prevent invocation of CompleteCallback(resultState) from RegisterForCancellation(). - // Otherwise, thread responsible for registering cancellation stole the result and it will invoke CompleteCallback(resultState). - if (Interlocked.Exchange(ref _state, CompletedCallback) != NoResult) - { - CompleteCallback(resultState); - } + vts.Complete(errorCode, numBytes); } } - protected abstract void HandleError(int errorCode); - - private void Cancel() + private void Complete(uint errorCode, uint numBytes) { - SafeHandle handle = _pipeStream._threadPoolBinding!.Handle; - NativeOverlapped* overlapped = _overlapped; - - if (!handle.IsInvalid) - { - try - { - // If the handle is still valid, attempt to cancel the IO - if (!Interop.Kernel32.CancelIoEx(handle, overlapped)) - { - // This case should not have any consequences although - // it will be easier to debug if there exists any special case - // we are not aware of. - int errorCode = Marshal.GetLastPInvokeError(); - Debug.WriteLine("CancelIoEx finished with error code {0}.", errorCode); - } - } - catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently - } + ReleaseResources(); + CompleteCore(errorCode, numBytes); } - protected virtual void HandleUnexpectedCancellation() => SetCanceled(); + private protected abstract void CompleteCore(uint errorCode, uint numBytes); + } - private void CompleteCallback(int resultState) - { - Debug.Assert(resultState == ResultSuccess || resultState == ResultError, "Unexpected result state " + resultState); - CancellationToken cancellationToken = _cancellationRegistration.Token; + internal sealed class ReadWriteValueTaskSource : PipeValueTaskSource + { + internal readonly bool _isWrite; - ReleaseResources(); + internal ReadWriteValueTaskSource(PipeStream stream, bool isWrite) : base(stream) => _isWrite = isWrite; - if (resultState == ResultError) + private protected override void CompleteCore(uint errorCode, uint numBytes) + { + if (!_isWrite) { - if (_errorCode == Interop.Errors.ERROR_OPERATION_ABORTED) - { - if (cancellationToken.CanBeCanceled && !cancellationToken.IsCancellationRequested) - { - HandleUnexpectedCancellation(); - } - else - { - // otherwise set canceled - SetCanceled(cancellationToken); - } - } - else + bool messageCompletion = true; + + switch (errorCode) { - HandleError(_errorCode); + case Interop.Errors.ERROR_BROKEN_PIPE: + case Interop.Errors.ERROR_PIPE_NOT_CONNECTED: + case Interop.Errors.ERROR_NO_DATA: + errorCode = 0; + break; + + case Interop.Errors.ERROR_MORE_DATA: + errorCode = 0; + messageCompletion = false; + break; } + + _pipeStream.UpdateMessageCompletion(messageCompletion); } - else + + switch (errorCode) { - SetCompletedSynchronously(); + case 0: + // Success + _source.SetResult((int)numBytes); + break; + + case Interop.Errors.ERROR_OPERATION_ABORTED: + // Cancellation + CancellationToken ct = _cancellationRegistration.Token; + _source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException()); + break; + + default: + // Failure + _source.SetException(_pipeStream.WinIOError((int)errorCode)); + break; } } + } + + internal sealed class ConnectionValueTaskSource : PipeValueTaskSource + { + internal ConnectionValueTaskSource(NamedPipeServerStream server) : base(server) { } - protected void SetResult(TResult result) => _source.SetResult(result); - protected void SetException(Exception exception) => _source.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(exception)); - protected void SetCanceled(CancellationToken cancellationToken = default) => SetException(new OperationCanceledException(cancellationToken)); + private protected override void CompleteCore(uint errorCode, uint numBytes) + { + switch (errorCode) + { + case 0: + case Interop.Errors.ERROR_PIPE_CONNECTED: // special case for when the client has already connected to us + // Success + _pipeStream.State = PipeState.Connected; + _source.SetResult((int)numBytes); + break; + + case Interop.Errors.ERROR_OPERATION_ABORTED: + // Cancellation + CancellationToken ct = _cancellationRegistration.Token; + _source.SetException(ct.CanBeCanceled && !ct.IsCancellationRequested ? Error.GetOperationAborted() : new OperationCanceledException(ct)); + break; + + default: + // Failure + _source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode)); + break; + } + } } } } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs index d874e2ded61fc..1a1134c7f1503 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs @@ -2,12 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; +using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; +using System.Runtime.Versioning; using System.Threading; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; -using System.Runtime.Versioning; namespace System.IO.Pipes { @@ -260,17 +260,14 @@ private void InitializeAsyncHandle(SafePipeHandle handle) _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); } - internal virtual void TryToReuse(PipeValueTaskSource source) + internal virtual void TryToReuse(PipeValueTaskSource source) { source._source.Reset(); if (source is ReadWriteValueTaskSource readWriteSource) { - ReadWriteValueTaskSource? vts = readWriteSource.IsWrite - ? Interlocked.CompareExchange(ref _reusableWriteValueTaskSource, readWriteSource, null) - : Interlocked.CompareExchange(ref _reusableReadValueTaskSource, readWriteSource, null); - - if (vts is not null) + ref ReadWriteValueTaskSource? field = ref readWriteSource._isWrite ? ref _reusableWriteValueTaskSource : ref _reusableReadValueTaskSource; + if (Interlocked.CompareExchange(ref field, readWriteSource, null) is not null) { source._preallocatedOverlapped.Dispose(); } @@ -289,61 +286,68 @@ private void DisposeCore(bool disposing) private unsafe int ReadCore(Span buffer) { - int errorCode = 0; - int r = ReadFileNative(_handle!, buffer, null, out errorCode); + DebugAssertHandleValid(_handle!); + Debug.Assert(!_isAsync); - if (r == -1) + if (buffer.Length == 0) { - // If the other side has broken the connection, set state to Broken and return 0 - if (errorCode == Interop.Errors.ERROR_BROKEN_PIPE || - errorCode == Interop.Errors.ERROR_PIPE_NOT_CONNECTED) + return 0; + } + + fixed (byte* p = &MemoryMarshal.GetReference(buffer)) + { + int bytesRead = 0; + if (Interop.Kernel32.ReadFile(_handle!, p, buffer.Length, out bytesRead, IntPtr.Zero) != 0) { - State = PipeState.Broken; - r = 0; + _isMessageComplete = true; + return bytesRead; } else { - throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty); - } - } - _isMessageComplete = (errorCode != Interop.Errors.ERROR_MORE_DATA); + int errorCode = Marshal.GetLastPInvokeError(); + _isMessageComplete = errorCode != Interop.Errors.ERROR_MORE_DATA; + switch (errorCode) + { + case Interop.Errors.ERROR_MORE_DATA: + return bytesRead; - Debug.Assert(r >= 0, "PipeStream's ReadCore is likely broken."); + case Interop.Errors.ERROR_BROKEN_PIPE: + case Interop.Errors.ERROR_PIPE_NOT_CONNECTED: + State = PipeState.Broken; + return 0; - return r; + default: + throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty); + } + } + } } - private ValueTask ReadAsyncCore(Memory buffer, CancellationToken cancellationToken) + private unsafe ValueTask ReadAsyncCore(Memory buffer, CancellationToken cancellationToken) { - var valueTaskSource = Interlocked.Exchange(ref _reusableReadValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: false); + Debug.Assert(_isAsync); + + ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableReadValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: false); try { - valueTaskSource.PrepareForOperation(buffer); - // Queue an async ReadFile operation and pass in a packed overlapped - int errorCode = 0; - int r; - unsafe - { - r = ReadFileNative(_handle!, buffer.Span, valueTaskSource.Overlapped, out errorCode); - } + vts.PrepareForOperation(buffer); + Debug.Assert(vts._memoryHandle.Pointer != null); - // ReadFile, the OS version, will return 0 on failure, but this ReadFileNative wrapper - // returns -1. This will return the following: - // - On error, r==-1. - // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING - // - On async requests that completed sequentially, r==0 - // - // You will NEVER RELIABLY be able to get the number of buffer read back from this call - // when using overlapped structures! You must not pass in a non-null lpNumBytesRead to - // ReadFile when using overlapped structures! This is by design NT behavior. - if (r == -1) + // Queue an async ReadFile operation. + if (Interop.Kernel32.ReadFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0) { + // The operation failed, or it's pending. + int errorCode = Marshal.GetLastPInvokeError(); switch (errorCode) { case Interop.Errors.ERROR_IO_PENDING: // Common case: IO was initiated, completion will be handled by callback. // Register for cancellation now that the operation has been initiated. - valueTaskSource.RegisterForCancellation(cancellationToken); + vts.RegisterForCancellation(cancellationToken); + break; + + case Interop.Errors.ERROR_MORE_DATA: + // The operation is completing asynchronously but there's nothing to cancel. break; // One side has closed its handle or server disconnected. @@ -351,87 +355,87 @@ private ValueTask ReadAsyncCore(Memory buffer, CancellationToken canc case Interop.Errors.ERROR_BROKEN_PIPE: case Interop.Errors.ERROR_PIPE_NOT_CONNECTED: State = PipeState.Broken; - - unsafe - { - // Clear the overlapped status bit for this special case. Failure to do so looks - // like we are freeing a pending overlapped. - valueTaskSource.Overlapped->InternalLow = IntPtr.Zero; - } - - valueTaskSource.Dispose(); + vts._overlapped->InternalLow = IntPtr.Zero; + vts.Dispose(); UpdateMessageCompletion(true); return new ValueTask(0); default: // Error. Callback will not be called. - valueTaskSource.Dispose(); + vts.Dispose(); return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); } } } catch { - valueTaskSource.Dispose(); + vts.Dispose(); throw; } - return new ValueTask(valueTaskSource, valueTaskSource.Version); + vts.FinishedScheduling(); + return new ValueTask(vts, vts.Version); } private unsafe void WriteCore(ReadOnlySpan buffer) { - int errorCode = 0; - int r = WriteFileNative(_handle!, buffer, null, out errorCode); + DebugAssertHandleValid(_handle!); + Debug.Assert(!_isAsync); + + if (buffer.Length == 0) + { + return; + } - if (r == -1) + fixed (byte* p = &MemoryMarshal.GetReference(buffer)) { - throw WinIOError(errorCode); + int bytesWritten = 0; + if (Interop.Kernel32.WriteFile(_handle!, p, buffer.Length, out bytesWritten, IntPtr.Zero) == 0) + { + throw WinIOError(Marshal.GetLastPInvokeError()); + } } - Debug.Assert(r >= 0, "PipeStream's WriteCore is likely broken."); } - private ValueTask WriteAsyncCore(ReadOnlyMemory buffer, CancellationToken cancellationToken) + private unsafe ValueTask WriteAsyncCore(ReadOnlyMemory buffer, CancellationToken cancellationToken) { - var valueTaskSource = Interlocked.Exchange(ref _reusableWriteValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: true); + Debug.Assert(_isAsync); + ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableWriteValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: true); try { - valueTaskSource.PrepareForOperation(buffer); - int errorCode = 0; + vts.PrepareForOperation(buffer); + Debug.Assert(vts._memoryHandle.Pointer != null); - // Queue an async WriteFile operation and pass in a packed overlapped - int r; - unsafe + // Queue an async WriteFile operation. + if (Interop.Kernel32.WriteFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0) { - r = WriteFileNative(_handle!, buffer.Span, valueTaskSource.Overlapped, out errorCode); - } + // The operation failed, or it's pending. + int errorCode = Marshal.GetLastPInvokeError(); + switch (errorCode) + { + case Interop.Errors.ERROR_IO_PENDING: + // Common case: IO was initiated, completion will be handled by callback. + // Register for cancellation now that the operation has been initiated. + vts.RegisterForCancellation(cancellationToken); + break; - // WriteFile, the OS version, will return 0 on failure, but this WriteFileNative - // wrapper returns -1. This will return the following: - // - On error, r==-1. - // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING - // - On async requests that completed sequentially, r==0 - // - // You will NEVER RELIABLY be able to get the number of buffer written back from this - // call when using overlapped structures! You must not pass in a non-null - // lpNumBytesWritten to WriteFile when using overlapped structures! This is by design - // NT behavior. - if (r == -1 && errorCode != Interop.Errors.ERROR_IO_PENDING) - { - valueTaskSource.Dispose(); - return ValueTask.FromException(WinIOError(errorCode)); + default: + // Error. Callback will not be invoked. + vts.Dispose(); + return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(WinIOError(errorCode))); + } } - - valueTaskSource.RegisterForCancellation(cancellationToken); } catch { - valueTaskSource.Dispose(); + vts.Dispose(); throw; } - return new ValueTask(valueTaskSource, valueTaskSource.Version); + // Completion handled by callback. + vts.FinishedScheduling(); + return new ValueTask(vts, vts.Version); } // Blocks until the other end of the pipe has read in all written buffer. @@ -573,77 +577,6 @@ public virtual PipeTransmissionMode ReadMode } } - private unsafe int ReadFileNative(SafePipeHandle handle, Span buffer, NativeOverlapped* overlapped, out int errorCode) - { - DebugAssertHandleValid(handle); - Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to ReadFileNative."); - - // Note that async callers check to avoid calling this first, so they can call user's callback. - if (buffer.Length == 0) - { - errorCode = 0; - return 0; - } - - int r = 0; - int numBytesRead = 0; - - fixed (byte* p = &MemoryMarshal.GetReference(buffer)) - { - r = _isAsync ? - Interop.Kernel32.ReadFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) : - Interop.Kernel32.ReadFile(handle, p, buffer.Length, out numBytesRead, IntPtr.Zero); - } - - if (r == 0) - { - // In message mode, the ReadFile can inform us that there is more data to come. - errorCode = Marshal.GetLastPInvokeError(); - return errorCode == Interop.Errors.ERROR_MORE_DATA ? - numBytesRead : - -1; - } - else - { - errorCode = 0; - return numBytesRead; - } - } - - private unsafe int WriteFileNative(SafePipeHandle handle, ReadOnlySpan buffer, NativeOverlapped* overlapped, out int errorCode) - { - DebugAssertHandleValid(handle); - Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to WriteFileNative."); - - // Note that async callers check to avoid calling this first, so they can call user's callback. - if (buffer.Length == 0) - { - errorCode = 0; - return 0; - } - - int r = 0; - int numBytesWritten = 0; - - fixed (byte* p = &MemoryMarshal.GetReference(buffer)) - { - r = _isAsync ? - Interop.Kernel32.WriteFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) : - Interop.Kernel32.WriteFile(handle, p, buffer.Length, out numBytesWritten, IntPtr.Zero); - } - - if (r == 0) - { - errorCode = Marshal.GetLastPInvokeError(); - return -1; - } - else - { - errorCode = 0; - return numBytesWritten; - } - } - internal static unsafe Interop.Kernel32.SECURITY_ATTRIBUTES GetSecAttrs(HandleInheritability inheritability) { Interop.Kernel32.SECURITY_ATTRIBUTES secAttrs = new Interop.Kernel32.SECURITY_ATTRIBUTES diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs index 1ba3516c42a9f..9ef47816836dc 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs @@ -142,14 +142,15 @@ internal void RegisterForCancellation(CancellationToken cancellationToken) private void ReleaseResources() { _strategy = null; - // Unpin any pinned buffer. - _memoryHandle.Dispose(); // Ensure that any cancellation callback has either completed or will never run, // so that we don't try to access an overlapped for this operation after it's already // been freed. _cancellationRegistration.Dispose(); + // Unpin any pinned buffer. + _memoryHandle.Dispose(); + // Free the overlapped. if (_overlapped != null) {