diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs index 46101e410213..a8d79d86bcc1 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs @@ -869,19 +869,14 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth } } -#pragma warning disable IDE0060 // Remove unused parameter. Resolving tracked via https://github.com/dotnet/aspnetcore/issues/40475 private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default) -#pragma warning restore IDE0060 // Remove unused parameter { _state.AssertConnectionValid(); _protocol.WriteMessage(hubMessage, connectionState.Connection.Transport.Output); Log.SendingMessage(_logger, hubMessage); -#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods - // REVIEW: If a token is passed in and is canceled during FlushAsync it seems to break .Complete()... - await connectionState.Connection.Transport.Output.FlushAsync().ConfigureAwait(false); -#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods + await connectionState.Connection.Transport.Output.FlushAsync(cancellationToken).ConfigureAwait(false); Log.MessageSent(_logger, hubMessage); // We've sent a message, so don't ping for a while diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs index cf3a09211928..0151f8462a74 100644 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; using System.IO; +using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; using System.Threading; @@ -220,6 +221,29 @@ await Assert.ThrowsAsync(() => } } + [Fact] + public async Task SendAsyncCanceledWhenTokenCanceledDuringSend() + { + using (StartVerifiableLog()) + { + // Use pause threshold to block FlushAsync when writing 100+ bytes + var connection = new TestConnection(pipeOptions: new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, pauseWriterThreshold: 100, useSynchronizationContext: false)); + var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory); + + await hubConnection.StartAsync().DefaultTimeout(); + + var cts = new CancellationTokenSource(); + // Send 100+ bytes to trigger pauseWriterThreshold + var sendTask = hubConnection.SendAsync("testMethod", new byte[100], cts.Token); + + cts.Cancel(); + + await Assert.ThrowsAsync(() => sendTask.DefaultTimeout()); + + await hubConnection.StopAsync().DefaultTimeout(); + } + } + [Fact] public async Task StreamAsChannelAsyncCanceledWhenPassedCanceledToken() { diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/TestConnection.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/TestConnection.cs index 411b8a8c0340..65a378f9de5d 100644 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/TestConnection.cs +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/TestConnection.cs @@ -45,14 +45,14 @@ internal class TestConnection : ConnectionContext, IConnectionInherentKeepAliveF bool IConnectionInherentKeepAliveFeature.HasInherentKeepAlive => _hasInherentKeepAlive; - public TestConnection(Func onStart = null, Func onDispose = null, bool autoHandshake = true, bool hasInherentKeepAlive = false) + public TestConnection(Func onStart = null, Func onDispose = null, bool autoHandshake = true, bool hasInherentKeepAlive = false, PipeOptions pipeOptions = null) { _autoHandshake = autoHandshake; _onStart = onStart ?? (() => Task.CompletedTask); _onDispose = onDispose ?? (() => Task.CompletedTask); _hasInherentKeepAlive = hasInherentKeepAlive; - var options = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); + var options = pipeOptions ?? new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); var pair = DuplexPipe.CreateConnectionPair(options, options); Application = pair.Application;