Skip to content

Commit

Permalink
[SignalR] Pass cancellation token to FlushAsync (#41180)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Apr 14, 2022
1 parent 181db59 commit d800343
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
7 changes: 1 addition & 6 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -869,19 +869,14 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
}
}

#pragma warning disable IDE0060 // Remove unused parameter. Resolving tracked via https://github.com/dotnet/aspnetcore/issues/40475
private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default)
#pragma warning restore IDE0060 // Remove unused parameter
{
_state.AssertConnectionValid();
_protocol.WriteMessage(hubMessage, connectionState.Connection.Transport.Output);

Log.SendingMessage(_logger, hubMessage);

#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods
// REVIEW: If a token is passed in and is canceled during FlushAsync it seems to break .Complete()...
await connectionState.Connection.Transport.Output.FlushAsync().ConfigureAwait(false);
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods
await connectionState.Connection.Transport.Output.FlushAsync(cancellationToken).ConfigureAwait(false);
Log.MessageSent(_logger, hubMessage);

// We've sent a message, so don't ping for a while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
Expand Down Expand Up @@ -220,6 +221,29 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
}
}

[Fact]
public async Task SendAsyncCanceledWhenTokenCanceledDuringSend()
{
using (StartVerifiableLog())
{
// Use pause threshold to block FlushAsync when writing 100+ bytes
var connection = new TestConnection(pipeOptions: new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, pauseWriterThreshold: 100, useSynchronizationContext: false));
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);

await hubConnection.StartAsync().DefaultTimeout();

var cts = new CancellationTokenSource();
// Send 100+ bytes to trigger pauseWriterThreshold
var sendTask = hubConnection.SendAsync("testMethod", new byte[100], cts.Token);

cts.Cancel();

await Assert.ThrowsAsync<OperationCanceledException>(() => sendTask.DefaultTimeout());

await hubConnection.StopAsync().DefaultTimeout();
}
}

[Fact]
public async Task StreamAsChannelAsyncCanceledWhenPassedCanceledToken()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ internal class TestConnection : ConnectionContext, IConnectionInherentKeepAliveF

bool IConnectionInherentKeepAliveFeature.HasInherentKeepAlive => _hasInherentKeepAlive;

public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoHandshake = true, bool hasInherentKeepAlive = false)
public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoHandshake = true, bool hasInherentKeepAlive = false, PipeOptions pipeOptions = null)
{
_autoHandshake = autoHandshake;
_onStart = onStart ?? (() => Task.CompletedTask);
_onDispose = onDispose ?? (() => Task.CompletedTask);
_hasInherentKeepAlive = hasInherentKeepAlive;

var options = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
var options = pipeOptions ?? new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);

var pair = DuplexPipe.CreateConnectionPair(options, options);
Application = pair.Application;
Expand Down

0 comments on commit d800343

Please sign in to comment.