Skip to content

Commit

Permalink
Updated to eliminate the double call for 'Register.SubscribeAsync' as…
Browse files Browse the repository at this point in the history
… the GrpcClient creates the PublishSubscribeReceiver, then calls SubscribeAsync internally. Renamed Register to `SubscribeAsync` and updated return type + example

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
  • Loading branch information
WhitWaldo committed Sep 25, 2024
1 parent 1cb6855 commit 462e9a1
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ async Task<TopicResponseAction> HandleMessage(TopicMessage message, Cancellation
}
}

//Create a dynamic streaming subscription
var subscription = daprMessagingClient.Register("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)),
HandleMessage, CancellationToken.None);

//Subscribe to messages on it with a timeout of 30 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await subscription.SubscribeAsync(cancellationTokenSource.Token);
//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20));
var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessage, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ public abstract class DaprPublishSubscribeClient
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public abstract PublishSubscribeReceiver Register(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken);
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client)
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public override PublishSubscribeReceiver Register(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken) => new(pubSubName, topicName, options, messageHandler, daprClient);
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken)
{
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An <see cref="IAsyncEnumerable{TopicMessage}"/> containing messages provided by the sidecar.</returns>
public async Task SubscribeAsync(CancellationToken cancellationToken = default)
internal async Task SubscribeAsync(CancellationToken cancellationToken = default)
{
//Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream).
if (hasInitialized)
Expand Down

0 comments on commit 462e9a1

Please sign in to comment.