diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index 630927bb3ff6..593408337142 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -89,7 +89,6 @@ public void Complete() _completed = true; _connectionOutputFlowControl.Abort(); - _outputWriter.Complete(); } } diff --git a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs index 2449fc514629..7c34e025c6e8 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs @@ -103,7 +103,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate) { - sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions); + sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions) + { + Log = _logger + }; certificateRequired = false; } else @@ -140,7 +143,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) } return true; - })); + })) + { + Log = _logger + }; certificateRequired = true; } diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 5b5cc019057c..0a0d09384c21 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -5,6 +5,7 @@ using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -14,36 +15,114 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private readonly Pipe _output; + private Task _outputTask; + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) { } - public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : base(duplexPipe.Input, duplexPipe.Output) + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : + base(duplexPipe.Input, duplexPipe.Output) { Stream = createStream(this); + + var outputOptions = new PipeOptions(pool: writerOptions.Pool, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: writerOptions.MinimumBufferSize, + useSynchronizationContext: false); + Input = PipeReader.Create(Stream, readerOptions); - Output = PipeWriter.Create(Stream, writerOptions); + + // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions + // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once + // those patterns are fixed. + _output = new Pipe(outputOptions); } + public ILogger Log { get; set; } + public TStream Stream { get; } public PipeReader Input { get; } - public PipeWriter Output { get; } + public PipeWriter Output + { + get + { + if (_outputTask == null) + { + _outputTask = WriteOutputAsync(); + } - protected override void Dispose(bool disposing) + return _output.Writer; + } + } + + public override async ValueTask DisposeAsync() { Input.Complete(); - Output.Complete(); - base.Dispose(disposing); + _output.Writer.Complete(); + + if (_outputTask != null) + { + // Wait for the output task to complete, this ensures that we've copied + // the application data to the underlying stream + await _outputTask; + } } - public override ValueTask DisposeAsync() + private async Task WriteOutputAsync() { - Input.Complete(); - Output.Complete(); - return base.DisposeAsync(); + try + { + while (true) + { + var result = await _output.Reader.ReadAsync(); + var buffer = result.Buffer; + + try + { + if (buffer.IsEmpty) + { + if (result.IsCompleted) + { + break; + } + + await Stream.FlushAsync(); + } + else if (buffer.IsSingleSegment) + { + await Stream.WriteAsync(buffer.First); + } + else + { + foreach (var memory in buffer) + { + await Stream.WriteAsync(memory); + } + } + } + finally + { + _output.Reader.AdvanceTo(buffer.End); + } + } + } + catch (Exception ex) + { + Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); + } + finally + { + _output.Reader.Complete(); + } } } } + diff --git a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs index ed9ffb819c7b..5ce9f6df510f 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs @@ -44,6 +44,7 @@ private class LoggingDuplexPipe : DuplexPipeStreamAdapter public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) : base(transport, stream => new LoggingStream(stream, logger)) { + Log = logger; } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs index 543a9dc2896b..d9fd7fe6f52f 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs @@ -467,7 +467,22 @@ protected async Task InitializeConnectionAsync(RequestDelegate application, int CreateConnection(); } - _connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + + async Task CompletePipeOnTaskCompletion() + { + try + { + await connectionTask; + } + finally + { + _pair.Transport.Input.Complete(); + _pair.Transport.Output.Complete(); + } + } + + _connectionTask = CompletePipeOnTaskCompletion(); await SendPreambleAsync().ConfigureAwait(false); await SendSettingsAsync();