This repository has been archived by the owner on Aug 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 344
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Golang took the name channels so no we have pipelines
- Loading branch information
Showing
161 changed files
with
18,063 additions
and
69 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
38 changes: 38 additions & 0 deletions
38
samples/System.IO.Pipelines.Samples/AspNetHttpServerSample.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
using System.Text; | ||
using System.IO.Pipelines.Samples.Http; | ||
using Microsoft.AspNetCore.Builder; | ||
using Microsoft.AspNetCore.Hosting; | ||
|
||
namespace System.IO.Pipelines.Samples | ||
{ | ||
public class AspNetHttpServerSample | ||
{ | ||
private static readonly UTF8Encoding _utf8Encoding = new UTF8Encoding(false); | ||
private static readonly byte[] _helloWorldPayload = Encoding.UTF8.GetBytes("Hello, World!"); | ||
|
||
public static void Run() | ||
{ | ||
using (var httpServer = new HttpServer()) | ||
{ | ||
var host = new WebHostBuilder() | ||
.UseUrls("http://*:5000") | ||
.UseServer(httpServer) | ||
// .UseKestrel() | ||
.Configure(app => | ||
{ | ||
app.Run(context => | ||
{ | ||
context.Response.StatusCode = 200; | ||
context.Response.ContentType = "text/plain"; | ||
// HACK: Setting the Content-Length header manually avoids the cost of serializing the int to a string. | ||
// This is instead of: httpContext.Response.ContentLength = _helloWorldPayload.Length; | ||
context.Response.Headers["Content-Length"] = "13"; | ||
return context.Response.Body.WriteAsync(_helloWorldPayload, 0, _helloWorldPayload.Length); | ||
}); | ||
}) | ||
.Build(); | ||
host.Run(); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
using System; | ||
using System.IO; | ||
using System.IO.Compression; | ||
using System.IO.Pipelines.Compression; | ||
using System.IO.Pipelines.File; | ||
|
||
namespace System.IO.Pipelines.Samples | ||
{ | ||
public class CompressionSample | ||
{ | ||
public static void Run() | ||
{ | ||
using (var cf = new PipelineFactory()) | ||
{ | ||
var filePath = Path.GetFullPath("Program.cs"); | ||
|
||
// This is what Stream looks like | ||
//var fs = File.OpenRead(filePath); | ||
//var compressed = new MemoryStream(); | ||
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress); | ||
//fs.CopyTo(compressStream); | ||
//compressStream.Flush(); | ||
//compressed.Seek(0, SeekOrigin.Begin); | ||
// var input = channelFactory.MakeReadableChannel(compressed); | ||
|
||
var input = cf.ReadFile(filePath) | ||
.DeflateCompress(cf, CompressionLevel.Optimal) | ||
.DeflateDecompress(cf); | ||
|
||
// Wrap the console in a writable channel | ||
var output = cf.CreateWriter(Console.OpenStandardOutput()); | ||
|
||
// Copy from the file channel to the console channel | ||
input.CopyToAsync(output).GetAwaiter().GetResult(); | ||
|
||
input.Complete(); | ||
|
||
output.Complete(); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
using System; | ||
using System.IO; | ||
using System.Net; | ||
using System.Text; | ||
using System.Text.Formatting; | ||
using System.Threading.Tasks; | ||
using System.IO.Pipelines.Networking.Libuv; | ||
using System.IO.Pipelines.Text.Primitives; | ||
|
||
namespace System.IO.Pipelines.Samples.Framing | ||
{ | ||
public static class ProtocolHandling | ||
{ | ||
public static void Run() | ||
{ | ||
var ip = IPAddress.Any; | ||
int port = 5000; | ||
var thread = new UvThread(); | ||
var listener = new UvTcpListener(thread, new IPEndPoint(ip, port)); | ||
listener.OnConnection(async connection => | ||
{ | ||
var channel = MakePipeline(connection); | ||
|
||
var decoder = new LineDecoder(); | ||
var handler = new LineHandler(); | ||
|
||
// Initialize the handler with the channel | ||
handler.Initialize(channel); | ||
|
||
try | ||
{ | ||
while (true) | ||
{ | ||
// Wait for data | ||
var result = await channel.Input.ReadAsync(); | ||
var input = result.Buffer; | ||
|
||
try | ||
{ | ||
if (input.IsEmpty && result.IsCompleted) | ||
{ | ||
// No more data | ||
break; | ||
} | ||
|
||
Line line; | ||
while (decoder.TryDecode(ref input, out line)) | ||
{ | ||
await handler.HandleAsync(line); | ||
} | ||
|
||
if (!input.IsEmpty && result.IsCompleted) | ||
{ | ||
// Didn't get the whole frame and the connection ended | ||
throw new EndOfStreamException(); | ||
} | ||
} | ||
finally | ||
{ | ||
// Consume the input | ||
channel.Input.Advance(input.Start, input.End); | ||
} | ||
} | ||
} | ||
finally | ||
{ | ||
// Close the input channel, which will tell the producer to stop producing | ||
channel.Input.Complete(); | ||
|
||
// Close the output channel, which will close the connection | ||
channel.Output.Complete(); | ||
} | ||
}); | ||
|
||
listener.StartAsync().GetAwaiter().GetResult(); | ||
|
||
Console.WriteLine($"Listening on {ip} on port {port}"); | ||
Console.ReadKey(); | ||
|
||
listener.Dispose(); | ||
thread.Dispose(); | ||
} | ||
|
||
public static IPipelineConnection MakePipeline(IPipelineConnection channel) | ||
{ | ||
// Do something fancy here to wrap the channel, SSL etc | ||
return channel; | ||
} | ||
} | ||
|
||
public class Line | ||
{ | ||
public string Data { get; set; } | ||
} | ||
|
||
public class LineHandler : IFrameHandler<Line> | ||
{ | ||
private WritableChannelFormatter _formatter; | ||
|
||
public void Initialize(IPipelineConnection channel) | ||
{ | ||
_formatter = new WritableChannelFormatter(channel.Output, EncodingData.InvariantUtf8); | ||
} | ||
|
||
public Task HandleAsync(Line message) | ||
{ | ||
// Echo back to the caller | ||
_formatter.Append(message.Data); | ||
return _formatter.FlushAsync(); | ||
} | ||
} | ||
|
||
public class LineDecoder : IFrameDecoder<Line> | ||
{ | ||
public bool TryDecode(ref ReadableBuffer input, out Line frame) | ||
{ | ||
ReadableBuffer slice; | ||
ReadCursor cursor; | ||
if (input.TrySliceTo((byte)'\r', (byte)'\n', out slice, out cursor)) | ||
{ | ||
frame = new Line { Data = slice.GetUtf8String() }; | ||
input = input.Slice(cursor).Slice(1); | ||
return true; | ||
} | ||
|
||
frame = null; | ||
return false; | ||
} | ||
} | ||
|
||
public interface IFrameDecoder<TInput> | ||
{ | ||
bool TryDecode(ref ReadableBuffer input, out TInput frame); | ||
} | ||
|
||
public interface IFrameHandler<TInput> | ||
{ | ||
void Initialize(IPipelineConnection channel); | ||
|
||
Task HandleAsync(TInput message); | ||
} | ||
} |
77 changes: 77 additions & 0 deletions
77
samples/System.IO.Pipelines.Samples/HttpClient/ChannelHttpContent.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Linq; | ||
using System.Net; | ||
using System.Net.Http; | ||
using System.Threading.Tasks; | ||
|
||
namespace System.IO.Pipelines.Samples | ||
{ | ||
public class ChannelHttpContent : HttpContent | ||
{ | ||
private readonly IPipelineReader _output; | ||
|
||
public ChannelHttpContent(IPipelineReader output) | ||
{ | ||
_output = output; | ||
} | ||
|
||
public int ContentLength { get; set; } | ||
|
||
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) | ||
{ | ||
int remaining = ContentLength; | ||
|
||
while (remaining > 0) | ||
{ | ||
var result = await _output.ReadAsync(); | ||
var inputBuffer = result.Buffer; | ||
|
||
var fin = result.IsCompleted; | ||
|
||
var consumed = inputBuffer.Start; | ||
|
||
try | ||
{ | ||
if (inputBuffer.IsEmpty && fin) | ||
{ | ||
return; | ||
} | ||
|
||
var data = inputBuffer.Slice(0, remaining); | ||
|
||
foreach (var memory in data) | ||
{ | ||
ArraySegment<byte> buffer; | ||
|
||
unsafe | ||
{ | ||
if (!memory.TryGetArray(out buffer)) | ||
{ | ||
// Fall back to copies if this was native memory and we were unable to get | ||
// something we could write | ||
buffer = new ArraySegment<byte>(memory.Span.ToArray()); | ||
} | ||
} | ||
|
||
await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count); | ||
} | ||
|
||
consumed = data.End; | ||
remaining -= data.Length; | ||
} | ||
finally | ||
{ | ||
_output.Advance(consumed); | ||
} | ||
} | ||
} | ||
|
||
protected override bool TryComputeLength(out long length) | ||
{ | ||
length = ContentLength; | ||
return true; | ||
} | ||
} | ||
} |
Oops, something went wrong.