diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs index 7988e131..c2a6fb21 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs @@ -19,14 +19,11 @@ async Task HandleMessage(TopicMessage message, Cancellation } } -//Create a dynamic streaming subscription -var subscription = daprMessagingClient.Register("pubsub", "myTopic", - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)), - HandleMessage, CancellationToken.None); - -//Subscribe to messages on it with a timeout of 30 seconds -var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); -await subscription.SubscribeAsync(cancellationTokenSource.Token); +//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20)); +var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic", + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), + HandleMessage, cancellationTokenSource.Token); await Task.Delay(TimeSpan.FromMinutes(1)); diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs index 2c0c256f..5d933a86 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -27,5 +27,5 @@ public abstract class DaprPublishSubscribeClient /// The delegate reflecting the action to take upon messages received by the subscription. /// Cancellation token. /// - public abstract PublishSubscribeReceiver Register(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken); + public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken); } diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs index 3c1da481..fa552d7e 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -39,5 +39,10 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client) /// The delegate reflecting the action to take upon messages received by the subscription. /// Cancellation token. /// - public override PublishSubscribeReceiver Register(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken) => new(pubSubName, topicName, options, messageHandler, daprClient); + public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken) + { + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient); + await receiver.SubscribeAsync(cancellationToken); + return receiver; + } } diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 9a92d44e..7ac07de0 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -91,7 +91,7 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc /// /// Cancellation token. /// An containing messages provided by the sidecar. - public async Task SubscribeAsync(CancellationToken cancellationToken = default) + internal async Task SubscribeAsync(CancellationToken cancellationToken = default) { //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). if (hasInitialized)