From b151fe128b37f379cb55258e6cbdcca4dbad3c82 Mon Sep 17 00:00:00 2001 From: Bruce Wayne Date: Wed, 1 Sep 2021 13:49:38 +0800 Subject: [PATCH] feat: Add WebSocket pipe --- Pipelines.Extensions/PipeReaderExtensions.cs | 8 ++ Pipelines.Extensions/PipeWriterExtensions.cs | 8 ++ Pipelines.Extensions/PipelinesExtensions.cs | 15 ++- .../WebSocketPipe/WebSocketPipeReader.cs | 101 ++++++++++++++++++ .../WebSocketPipeReaderOptions.cs | 19 ++++ .../WebSocketPipe/WebSocketPipeWriter.cs | 82 ++++++++++++++ .../WebSocketPipeWriterOptions.cs | 16 +++ 7 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 Pipelines.Extensions/WebSocketPipe/WebSocketPipeReader.cs create mode 100644 Pipelines.Extensions/WebSocketPipe/WebSocketPipeReaderOptions.cs create mode 100644 Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriter.cs create mode 100644 Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriterOptions.cs diff --git a/Pipelines.Extensions/PipeReaderExtensions.cs b/Pipelines.Extensions/PipeReaderExtensions.cs index 8350c9b..ec5a60f 100644 --- a/Pipelines.Extensions/PipeReaderExtensions.cs +++ b/Pipelines.Extensions/PipeReaderExtensions.cs @@ -1,8 +1,10 @@ using Microsoft; using Pipelines.Extensions.SocketPipe; +using Pipelines.Extensions.WebSocketPipe; using System; using System.IO.Pipelines; using System.Net.Sockets; +using System.Net.WebSockets; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -119,5 +121,11 @@ public static PipeReader AsPipeReader(this Socket socket, SocketPipeReaderOption { return new SocketPipeReader(socket, options ?? SocketPipeReaderOptions.Default); } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static PipeReader AsPipeReader(this WebSocket webSocket, WebSocketPipeReaderOptions? options = null) + { + return new WebSocketPipeReader(webSocket, options ?? WebSocketPipeReaderOptions.Default); + } } } diff --git a/Pipelines.Extensions/PipeWriterExtensions.cs b/Pipelines.Extensions/PipeWriterExtensions.cs index 9167422..4cc6ff6 100644 --- a/Pipelines.Extensions/PipeWriterExtensions.cs +++ b/Pipelines.Extensions/PipeWriterExtensions.cs @@ -1,8 +1,10 @@ using Pipelines.Extensions.SocketPipe; +using Pipelines.Extensions.WebSocketPipe; using System; using System.Buffers; using System.IO.Pipelines; using System.Net.Sockets; +using System.Net.WebSockets; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -93,5 +95,11 @@ public static PipeWriter AsPipeWriter(this Socket socket, SocketPipeWriterOption { return new SocketPipeWriter(socket, options ?? SocketPipeWriterOptions.Default); } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static PipeWriter AsPipeWriter(this WebSocket webSocket, WebSocketPipeWriterOptions? options = null) + { + return new WebSocketPipeWriter(webSocket, options ?? WebSocketPipeWriterOptions.Default); + } } } diff --git a/Pipelines.Extensions/PipelinesExtensions.cs b/Pipelines.Extensions/PipelinesExtensions.cs index 9b0f5bc..5f24fcf 100644 --- a/Pipelines.Extensions/PipelinesExtensions.cs +++ b/Pipelines.Extensions/PipelinesExtensions.cs @@ -1,8 +1,10 @@ using Microsoft; using Pipelines.Extensions.SocketPipe; +using Pipelines.Extensions.WebSocketPipe; using System.IO; using System.IO.Pipelines; using System.Net.Sockets; +using System.Net.WebSockets; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -39,12 +41,21 @@ public static IDuplexPipe AsDuplexPipe(this Socket socket, SocketPipeReaderOptions? readerOptions = null, SocketPipeWriterOptions? writerOptions = null) { - Requires.Argument(socket.Connected, nameof(socket), @"Socket must be connected."); - var reader = socket.AsPipeReader(readerOptions); var writer = socket.AsPipeWriter(writerOptions); return DefaultDuplexPipe.Create(reader, writer); } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static IDuplexPipe AsDuplexPipe(this WebSocket webSocket, + WebSocketPipeReaderOptions? readerOptions = null, + WebSocketPipeWriterOptions? writerOptions = null) + { + var reader = webSocket.AsPipeReader(readerOptions); + var writer = webSocket.AsPipeWriter(writerOptions); + + return DefaultDuplexPipe.Create(reader, writer); + } } } diff --git a/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReader.cs b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReader.cs new file mode 100644 index 0000000..4327ea6 --- /dev/null +++ b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReader.cs @@ -0,0 +1,101 @@ +using Microsoft; +using Microsoft.VisualStudio.Threading; +using System; +using System.IO.Pipelines; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Pipelines.Extensions.WebSocketPipe +{ + internal sealed class WebSocketPipeReader : PipeReader + { + public WebSocket InternalWebSocket { get; } + + private readonly WebSocketPipeReaderOptions _options; + private readonly Pipe _pipe; + private PipeWriter Writer => _pipe.Writer; + private PipeReader Reader => _pipe.Reader; + + private readonly CancellationTokenSource _cancellationTokenSource; + + public WebSocketPipeReader(WebSocket webSocket, WebSocketPipeReaderOptions options) + { + Requires.NotNull(webSocket, nameof(webSocket)); + Requires.NotNull(options, nameof(options)); + + InternalWebSocket = webSocket; + _options = options; + _pipe = new Pipe(options.PipeOptions); + _cancellationTokenSource = new CancellationTokenSource(); + + WrapWriterAsync(_cancellationTokenSource.Token).Forget(); + } + + private Task WrapWriterAsync(CancellationToken cancellationToken) + { + return Task.Run(async () => + { + try + { + while (true) + { + var memory = Writer.GetMemory(_options.SizeHint); + + var readResult = await InternalWebSocket.ReceiveAsync(memory, cancellationToken); + + if (readResult.Count is 0) + { + break; + } + + Writer.Advance(readResult.Count); + + var flushResult = await Writer.FlushAsync(cancellationToken); + if (flushResult.IsCompleted) + { + break; + } + } + + await Writer.CompleteAsync(); + } + catch (Exception ex) + { + await Writer.CompleteAsync(ex); + } + }, cancellationToken); + } + + public override void AdvanceTo(SequencePosition consumed) + { + Reader.AdvanceTo(consumed); + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + Reader.AdvanceTo(consumed, examined); + } + + public override void CancelPendingRead() + { + Reader.CancelPendingRead(); + } + + public override void Complete(Exception? exception = null) + { + _cancellationTokenSource.Cancel(); + Reader.Complete(exception); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + return Reader.ReadAsync(cancellationToken); + } + + public override bool TryRead(out ReadResult result) + { + return Reader.TryRead(out result); + } + } +} diff --git a/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReaderOptions.cs b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReaderOptions.cs new file mode 100644 index 0000000..419408a --- /dev/null +++ b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeReaderOptions.cs @@ -0,0 +1,19 @@ +using System.IO.Pipelines; + +namespace Pipelines.Extensions.WebSocketPipe +{ + public class WebSocketPipeReaderOptions + { + public PipeOptions PipeOptions { get; } + + public int SizeHint { get; } + + internal static readonly WebSocketPipeReaderOptions Default = new(); + + public WebSocketPipeReaderOptions(PipeOptions? pipeOptions = null, int sizeHint = 0) + { + PipeOptions = pipeOptions ?? PipeOptions.Default; + SizeHint = sizeHint; + } + } +} diff --git a/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriter.cs b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriter.cs new file mode 100644 index 0000000..ba5001c --- /dev/null +++ b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriter.cs @@ -0,0 +1,82 @@ +using Microsoft; +using System; +using System.IO.Pipelines; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Pipelines.Extensions.WebSocketPipe +{ + internal sealed class WebSocketPipeWriter : PipeWriter + { + public WebSocket InternalWebSocket { get; } + + private readonly Pipe _pipe; + private PipeWriter Writer => _pipe.Writer; + private PipeReader Reader => _pipe.Reader; + + public WebSocketPipeWriter(WebSocket webSocket, WebSocketPipeWriterOptions options) + { + Requires.NotNull(webSocket, nameof(webSocket)); + Requires.NotNull(options, nameof(options)); + + InternalWebSocket = webSocket; + _pipe = new Pipe(options.PipeOptions); + } + + public override void Advance(int bytes) + { + Writer.Advance(bytes); + } + + public override Memory GetMemory(int sizeHint = 0) + { + return Writer.GetMemory(sizeHint); + } + + public override Span GetSpan(int sizeHint = 0) + { + return Writer.GetSpan(sizeHint); + } + + public override void CancelPendingFlush() + { + Writer.CancelPendingFlush(); + } + + public override void Complete(Exception? exception = null) + { + Writer.Complete(exception); + } + + public override async ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + var flushResult = await Writer.FlushAsync(cancellationToken); + + try + { + var result = await Reader.ReadAsync(cancellationToken); + var buffer = result.Buffer; + + foreach (var memory in buffer) + { + await InternalWebSocket.SendAsync(memory, WebSocketMessageType.Binary, true, cancellationToken); + } + + Reader.AdvanceTo(buffer.End); + + if (result.IsCompleted) + { + await Reader.CompleteAsync(); + } + } + catch (Exception ex) + { + await Reader.CompleteAsync(ex); + throw; + } + + return flushResult; + } + } +} diff --git a/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriterOptions.cs b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriterOptions.cs new file mode 100644 index 0000000..fa8cf82 --- /dev/null +++ b/Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriterOptions.cs @@ -0,0 +1,16 @@ +using System.IO.Pipelines; + +namespace Pipelines.Extensions.WebSocketPipe +{ + public class WebSocketPipeWriterOptions + { + public PipeOptions PipeOptions { get; } + + internal static readonly WebSocketPipeWriterOptions Default = new(); + + public WebSocketPipeWriterOptions(PipeOptions? pipeOptions = null) + { + PipeOptions = pipeOptions ?? PipeOptions.Default; + } + } +}