Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Introducing System.IO.Pipelines (#980)
Browse files Browse the repository at this point in the history
* Introducing System.IO.Pipelines
- Golang took the name channels so now we have pipelines
  • Loading branch information
davidfowl authored Nov 15, 2016
1 parent e925016 commit df543b2
Show file tree
Hide file tree
Showing 162 changed files with 18,258 additions and 69 deletions.
198 changes: 129 additions & 69 deletions corefxlab.sln

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions samples/System.IO.Pipelines.Samples/AspNetHttpServerSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Text;
using System.IO.Pipelines.Samples.Http;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;

namespace System.IO.Pipelines.Samples
{
public class AspNetHttpServerSample
{
private static readonly UTF8Encoding _utf8Encoding = new UTF8Encoding(false);
private static readonly byte[] _helloWorldPayload = Encoding.UTF8.GetBytes("Hello, World!");

public static void Run()
{
using (var httpServer = new HttpServer())
{
var host = new WebHostBuilder()
.UseUrls("http://*:5000")
.UseServer(httpServer)
// .UseKestrel()
.Configure(app =>
{
app.Run(context =>
{
context.Response.StatusCode = 200;
context.Response.ContentType = "text/plain";
// HACK: Setting the Content-Length header manually avoids the cost of serializing the int to a string.
// This is instead of: httpContext.Response.ContentLength = _helloWorldPayload.Length;
context.Response.Headers["Content-Length"] = "13";
return context.Response.Body.WriteAsync(_helloWorldPayload, 0, _helloWorldPayload.Length);
});
})
.Build();
host.Run();
}
}
}
}
41 changes: 41 additions & 0 deletions samples/System.IO.Pipelines.Samples/CompressionSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines.Compression;
using System.IO.Pipelines.File;

namespace System.IO.Pipelines.Samples
{
public class CompressionSample
{
public static void Run()
{
using (var factory = new PipelineFactory())
{
var filePath = Path.GetFullPath("Program.cs");

// This is what Stream looks like
//var fs = File.OpenRead(filePath);
//var compressed = new MemoryStream();
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress);
//fs.CopyTo(compressStream);
//compressStream.Flush();
//compressed.Seek(0, SeekOrigin.Begin);

var input = factory.ReadFile(filePath)
.DeflateCompress(factory, CompressionLevel.Optimal)
.DeflateDecompress(factory);

// Wrap the console in a pipeline writer
var output = factory.CreateWriter(Console.OpenStandardOutput());

// Copy from the file reader to the console writer
input.CopyToAsync(output).GetAwaiter().GetResult();

input.Complete();

output.Complete();
}
}
}
}
142 changes: 142 additions & 0 deletions samples/System.IO.Pipelines.Samples/Framing/Codec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Text.Formatting;
using System.Threading.Tasks;
using System.IO.Pipelines.Networking.Libuv;
using System.IO.Pipelines.Text.Primitives;

namespace System.IO.Pipelines.Samples.Framing
{
public static class ProtocolHandling
{
public static void Run()
{
var ip = IPAddress.Any;
int port = 5000;
var thread = new UvThread();
var listener = new UvTcpListener(thread, new IPEndPoint(ip, port));
listener.OnConnection(async connection =>
{
var pipelineConnection = MakePipeline(connection);

var decoder = new LineDecoder();
var handler = new LineHandler();

// Initialize the handler with the connection
handler.Initialize(pipelineConnection);

try
{
while (true)
{
// Wait for data
var result = await pipelineConnection.Input.ReadAsync();
var input = result.Buffer;

try
{
if (input.IsEmpty && result.IsCompleted)
{
// No more data
break;
}

Line line;
while (decoder.TryDecode(ref input, out line))
{
await handler.HandleAsync(line);
}

if (!input.IsEmpty && result.IsCompleted)
{
// Didn't get the whole frame and the connection ended
throw new EndOfStreamException();
}
}
finally
{
// Consume the input
pipelineConnection.Input.Advance(input.Start, input.End);
}
}
}
finally
{
// Close the input, which will tell the producer to stop producing
pipelineConnection.Input.Complete();

// Close the output, which will close the connection
pipelineConnection.Output.Complete();
}
});

listener.StartAsync().GetAwaiter().GetResult();

Console.WriteLine($"Listening on {ip} on port {port}");
Console.ReadKey();

listener.Dispose();
thread.Dispose();
}

public static IPipelineConnection MakePipeline(IPipelineConnection connection)
{
// Do something fancy here to wrap the connection, SSL etc
return connection;
}
}

public class Line
{
public string Data { get; set; }
}

public class LineHandler : IFrameHandler<Line>
{
private PipelineTextOutput _textOutput;

public void Initialize(IPipelineConnection connection)
{
_textOutput = new PipelineTextOutput(connection.Output, EncodingData.InvariantUtf8);
}

public Task HandleAsync(Line message)
{
// Echo back to the caller
_textOutput.Append(message.Data);
return _textOutput.FlushAsync();
}
}

public class LineDecoder : IFrameDecoder<Line>
{
public bool TryDecode(ref ReadableBuffer input, out Line frame)
{
ReadableBuffer slice;
ReadCursor cursor;
if (input.TrySliceTo((byte)'\r', (byte)'\n', out slice, out cursor))
{
frame = new Line { Data = slice.GetUtf8String() };
input = input.Slice(cursor).Slice(1);
return true;
}

frame = null;
return false;
}
}

public interface IFrameDecoder<TInput>
{
bool TryDecode(ref ReadableBuffer input, out TInput frame);
}

public interface IFrameHandler<TInput>
{
void Initialize(IPipelineConnection connection);

Task HandleAsync(TInput message);
}
}
Loading

0 comments on commit df543b2

Please sign in to comment.