From 6de357e7f27086bd03005cbf3e71c27107bcfc7d Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 22:06:09 -0700 Subject: [PATCH] Revert the output pipe in the DuplexStreamPipeAdapter (#11601) * Revert back to copying data to pipes * Replace the output pipe only * Don't complete the connection pipe in Http2FrameWriter - This leads to trunated data in some cases. Instead just yield the middleware so we can be sure no more user code is running (Http1OutputProducer does this as well). There are still cases where a misbeaving application that doesn't properly await writes gets cut off but that will be fixed in the SteamPipeWriter itself. - Updated tests --- .../src/Internal/Http2/Http2FrameWriter.cs | 1 - .../Middleware/HttpsConnectionMiddleware.cs | 10 +- .../Internal/DuplexPipeStreamAdapter.cs | 99 +++++++++++++++++-- .../Middleware/LoggingConnectionMiddleware.cs | 1 + .../Http2/Http2TestBase.cs | 17 +++- 5 files changed, 114 insertions(+), 14 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index 630927bb3ff6..593408337142 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -89,7 +89,6 @@ public void Complete() _completed = true; _connectionOutputFlowControl.Abort(); - _outputWriter.Complete(); } } diff --git a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs index 2449fc514629..7c34e025c6e8 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs @@ -103,7 +103,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate) { - sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions); + sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions) + { + Log = _logger + }; certificateRequired = false; } else @@ -140,7 +143,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) } return true; - })); + })) + { + Log = _logger + }; certificateRequired = true; } diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 5b5cc019057c..0a0d09384c21 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -5,6 +5,7 @@ using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -14,36 +15,114 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private readonly Pipe _output; + private Task _outputTask; + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) { } - public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : base(duplexPipe.Input, duplexPipe.Output) + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : + base(duplexPipe.Input, duplexPipe.Output) { Stream = createStream(this); + + var outputOptions = new PipeOptions(pool: writerOptions.Pool, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: writerOptions.MinimumBufferSize, + useSynchronizationContext: false); + Input = PipeReader.Create(Stream, readerOptions); - Output = PipeWriter.Create(Stream, writerOptions); + + // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions + // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once + // those patterns are fixed. + _output = new Pipe(outputOptions); } + public ILogger Log { get; set; } + public TStream Stream { get; } public PipeReader Input { get; } - public PipeWriter Output { get; } + public PipeWriter Output + { + get + { + if (_outputTask == null) + { + _outputTask = WriteOutputAsync(); + } - protected override void Dispose(bool disposing) + return _output.Writer; + } + } + + public override async ValueTask DisposeAsync() { Input.Complete(); - Output.Complete(); - base.Dispose(disposing); + _output.Writer.Complete(); + + if (_outputTask != null) + { + // Wait for the output task to complete, this ensures that we've copied + // the application data to the underlying stream + await _outputTask; + } } - public override ValueTask DisposeAsync() + private async Task WriteOutputAsync() { - Input.Complete(); - Output.Complete(); - return base.DisposeAsync(); + try + { + while (true) + { + var result = await _output.Reader.ReadAsync(); + var buffer = result.Buffer; + + try + { + if (buffer.IsEmpty) + { + if (result.IsCompleted) + { + break; + } + + await Stream.FlushAsync(); + } + else if (buffer.IsSingleSegment) + { + await Stream.WriteAsync(buffer.First); + } + else + { + foreach (var memory in buffer) + { + await Stream.WriteAsync(memory); + } + } + } + finally + { + _output.Reader.AdvanceTo(buffer.End); + } + } + } + catch (Exception ex) + { + Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); + } + finally + { + _output.Reader.Complete(); + } } } } + diff --git a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs index ed9ffb819c7b..5ce9f6df510f 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs @@ -44,6 +44,7 @@ private class LoggingDuplexPipe : DuplexPipeStreamAdapter public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) : base(transport, stream => new LoggingStream(stream, logger)) { + Log = logger; } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs index 543a9dc2896b..d9fd7fe6f52f 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs @@ -467,7 +467,22 @@ protected async Task InitializeConnectionAsync(RequestDelegate application, int CreateConnection(); } - _connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + + async Task CompletePipeOnTaskCompletion() + { + try + { + await connectionTask; + } + finally + { + _pair.Transport.Input.Complete(); + _pair.Transport.Output.Complete(); + } + } + + _connectionTask = CompletePipeOnTaskCompletion(); await SendPreambleAsync().ConfigureAwait(false); await SendSettingsAsync();