Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConfigureAwait(false) for close stream/consumer/producer, query offset and create producer #231

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ csharp_space_between_square_brackets = false
# Analyzers
dotnet_code_quality.ca1802.api_surface = private, internal

# CA2007: Do not directly await a Task
dotnet_diagnostic.CA2007.severity = warning

# IDE0073: File header
dotnet_diagnostic.IDE0073.severity = warning
file_header_template = This source code is dual-licensed under the Apache License, version\n2.0, and the Mozilla Public License, version 2.0.\nCopyright (c) 2007-2023 VMware, Inc.
Expand Down
70 changes: 33 additions & 37 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,43 +190,39 @@ private async Task OnConnectionClosed(string reason)
{
if (ConnectionClosed != null)
{
await ConnectionClosed?.Invoke(reason)!;
var t = ConnectionClosed?.Invoke(reason)!;
await t.ConfigureAwait(false);
}
}

public static async Task<Client> Create(ClientParameters parameters, ILogger logger = null)
{
var client = new Client(parameters, logger);

client.connection = await Connection.Create(parameters.Endpoint,
client.HandleIncoming, client.HandleClosed, parameters.Ssl);
client.connection = await Connection.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl).ConfigureAwait(false);

// exchange properties
var peerPropertiesResponse =
await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
new PeerPropertiesRequest(corr, parameters.Properties));
await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
logger?.LogDebug("Server properties: {@Properties}", parameters.Properties);

//auth
var saslHandshakeResponse =
await client.Request<SaslHandshakeRequest, SaslHandshakeResponse>(
corr => new SaslHandshakeRequest(corr));
await client.Request<SaslHandshakeRequest, SaslHandshakeResponse>(corr => new SaslHandshakeRequest(corr)).ConfigureAwait(false);
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);

var saslData = Encoding.UTF8.GetBytes($"\0{parameters.UserName}\0{parameters.Password}");
var authResponse =
await client.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
new SaslAuthenticateRequest(corr, "PLAIN", saslData));
await client.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr => new SaslAuthenticateRequest(corr, "PLAIN", saslData)).ConfigureAwait(false);
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);

//tune
await client.tuneReceived.Task;
await client.tuneReceived.Task.ConfigureAwait(false);
await client.Publish(new TuneRequest(0,
(uint)client.Parameters.Heartbeat.TotalSeconds));
(uint)client.Parameters.Heartbeat.TotalSeconds)).ConfigureAwait(false);

// open
var open = await client.Request<OpenRequest, OpenResponse>(corr =>
new OpenRequest(corr, parameters.VirtualHost));
var open = await client.Request<OpenRequest, OpenResponse>(corr => new OpenRequest(corr, parameters.VirtualHost)).ConfigureAwait(false);
ClientExceptions.MaybeThrowException(open.ResponseCode, parameters.VirtualHost);
logger?.LogDebug("Open: ConnectionProperties: {ConnectionProperties}", open.ConnectionProperties);
client.ConnectionProperties = open.ConnectionProperties;
Expand All @@ -239,7 +235,7 @@ await client.Publish(new TuneRequest(0,

public async ValueTask<bool> Publish(Publish publishMsg)
{
var publishTask = await Publish<Publish>(publishMsg);
var publishTask = await Publish<Publish>(publishMsg).ConfigureAwait(false);

publishCommandsSent += 1;
messagesSent += publishMsg.MessageCount;
Expand All @@ -259,7 +255,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
var publisherId = nextPublisherId++;
publishers.Add(publisherId, (confirmCallback, errorCallback));
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)));
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
}

public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
Expand All @@ -268,7 +264,7 @@ public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId));
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);

return result;
}
Expand All @@ -287,7 +283,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
initialCredit,
properties,
deliverHandler,
consumerUpdateHandler);
consumerUpdateHandler).ConfigureAwait(false);
}

public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
Expand All @@ -305,7 +301,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
return (subscriptionId,
await Request<SubscribeRequest, SubscribeResponse>(corr =>
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
properties)));
properties)).ConfigureAwait(false));
}

