Skip to content

Commit

Permalink
Available streams count is thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
ManickaP committed Apr 26, 2024
1 parent e99641d commit 06fab49
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
35 changes: 24 additions & 11 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
/// </summary>
private IPEndPoint _localEndPoint = null!;
/// <summary>
/// Represents how many bidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread.
/// </summary>
private int _availableBidirectionalStreamsCount;
/// <summary>
/// Represents how many unidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread.
/// </summary>
private int _availableUnidirectionalStreamsCount;
/// <summary>
Expand Down Expand Up @@ -434,6 +436,25 @@ internal ValueTask FinishHandshakeAsync(QuicServerConnectionOptions options, str
return valueTask;
}

/// <summary>
/// In order to provide meaningful increments in <see cref="StreamsAvailable"/>, available streams count can be only manipulated from MsQuic thread.
/// For that purpose we pass this function to <see cref="QuicStream"/> so that it can call it from <c>START_COMPLETE</c> event handler.
///
/// Note that MsQuic itself manipulates stream counts right before indicating <c>START_COMPLETE</c> event.
/// </summary>
/// <param name="streamType">Type of the stream to decrement appropriate field.</param>
private void DecrementAvailableStreamCount(QuicStreamType streamType)
{
if (streamType == QuicStreamType.Unidirectional)
{
--_availableUnidirectionalStreamsCount;
}
else
{
--_availableBidirectionalStreamsCount;
}
}

/// <summary>
/// Create an outbound uni/bidirectional <see cref="QuicStream" />.
/// In case the connection doesn't have any available stream capacity, i.e.: the peer limits the concurrent stream count,
Expand All @@ -456,15 +477,7 @@ public async ValueTask<QuicStream> OpenOutboundStreamAsync(QuicStreamType type,
NetEventSource.Info(this, $"{this} New outbound {type} stream {stream}.");
}

await stream.StartAsync(cancellationToken).ConfigureAwait(false);
if (type == QuicStreamType.Unidirectional)
{
Interlocked.Decrement(ref _availableUnidirectionalStreamsCount);
}
else
{
Interlocked.Decrement(ref _availableBidirectionalStreamsCount);
}
await stream.StartAsync(DecrementAvailableStreamCount, cancellationToken).ConfigureAwait(false);
}
catch
{
Expand Down Expand Up @@ -637,8 +650,8 @@ private unsafe int HandleEventStreamsAvailable(ref STREAMS_AVAILABLE_DATA data)
{
int bidirectionalStreamsCountIncrement = data.BidirectionalCount - _availableBidirectionalStreamsCount;
int unidirectionalStreamsCountIncrement = data.UnidirectionalCount - _availableUnidirectionalStreamsCount;
Volatile.Write(ref _availableBidirectionalStreamsCount, data.BidirectionalCount);
Volatile.Write(ref _availableUnidirectionalStreamsCount, data.UnidirectionalCount);
_availableBidirectionalStreamsCount = data.BidirectionalCount;
_availableUnidirectionalStreamsCount = data.UnidirectionalCount;
OnStreamsAvailable(bidirectionalStreamsCountIncrement, unidirectionalStreamsCountIncrement);
return QUIC_STATUS_SUCCESS;
}
Expand Down
15 changes: 14 additions & 1 deletion src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public sealed partial class QuicStream
private long _id = -1;
private readonly QuicStreamType _type;

/// <summary>
/// Provided via <see cref="StartAsync(Action{QuicStreamType}, CancellationToken)" /> from <see cref="QuicConnection" /> so that <see cref="QuicStream"/> can decrement its available stream count field.
/// When <see cref="HandleEventStartComplete(ref START_COMPLETE_DATA)">START_COMPLETE</see> arrives it gets invoked and unset back to <c>null</c> to not to hold any unintended reference to <see cref="QuicConnection"/>.
/// </summary>
private Action<QuicStreamType>? _decrementAvailableStreamCount;

/// <summary>
/// Stream id, see <see href="https://www.rfc-editor.org/rfc/rfc9000.html#name-stream-types-and-identifier" />.
/// </summary>
Expand Down Expand Up @@ -234,9 +240,10 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE
/// If no more concurrent streams can be opened at the moment, the operation will wait until it can,
/// either by closing some existing streams or receiving more available stream ids from the peer.
/// </summary>
/// <param name="decrementAvailableStreamCount"></param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>An asynchronous task that completes with the opened <see cref="QuicStream" />.</returns>
internal ValueTask StartAsync(CancellationToken cancellationToken = default)
internal ValueTask StartAsync(Action<QuicStreamType> decrementAvailableStreamCount, CancellationToken cancellationToken = default)
{
_startedTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken);
{
Expand All @@ -252,6 +259,7 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default)
}
}

_decrementAvailableStreamCount = decrementAvailableStreamCount;
return valueTask;
}

Expand Down Expand Up @@ -518,9 +526,13 @@ public void CompleteWrites()

private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data)
{
Debug.Assert(_decrementAvailableStreamCount is not null);

_id = unchecked((long)data.ID);
if (StatusSucceeded(data.Status))
{
_decrementAvailableStreamCount(Type);

if (data.PeerAccepted != 0)
{
_startedTcs.TrySetResult();
Expand All @@ -535,6 +547,7 @@ private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data)
}
}

_decrementAvailableStreamCount = null;
return QUIC_STATUS_SUCCESS;
}
private unsafe int HandleEventReceive(ref RECEIVE_DATA data)
Expand Down

0 comments on commit 06fab49

Please sign in to comment.