Skip to content

Commit

Permalink
Added cancellation token to acknowledgement channel writer
Browse files Browse the repository at this point in the history
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
  • Loading branch information
WhitWaldo committed Sep 26, 2024
1 parent 67a637b commit a67185d
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default
/// </summary>
/// <param name="messageId">The identifier of the message the behavior is in reference to.</param>
/// <param name="behavior">The behavior to take on the message as indicated by either the message handler or timeout message handling configuration.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior)
private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior, CancellationToken cancellationToken)
{
var action = behavior switch
{
Expand All @@ -165,7 +166,7 @@ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction
};

var acknowledgement = new TopicAcknowledgement(messageId, action);
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -205,12 +206,12 @@ private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellati
try
{
//Share the result with the sidecar
await AcknowledgeMessageAsync(message.Id, messageAction);
await AcknowledgeMessageAsync(message.Id, messageAction, cancellationToken);
}
catch (OperationCanceledException)
{
//Acknowledge the message using the configured default response action
await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction);
await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction, cancellationToken);
}
}
}
Expand Down

0 comments on commit a67185d

Please sign in to comment.