public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
Expand All @@ -314,7 +310,7 @@ public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
{
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId));
new UnsubscribeRequest(corr, subscriptionId)).ConfigureAwait(false);
return result;
}
finally
Expand All @@ -327,7 +323,7 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
{
return await Request<PartitionsQueryRequest, PartitionsQueryResponse>(corr =>
new PartitionsQueryRequest(corr, superStream));
new PartitionsQueryRequest(corr, superStream)).ConfigureAwait(false);
}

private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSpan? timeout = null)
Expand All @@ -336,15 +332,15 @@ private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSp
var corr = NextCorrelationId();
var tcs = PooledTaskSource<TOut>.Rent();
requests.TryAdd(corr, tcs);
await Publish(request(corr));
await Publish(request(corr)).ConfigureAwait(false);
using var cts = new CancellationTokenSource(timeout ?? defaultTimeout);
await using (cts.Token.Register(
valueTaskSource =>
((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(
new TimeoutException()), tcs))
new TimeoutException()), tcs).ConfigureAwait(false))
{
var valueTask = new ValueTask<TOut>(tcs, tcs.Version);
var result = await valueTask;
var result = await valueTask.ConfigureAwait(false);
PooledTaskSource<TOut>.Return(tcs);
return result;
}
Expand All @@ -358,7 +354,7 @@ private uint NextCorrelationId()
private async Task HandleClosed(string reason)
{
InternalClose();
await OnConnectionClosed(reason);
await OnConnectionClosed(reason).ConfigureAwait(false);
}

private async Task HandleIncoming(Memory<byte> frameMemory)
Expand Down Expand Up @@ -412,7 +408,7 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
var consumerEventsUpd = consumers[consumerUpdateQueryResponse.SubscriptionId];
await ConsumerUpdateResponse(
consumerUpdateQueryResponse.CorrelationId,
await consumerEventsUpd.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive));
await consumerEventsUpd.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive).ConfigureAwait(false)).ConfigureAwait(false);
break;
case CreditResponse.Key:
CreditResponse.Read(frame, out var creditResponse);
Expand Down Expand Up @@ -526,7 +522,7 @@ private void HandleCorrelatedResponse<T>(T command) where T : struct, ICommand

private async ValueTask<bool> SendHeartBeat()
{
return await Publish(new HeartBeatRequest());
return await Publish(new HeartBeatRequest()).ConfigureAwait(false);
}

private void InternalClose()
Expand All @@ -537,7 +533,7 @@ private void InternalClose()

private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
{
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification));
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
}

public async Task<CloseResponse> Close(string reason)
Expand All @@ -551,7 +547,7 @@ public async Task<CloseResponse> Close(string reason)
{
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10));
TimeSpan.FromSeconds(10)).ConfigureAwait(false);

InternalClose();
connection.Dispose();
Expand Down Expand Up @@ -588,46 +584,46 @@ internal async Task<CloseResponse> MaybeClose(string reason)
public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
{
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>
new QueryPublisherRequest(corr, publisherRef, stream));
new QueryPublisherRequest(corr, publisherRef, stream)).ConfigureAwait(false);
}

public async ValueTask<bool> StoreOffset(string reference, string stream, ulong offsetValue)
{
return await Publish(new StoreOffsetRequest(stream, reference, offsetValue));
return await Publish(new StoreOffsetRequest(stream, reference, offsetValue)).ConfigureAwait(false);
}

public async ValueTask<MetaDataResponse> QueryMetadata(string[] streams)
{
return await Request<MetaDataQuery, MetaDataResponse>(corr => new MetaDataQuery(corr, streams.ToList()));
return await Request<MetaDataQuery, MetaDataResponse>(corr => new MetaDataQuery(corr, streams.ToList())).ConfigureAwait(false);
}

public async Task<bool> StreamExists(string stream)
{
var streams = new[] { stream };
var response = await QueryMetadata(streams);
var response = await QueryMetadata(streams).ConfigureAwait(false);
return response.StreamInfos is { Count: >= 1 } &&
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
}

public async ValueTask<QueryOffsetResponse> QueryOffset(string reference, string stream)
{
return await Request<QueryOffsetRequest, QueryOffsetResponse>(corr =>
new QueryOffsetRequest(stream, corr, reference));
new QueryOffsetRequest(stream, corr, reference)).ConfigureAwait(false);
}

