diff --git a/src/Common/src/CoreLib/System.Private.CoreLib.Shared.projitems b/src/Common/src/CoreLib/System.Private.CoreLib.Shared.projitems
index 8c7464f0eb99..a86021a11e50 100644
--- a/src/Common/src/CoreLib/System.Private.CoreLib.Shared.projitems
+++ b/src/Common/src/CoreLib/System.Private.CoreLib.Shared.projitems
@@ -233,6 +233,7 @@
+
diff --git a/src/Common/src/CoreLib/System/IO/Stream.cs b/src/Common/src/CoreLib/System/IO/Stream.cs
new file mode 100644
index 000000000000..1906a434fecf
--- /dev/null
+++ b/src/Common/src/CoreLib/System/IO/Stream.cs
@@ -0,0 +1,1331 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+/*============================================================
+**
+**
+**
+**
+**
+** Purpose: Abstract base class for all Streams. Provides
+** default implementations of asynchronous reads & writes, in
+** terms of the synchronous reads & writes (and vice versa).
+**
+**
+===========================================================*/
+
+using System;
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Runtime;
+using System.Runtime.InteropServices;
+using System.Runtime.CompilerServices;
+using System.Runtime.ExceptionServices;
+using System.Security;
+using System.Diagnostics;
+using System.Reflection;
+
+namespace System.IO
+{
+ public abstract partial class Stream : MarshalByRefObject, IDisposable
+ {
+ public static readonly Stream Null = new NullStream();
+
+ //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
+ // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
+ // improvement in Copy performance.
+ private const int DefaultCopyBufferSize = 81920;
+
+ // To implement Async IO operations on streams that don't support async IO
+
+ private ReadWriteTask _activeReadWriteTask;
+ private SemaphoreSlim _asyncActiveSemaphore;
+
+ internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
+ {
+ // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
+ // WaitHandle, we don't need to worry about Disposing it.
+ return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
+ }
+
+ public abstract bool CanRead
+ {
+ get;
+ }
+
+ // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
+ public abstract bool CanSeek
+ {
+ get;
+ }
+
+ public virtual bool CanTimeout
+ {
+ get
+ {
+ return false;
+ }
+ }
+
+ public abstract bool CanWrite
+ {
+ get;
+ }
+
+ public abstract long Length
+ {
+ get;
+ }
+
+ public abstract long Position
+ {
+ get;
+ set;
+ }
+
+ public virtual int ReadTimeout
+ {
+ get
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
+ }
+ set
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
+ }
+ }
+
+ public virtual int WriteTimeout
+ {
+ get
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
+ }
+ set
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
+ }
+ }
+
+ public Task CopyToAsync(Stream destination)
+ {
+ int bufferSize = GetCopyBufferSize();
+
+ return CopyToAsync(destination, bufferSize);
+ }
+
+ public Task CopyToAsync(Stream destination, Int32 bufferSize)
+ {
+ return CopyToAsync(destination, bufferSize, CancellationToken.None);
+ }
+
+ public Task CopyToAsync(Stream destination, CancellationToken cancellationToken)
+ {
+ int bufferSize = GetCopyBufferSize();
+
+ return CopyToAsync(destination, bufferSize, cancellationToken);
+ }
+
+ public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
+ {
+ StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
+
+ return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
+ }
+
+ private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
+ {
+ byte[] buffer = ArrayPool.Shared.Rent(bufferSize);
+ try
+ {
+ while (true)
+ {
+ int bytesRead = await ReadAsync(new Memory(buffer), cancellationToken).ConfigureAwait(false);
+ if (bytesRead == 0) break;
+ await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
+ }
+
+ // Reads the bytes from the current stream and writes the bytes to
+ // the destination stream until all bytes are read, starting at
+ // the current position.
+ public void CopyTo(Stream destination)
+ {
+ int bufferSize = GetCopyBufferSize();
+
+ CopyTo(destination, bufferSize);
+ }
+
+ public virtual void CopyTo(Stream destination, int bufferSize)
+ {
+ StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
+
+ byte[] buffer = ArrayPool.Shared.Rent(bufferSize);
+ try
+ {
+ int read;
+ while ((read = Read(buffer, 0, buffer.Length)) != 0)
+ {
+ destination.Write(buffer, 0, read);
+ }
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
+ }
+
+ private int GetCopyBufferSize()
+ {
+ int bufferSize = DefaultCopyBufferSize;
+
+ if (CanSeek)
+ {
+ long length = Length;
+ long position = Position;
+ if (length <= position) // Handles negative overflows
+ {
+ // There are no bytes left in the stream to copy.
+ // However, because CopyTo{Async} is virtual, we need to
+ // ensure that any override is still invoked to provide its
+ // own validation, so we use the smallest legal buffer size here.
+ bufferSize = 1;
+ }
+ else
+ {
+ long remaining = length - position;
+ if (remaining > 0)
+ {
+ // In the case of a positive overflow, stick to the default size
+ bufferSize = (int)Math.Min(bufferSize, remaining);
+ }
+ }
+ }
+
+ return bufferSize;
+ }
+
+ // Stream used to require that all cleanup logic went into Close(),
+ // which was thought up before we invented IDisposable. However, we
+ // need to follow the IDisposable pattern so that users can write
+ // sensible subclasses without needing to inspect all their base
+ // classes, and without worrying about version brittleness, from a
+ // base class switching to the Dispose pattern. We're moving
+ // Stream to the Dispose(bool) pattern - that's where all subclasses
+ // should put their cleanup starting in V2.
+ public virtual void Close()
+ {
+ // Ideally we would assert CanRead == CanWrite == CanSeek = false,
+ // but we'd have to fix PipeStream & NetworkStream very carefully.
+
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose()
+ {
+ // Ideally we would assert CanRead == CanWrite == CanSeek = false,
+ // but we'd have to fix PipeStream & NetworkStream very carefully.
+
+ Close();
+ }
+
+
+ protected virtual void Dispose(bool disposing)
+ {
+ // Note: Never change this to call other virtual methods on Stream
+ // like Write, since the state on subclasses has already been
+ // torn down. This is the last code to run on cleanup for a stream.
+ }
+
+ public abstract void Flush();
+
+ public Task FlushAsync()
+ {
+ return FlushAsync(CancellationToken.None);
+ }
+
+ public virtual Task FlushAsync(CancellationToken cancellationToken)
+ {
+ return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
+ cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
+ }
+
+ [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
+ protected virtual WaitHandle CreateWaitHandle()
+ {
+ return new ManualResetEvent(false);
+ }
+
+ public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
+ {
+ return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
+ }
+
+ internal IAsyncResult BeginReadInternal(
+ byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
+ bool serializeAsynchronously, bool apm)
+ {
+ if (!CanRead) throw Error.GetReadNotSupported();
+
+ // To avoid a race with a stream's position pointer & generating race conditions
+ // with internal buffer indexes in our own streams that
+ // don't natively support async IO operations when there are multiple
+ // async requests outstanding, we will block the application's main
+ // thread if it does a second IO request until the first one completes.
+ var semaphore = EnsureAsyncActiveSemaphoreInitialized();
+ Task semaphoreTask = null;
+ if (serializeAsynchronously)
+ {
+ semaphoreTask = semaphore.WaitAsync();
+ }
+ else
+ {
+ semaphore.Wait();
+ }
+
+ // Create the task to asynchronously do a Read. This task serves both
+ // as the asynchronous work item and as the IAsyncResult returned to the user.
+ var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
+ {
+ // The ReadWriteTask stores all of the parameters to pass to Read.
+ // As we're currently inside of it, we can get the current task
+ // and grab the parameters from it.
+ var thisTask = Task.InternalCurrent as ReadWriteTask;
+ Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
+
+ try
+ {
+ // Do the Read and return the number of bytes read
+ return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
+ }
+ finally
+ {
+ // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+ // finishing the async operation. However, if this is part of XxAsync, then there won't
+ // be an end method, and this task is responsible for cleaning up.
+ if (!thisTask._apm)
+ {
+ thisTask._stream.FinishTrackingAsyncOperation();
+ }
+
+ thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+ }
+ }, state, this, buffer, offset, count, callback);
+
+ // Schedule it
+ if (semaphoreTask != null)
+ RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
+ else
+ RunReadWriteTask(asyncResult);
+
+
+ return asyncResult; // return it
+ }
+
+ public virtual int EndRead(IAsyncResult asyncResult)
+ {
+ if (asyncResult == null)
+ throw new ArgumentNullException(nameof(asyncResult));
+
+ var readTask = _activeReadWriteTask;
+
+ if (readTask == null)
+ {
+ throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
+ }
+ else if (readTask != asyncResult)
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
+ }
+ else if (!readTask._isRead)
+ {
+ throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
+ }
+
+ try
+ {
+ return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
+ }
+ finally
+ {
+ FinishTrackingAsyncOperation();
+ }
+ }
+
+ public Task ReadAsync(Byte[] buffer, int offset, int count)
+ {
+ return ReadAsync(buffer, offset, count, CancellationToken.None);
+ }
+
+ public virtual Task ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ // If cancellation was requested, bail early with an already completed task.
+ // Otherwise, return a task that represents the Begin/End methods.
+ return cancellationToken.IsCancellationRequested
+ ? Task.FromCanceled(cancellationToken)
+ : BeginEndReadAsync(buffer, offset, count);
+ }
+
+ public virtual ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default)
+ {
+ if (MemoryMarshal.TryGetArray(buffer, out ArraySegment array))
+ {
+ return new ValueTask(ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
+ }
+ else
+ {
+ byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length);
+ return FinishReadAsync(ReadAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer, buffer);
+
+ async ValueTask FinishReadAsync(Task readTask, byte[] localBuffer, Memory localDestination)
+ {
+ try
+ {
+ int result = await readTask.ConfigureAwait(false);
+ new Span(localBuffer, 0, result).CopyTo(localDestination.Span);
+ return result;
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(localBuffer);
+ }
+ }
+ }
+ }
+
+ private Task BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
+ {
+ if (!HasOverriddenBeginEndRead())
+ {
+ // If the Stream does not override Begin/EndRead, then we can take an optimized path
+ // that skips an extra layer of tasks / IAsyncResults.
+ return (Task)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
+ }
+
+ // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
+ return TaskFactory.FromAsyncTrim(
+ this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
+ (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
+ (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
+ }
+
+ private struct ReadWriteParameters // struct for arguments to Read and Write calls
+ {
+ internal byte[] Buffer;
+ internal int Offset;
+ internal int Count;
+ }
+
+
+
+ public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
+ {
+ return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
+ }
+
+ internal IAsyncResult BeginWriteInternal(
+ byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
+ bool serializeAsynchronously, bool apm)
+ {
+ if (!CanWrite) throw Error.GetWriteNotSupported();
+
+ // To avoid a race condition with a stream's position pointer & generating conditions
+ // with internal buffer indexes in our own streams that
+ // don't natively support async IO operations when there are multiple
+ // async requests outstanding, we will block the application's main
+ // thread if it does a second IO request until the first one completes.
+ var semaphore = EnsureAsyncActiveSemaphoreInitialized();
+ Task semaphoreTask = null;
+ if (serializeAsynchronously)
+ {
+ semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
+ }
+ else
+ {
+ semaphore.Wait(); // synchronously wait here
+ }
+
+ // Create the task to asynchronously do a Write. This task serves both
+ // as the asynchronous work item and as the IAsyncResult returned to the user.
+ var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
+ {
+ // The ReadWriteTask stores all of the parameters to pass to Write.
+ // As we're currently inside of it, we can get the current task
+ // and grab the parameters from it.
+ var thisTask = Task.InternalCurrent as ReadWriteTask;
+ Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
+
+ try
+ {
+ // Do the Write
+ thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
+ return 0; // not used, but signature requires a value be returned
+ }
+ finally
+ {
+ // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+ // finishing the async operation. However, if this is part of XxAsync, then there won't
+ // be an end method, and this task is responsible for cleaning up.
+ if (!thisTask._apm)
+ {
+ thisTask._stream.FinishTrackingAsyncOperation();
+ }
+
+ thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+ }
+ }, state, this, buffer, offset, count, callback);
+
+ // Schedule it
+ if (semaphoreTask != null)
+ RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
+ else
+ RunReadWriteTask(asyncResult);
+
+ return asyncResult; // return it
+ }
+
+ private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
+ {
+ Debug.Assert(readWriteTask != null);
+ Debug.Assert(asyncWaiter != null);
+
+ // If the wait has already completed, run the task.
+ if (asyncWaiter.IsCompleted)
+ {
+ Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
+ RunReadWriteTask(readWriteTask);
+ }
+ else // Otherwise, wait for our turn, and then run the task.
+ {
+ asyncWaiter.ContinueWith((t, state) =>
+ {
+ Debug.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
+ var rwt = (ReadWriteTask)state;
+ rwt._stream.RunReadWriteTask(rwt); // RunReadWriteTask(readWriteTask);
+ }, readWriteTask, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
+ }
+ }
+
+ private void RunReadWriteTask(ReadWriteTask readWriteTask)
+ {
+ Debug.Assert(readWriteTask != null);
+ Debug.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
+
+ // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
+ // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
+ // two interlocked operations. However, if ReadWriteTask is ever changed to use
+ // a cancellation token, this should be changed to use Start.
+ _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
+ readWriteTask.m_taskScheduler = TaskScheduler.Default;
+ readWriteTask.ScheduleAndStart(needsProtection: false);
+ }
+
+ private void FinishTrackingAsyncOperation()
+ {
+ _activeReadWriteTask = null;
+ Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
+ _asyncActiveSemaphore.Release();
+ }
+
+ public virtual void EndWrite(IAsyncResult asyncResult)
+ {
+ if (asyncResult == null)
+ throw new ArgumentNullException(nameof(asyncResult));
+
+ var writeTask = _activeReadWriteTask;
+ if (writeTask == null)
+ {
+ throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
+ }
+ else if (writeTask != asyncResult)
+ {
+ throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
+ }
+ else if (writeTask._isRead)
+ {
+ throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
+ }
+
+ try
+ {
+ writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
+ Debug.Assert(writeTask.Status == TaskStatus.RanToCompletion);
+ }
+ finally
+ {
+ FinishTrackingAsyncOperation();
+ }
+ }
+
+ // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
+ // A single instance of this task serves four purposes:
+ // 1. The work item scheduled to run the Read / Write operation
+ // 2. The state holding the arguments to be passed to Read / Write
+ // 3. The IAsyncResult returned from BeginRead / BeginWrite
+ // 4. The completion action that runs to invoke the user-provided callback.
+ // This last item is a bit tricky. Before the AsyncCallback is invoked, the
+ // IAsyncResult must have completed, so we can't just invoke the handler
+ // from within the task, since it is the IAsyncResult, and thus it's not
+ // yet completed. Instead, we use AddCompletionAction to install this
+ // task as its own completion handler. That saves the need to allocate
+ // a separate completion handler, it guarantees that the task will
+ // have completed by the time the handler is invoked, and it allows
+ // the handler to be invoked synchronously upon the completion of the
+ // task. This all enables BeginRead / BeginWrite to be implemented
+ // with a single allocation.
+ private sealed class ReadWriteTask : Task, ITaskCompletionAction
+ {
+ internal readonly bool _isRead;
+ internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
+ internal Stream _stream;
+ internal byte[] _buffer;
+ internal readonly int _offset;
+ internal readonly int _count;
+ private AsyncCallback _callback;
+ private ExecutionContext _context;
+
+ internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
+ {
+ _stream = null;
+ _buffer = null;
+ }
+
+ public ReadWriteTask(
+ bool isRead,
+ bool apm,
+ Func