Skip to content

Commit

Permalink
feat: Add WebSocket pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
HMBSbige committed Sep 1, 2021
1 parent 47bc76f commit b151fe1
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 2 deletions.
8 changes: 8 additions & 0 deletions Pipelines.Extensions/PipeReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Microsoft;
using Pipelines.Extensions.SocketPipe;
using Pipelines.Extensions.WebSocketPipe;
using System;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -119,5 +121,11 @@ public static PipeReader AsPipeReader(this Socket socket, SocketPipeReaderOption
{
return new SocketPipeReader(socket, options ?? SocketPipeReaderOptions.Default);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static PipeReader AsPipeReader(this WebSocket webSocket, WebSocketPipeReaderOptions? options = null)
{
return new WebSocketPipeReader(webSocket, options ?? WebSocketPipeReaderOptions.Default);
}
}
}
8 changes: 8 additions & 0 deletions Pipelines.Extensions/PipeWriterExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Pipelines.Extensions.SocketPipe;
using Pipelines.Extensions.WebSocketPipe;
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -93,5 +95,11 @@ public static PipeWriter AsPipeWriter(this Socket socket, SocketPipeWriterOption
{
return new SocketPipeWriter(socket, options ?? SocketPipeWriterOptions.Default);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static PipeWriter AsPipeWriter(this WebSocket webSocket, WebSocketPipeWriterOptions? options = null)
{
return new WebSocketPipeWriter(webSocket, options ?? WebSocketPipeWriterOptions.Default);
}
}
}
15 changes: 13 additions & 2 deletions Pipelines.Extensions/PipelinesExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Microsoft;
using Pipelines.Extensions.SocketPipe;
using Pipelines.Extensions.WebSocketPipe;
using System.IO;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -39,12 +41,21 @@ public static IDuplexPipe AsDuplexPipe(this Socket socket,
SocketPipeReaderOptions? readerOptions = null,
SocketPipeWriterOptions? writerOptions = null)
{
Requires.Argument(socket.Connected, nameof(socket), @"Socket must be connected.");

var reader = socket.AsPipeReader(readerOptions);
var writer = socket.AsPipeWriter(writerOptions);

return DefaultDuplexPipe.Create(reader, writer);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static IDuplexPipe AsDuplexPipe(this WebSocket webSocket,
WebSocketPipeReaderOptions? readerOptions = null,
WebSocketPipeWriterOptions? writerOptions = null)
{
var reader = webSocket.AsPipeReader(readerOptions);
var writer = webSocket.AsPipeWriter(writerOptions);

return DefaultDuplexPipe.Create(reader, writer);
}
}
}
101 changes: 101 additions & 0 deletions Pipelines.Extensions/WebSocketPipe/WebSocketPipeReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using Microsoft;
using Microsoft.VisualStudio.Threading;
using System;
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace Pipelines.Extensions.WebSocketPipe
{
internal sealed class WebSocketPipeReader : PipeReader
{
public WebSocket InternalWebSocket { get; }

private readonly WebSocketPipeReaderOptions _options;
private readonly Pipe _pipe;
private PipeWriter Writer => _pipe.Writer;
private PipeReader Reader => _pipe.Reader;

private readonly CancellationTokenSource _cancellationTokenSource;

public WebSocketPipeReader(WebSocket webSocket, WebSocketPipeReaderOptions options)
{
Requires.NotNull(webSocket, nameof(webSocket));
Requires.NotNull(options, nameof(options));

InternalWebSocket = webSocket;
_options = options;
_pipe = new Pipe(options.PipeOptions);
_cancellationTokenSource = new CancellationTokenSource();

WrapWriterAsync(_cancellationTokenSource.Token).Forget();
}

private Task WrapWriterAsync(CancellationToken cancellationToken)
{
return Task.Run(async () =>
{
try
{
while (true)
{
var memory = Writer.GetMemory(_options.SizeHint);

var readResult = await InternalWebSocket.ReceiveAsync(memory, cancellationToken);

if (readResult.Count is 0)
{
break;
}

Writer.Advance(readResult.Count);

var flushResult = await Writer.FlushAsync(cancellationToken);
if (flushResult.IsCompleted)
{
break;
}
}

await Writer.CompleteAsync();
}
catch (Exception ex)
{
await Writer.CompleteAsync(ex);
}
}, cancellationToken);
}

public override void AdvanceTo(SequencePosition consumed)
{
Reader.AdvanceTo(consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
Reader.AdvanceTo(consumed, examined);
}

public override void CancelPendingRead()
{
Reader.CancelPendingRead();
}

public override void Complete(Exception? exception = null)
{
_cancellationTokenSource.Cancel();
Reader.Complete(exception);
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
return Reader.ReadAsync(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
return Reader.TryRead(out result);
}
}
}
19 changes: 19 additions & 0 deletions Pipelines.Extensions/WebSocketPipe/WebSocketPipeReaderOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.IO.Pipelines;

namespace Pipelines.Extensions.WebSocketPipe
{
public class WebSocketPipeReaderOptions
{
public PipeOptions PipeOptions { get; }

public int SizeHint { get; }

internal static readonly WebSocketPipeReaderOptions Default = new();

public WebSocketPipeReaderOptions(PipeOptions? pipeOptions = null, int sizeHint = 0)
{
PipeOptions = pipeOptions ?? PipeOptions.Default;
SizeHint = sizeHint;
}
}
}
82 changes: 82 additions & 0 deletions Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using Microsoft;
using System;
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace Pipelines.Extensions.WebSocketPipe
{
internal sealed class WebSocketPipeWriter : PipeWriter
{
public WebSocket InternalWebSocket { get; }

private readonly Pipe _pipe;
private PipeWriter Writer => _pipe.Writer;
private PipeReader Reader => _pipe.Reader;

public WebSocketPipeWriter(WebSocket webSocket, WebSocketPipeWriterOptions options)
{
Requires.NotNull(webSocket, nameof(webSocket));
Requires.NotNull(options, nameof(options));

InternalWebSocket = webSocket;
_pipe = new Pipe(options.PipeOptions);
}

public override void Advance(int bytes)
{
Writer.Advance(bytes);
}

public override Memory<byte> GetMemory(int sizeHint = 0)
{
return Writer.GetMemory(sizeHint);
}

public override Span<byte> GetSpan(int sizeHint = 0)
{
return Writer.GetSpan(sizeHint);
}

public override void CancelPendingFlush()
{
Writer.CancelPendingFlush();
}

public override void Complete(Exception? exception = null)
{
Writer.Complete(exception);
}

public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
var flushResult = await Writer.FlushAsync(cancellationToken);

try
{
var result = await Reader.ReadAsync(cancellationToken);
var buffer = result.Buffer;

foreach (var memory in buffer)
{
await InternalWebSocket.SendAsync(memory, WebSocketMessageType.Binary, true, cancellationToken);
}

Reader.AdvanceTo(buffer.End);

if (result.IsCompleted)
{
await Reader.CompleteAsync();
}
}
catch (Exception ex)
{
await Reader.CompleteAsync(ex);
throw;
}

return flushResult;
}
}
}
16 changes: 16 additions & 0 deletions Pipelines.Extensions/WebSocketPipe/WebSocketPipeWriterOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.IO.Pipelines;

namespace Pipelines.Extensions.WebSocketPipe
{
public class WebSocketPipeWriterOptions
{
public PipeOptions PipeOptions { get; }

internal static readonly WebSocketPipeWriterOptions Default = new();

public WebSocketPipeWriterOptions(PipeOptions? pipeOptions = null)
{
PipeOptions = pipeOptions ?? PipeOptions.Default;
}
}
}

0 comments on commit b151fe1

Please sign in to comment.