public async ValueTask<CreateResponse> CreateStream(string stream, IDictionary<string, string> args)
{
return await Request<CreateRequest, CreateResponse>(corr => new CreateRequest(corr, stream, args));
return await Request<CreateRequest, CreateResponse>(corr => new CreateRequest(corr, stream, args)).ConfigureAwait(false);
}

public async ValueTask<DeleteResponse> DeleteStream(string stream)
{
return await Request<DeleteRequest, DeleteResponse>(corr => new DeleteRequest(corr, stream));
return await Request<DeleteRequest, DeleteResponse>(corr => new DeleteRequest(corr, stream)).ConfigureAwait(false);
}

public async ValueTask<bool> Credit(byte subscriptionId, ushort credit)
{
return await Publish(new CreditRequest(subscriptionId, credit));
return await Publish(new CreditRequest(subscriptionId, credit)).ConfigureAwait(false);
}
}

Expand Down
5 changes: 1 addition & 4 deletions RabbitMQ.Stream.Client/Compression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ public static class StreamCompressionCodecs

public static void UnRegisterCodec(CompressionType compressionType)
{
if (AvailableCompressCodecs.ContainsKey(compressionType))
{
AvailableCompressCodecs.Remove(compressionType);
}
AvailableCompressCodecs.Remove(compressionType);
}

public static ICompressionCodec GetCompressionCodec(CompressionType compressionType)
Expand Down
15 changes: 8 additions & 7 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>

try
{
await socket.ConnectAsync(endpoint);
await socket.ConnectAsync(endpoint).ConfigureAwait(false);
}
catch (SocketException ex)
{
Expand All @@ -88,7 +88,7 @@ public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>

public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
{
await WriteCommand(command);
await WriteCommand(command).ConfigureAwait(false);
// we return true to indicate that the command was written
// In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220
// we made all WriteCommand async so await is enough to indicate that the command was written
Expand All @@ -99,16 +99,16 @@ public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
private async Task WriteCommand<T>(T command) where T : struct, ICommand
{
// Only one thread should be able to write to the output pipeline at a time.
await _writeLock.WaitAsync();
await _writeLock.WaitAsync().ConfigureAwait(false);
try
{
var size = command.SizeNeeded;
var mem = new byte[4 + size]; // + 4 to write the size
WireFormatting.WriteUInt32(mem, (uint)size);
var written = command.Write(mem.AsSpan()[4..]);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem));
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem)).ConfigureAwait(false);
Debug.Assert(size == written);
await writer.FlushAsync();
await writer.FlushAsync().ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -153,7 +153,7 @@ private async Task ProcessIncomingFrames()

// Mark the PipeReader as complete

await reader.CompleteAsync();
await reader.CompleteAsync().ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -166,7 +166,8 @@ private async Task ProcessIncomingFrames()
finally
{
isClosed = true;
await closedCallback?.Invoke("TCP Connection Closed")!;
var t = closedCallback?.Invoke("TCP Connection Closed")!;
await t.ConfigureAwait(false);
Debug.WriteLine("TCP Connection Closed");
}
}
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/HeartBeatHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void TimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
private async Task PerformHeartBeatAsync()
{
var f = _sendHeartbeatFunc();
await f.AsTask().WaitAsync(TimeSpan.FromMilliseconds(1000));
await f.AsTask().WaitAsync(TimeSpan.FromMilliseconds(1000)).ConfigureAwait(false);

var seconds = (DateTime.Now - _lastUpdate).TotalSeconds;
if (seconds < _heartbeat)
Expand All @@ -77,7 +77,7 @@ private async Task PerformHeartBeatAsync()
// client will be closed
_logger.LogCritical("Too many heartbeats missed: {MissedHeartbeatCounter}", _missedHeartbeat);
Close();
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.");
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.").ConfigureAwait(false);
}

internal void UpdateHeartBeat()
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/MetaData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ internal static int Read(ReadOnlySequence<byte> frame, out MetaDataResponse comm
if (brokers.Count > 0)
{
var replicas = replicaRefs.Select(r => brokers[r]).ToList();
var leader = brokers.ContainsKey(leaderRef) ? brokers[leaderRef] : default;
var leader = brokers.TryGetValue(leaderRef, out var value) ? value : default;
streamInfos.Add(stream, new StreamInfo(stream, (ResponseCode)code, leader, replicas));
}
else
Expand Down
Loading