From a67185d5c8bc7b1cd64cf1e3cef85dee84ac2249 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 25 Sep 2024 23:49:33 -0500 Subject: [PATCH] Added cancellation token to acknowledgement channel writer Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 4cc1ea80..7bb7b814 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -152,8 +152,9 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default /// /// The identifier of the message the behavior is in reference to. /// The behavior to take on the message as indicated by either the message handler or timeout message handling configuration. + /// Cancellation token. /// - private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior) + private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior, CancellationToken cancellationToken) { var action = behavior switch { @@ -165,7 +166,7 @@ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction }; var acknowledgement = new TopicAcknowledgement(messageId, action); - await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement, cancellationToken); } /// @@ -205,12 +206,12 @@ private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellati try { //Share the result with the sidecar - await AcknowledgeMessageAsync(message.Id, messageAction); + await AcknowledgeMessageAsync(message.Id, messageAction, cancellationToken); } catch (OperationCanceledException) { //Acknowledge the message using the configured default response action - await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction); + await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction, cancellationToken); } } }