diff --git a/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs new file mode 100644 index 0000000000..51a1f41942 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs @@ -0,0 +1,144 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; +using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions; + +namespace RabbitMQ.Client +{ + public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer + { + /// + /// Creates a new instance of an . + /// + public AsyncDefaultBasicConsumer() + { + ShutdownReason = null; + Model = null; + IsRunning = false; + ConsumerTag = null; + } + + /// + /// Constructor which sets the Model property to the given value. + /// + /// Common AMQP model. + public AsyncDefaultBasicConsumer(IModel model) + { + ShutdownReason = null; + IsRunning = false; + ConsumerTag = null; + Model = model; + } + + /// + /// Retrieve the consumer tag this consumer is registered as; to be used when discussing this consumer + /// with the server, for instance with . + /// + public string ConsumerTag { get; protected set; } + + /// + /// Returns true while the consumer is registered and expecting deliveries from the broker. + /// + public bool IsRunning { get; protected set; } + + /// + /// If our shuts down, this property will contain a description of the reason for the + /// shutdown. Otherwise it will contain null. See . + /// + public ShutdownEventArgs ShutdownReason { get; protected set; } + + /// + /// Retrieve the this consumer is associated with, + /// for use in acknowledging received messages, for instance. + /// + public IModel Model { get; protected set; } + + /// + /// Called when the consumer is cancelled for reasons other than by a basicCancel: + /// e.g. the queue has been deleted (either by this channel or by any other channel). + /// See for notification of consumer cancellation due to basicCancel + /// + /// Consumer tag this consumer is registered. + public virtual Task HandleBasicCancel(string consumerTag) + { + IsRunning = false; + return TaskExtensions.CompletedTask; + } + + /// + /// Called upon successful deregistration of the consumer from the broker. + /// + /// Consumer tag this consumer is registered. + public virtual Task HandleBasicCancelOk(string consumerTag) + { + IsRunning = false; + return TaskExtensions.CompletedTask; + } + + /// + /// Called upon successful registration of the consumer with the broker. + /// + /// Consumer tag this consumer is registered. + public virtual Task HandleBasicConsumeOk(string consumerTag) + { + ConsumerTag = consumerTag; + IsRunning = true; + return TaskExtensions.CompletedTask; + } + + /// + /// Called each time a message arrives for this consumer. + /// + /// + /// Does nothing with the passed in information. + /// Note that in particular, some delivered messages may require acknowledgement via . + /// The implementation of this method in this class does NOT acknowledge such messages. + /// + public virtual Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) + { + return TaskExtensions.CompletedTask; + } + + /// + /// Called when the model shuts down. + /// + /// Common AMQP model. + /// Information about the reason why a particular model, session, or connection was destroyed. + public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason) + { + ShutdownReason = reason; + return TaskExtensions.CompletedTask; + } + + event EventHandler IBasicConsumer.ConsumerCancelled + { + add { throw new InvalidOperationException("Should never be called."); } + remove { throw new InvalidOperationException("Should never be called."); } + } + + void IBasicConsumer.HandleBasicCancelOk(string consumerTag) + { + throw new InvalidOperationException("Should never be called."); + } + + void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) + { + throw new InvalidOperationException("Should never be called."); + } + + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) + { + throw new InvalidOperationException("Should never be called."); + } + + void IBasicConsumer.HandleModelShutdown(object model, ShutdownEventArgs reason) + { + throw new InvalidOperationException("Should never be called."); + } + + void IBasicConsumer.HandleBasicCancel(string consumerTag) + { + throw new InvalidOperationException("Should never be called."); + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs index ccd48f04be..35b6cac52e 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs @@ -98,7 +98,7 @@ namespace RabbitMQ.Client ///"amqp://foo/" (note the trailling slash) also represent the ///default virtual host. The latter issue means that virtual ///hosts with an empty name are not addressable. - public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory + public class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionFactory { /// /// Default value for the desired maximum channel number, with zero meaning unlimited (value: 0). @@ -158,6 +158,12 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory /// public bool AutomaticRecoveryEnabled { get; set; } = true; + /// + /// Set to true will enable a asynchronous consumer dispatcher which is compatible with . + /// Defaults to false. + /// + public bool DispatchConsumersAsync { get; set; } = false; + /// The host to connect to. public string HostName { get; set; } = "localhost"; diff --git a/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs new file mode 100644 index 0000000000..1389b28420 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs @@ -0,0 +1,57 @@ +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client +{ + public interface IAsyncBasicConsumer + { + /// + /// Retrieve the this consumer is associated with, + /// for use in acknowledging received messages, for instance. + /// + IModel Model { get; } + + /// + /// Called when the consumer is cancelled for reasons other than by a basicCancel: + /// e.g. the queue has been deleted (either by this channel or by any other channel). + /// See for notification of consumer cancellation due to basicCancel + /// + /// Consumer tag this consumer is registered. + Task HandleBasicCancel(string consumerTag); + + /// + /// Called upon successful deregistration of the consumer from the broker. + /// + /// Consumer tag this consumer is registered. + Task HandleBasicCancelOk(string consumerTag); + + /// + /// Called upon successful registration of the consumer with the broker. + /// + /// Consumer tag this consumer is registered. + Task HandleBasicConsumeOk(string consumerTag); + + /// + /// Called each time a message arrives for this consumer. + /// + /// + /// Does nothing with the passed in information. + /// Note that in particular, some delivered messages may require acknowledgement via . + /// The implementation of this method in this class does NOT acknowledge such messages. + /// + Task HandleBasicDeliver(string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties properties, + byte[] body); + + /// + /// Called when the model shuts down. + /// + /// Common AMQP model. + /// Information about the reason why a particular model, session, or connection was destroyed. + Task HandleModelShutdown(object model, ShutdownEventArgs reason); + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs new file mode 100644 index 0000000000..313e1b5cdf --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs @@ -0,0 +1,7 @@ +namespace RabbitMQ.Client +{ + internal interface IAsyncConnectionFactory : IConnectionFactory + { + bool DispatchConsumersAsync { get; set; } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs new file mode 100644 index 0000000000..7ab70fbefc --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs @@ -0,0 +1,7 @@ +using System; +using System.Threading.Tasks; + +namespace RabbitMQ.Client.Events +{ + public delegate Task AsyncEventHandler(object sender, TEvent @event) where TEvent : EventArgs; +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs new file mode 100644 index 0000000000..f57ed53060 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs @@ -0,0 +1,53 @@ +using System; +using System.Threading.Tasks; +using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions; + +namespace RabbitMQ.Client.Events +{ + public class AsyncEventingBasicConsumer : AsyncDefaultBasicConsumer + { + ///Constructor which sets the Model property to the + ///given value. + public AsyncEventingBasicConsumer(IModel model) : base(model) + { + Received = (model1, args) => TaskExtensions.CompletedTask; + Registered = (model1, args) => TaskExtensions.CompletedTask; + Unregistered = (model1, args) => TaskExtensions.CompletedTask; + Shutdown = (model1, args) => TaskExtensions.CompletedTask; + } + + public Func Received { get; set; } + + public Func Registered { get; set; } + public Func Unregistered { get; set; } + public Func Shutdown { get; set; } + + public override Task HandleBasicCancelOk(string consumerTag) + { + return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Unregistered(Model, new ConsumerEventArgs(consumerTag))).Unwrap(); + } + + public override Task HandleBasicConsumeOk(string consumerTag) + { + return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Registered(Model, new ConsumerEventArgs(consumerTag))).Unwrap(); + } + + public override Task HandleBasicDeliver(string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties properties, + byte[] body) + { + return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) + .ContinueWith(t => Received(Model, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body))) + .Unwrap(); + } + + public override Task HandleModelShutdown(object model, ShutdownEventArgs reason) + { + return base.HandleModelShutdown(model, reason).ContinueWith(t => Shutdown(Model, reason)).Unwrap(); + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs new file mode 100644 index 0000000000..705532a4e0 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs @@ -0,0 +1,87 @@ +namespace RabbitMQ.Client.Impl +{ + internal class AsyncConsumerDispatcher : IConsumerDispatcher + { + private readonly ModelBase model; + private readonly AsyncConsumerWorkService workService; + + public AsyncConsumerDispatcher(ModelBase model, AsyncConsumerWorkService ws) + { + this.model = model; + this.workService = ws; + this.IsShutdown = false; + } + + public void Quiesce() + { + IsShutdown = true; + } + + public void Shutdown() + { + // necessary evil + this.workService.Stop().GetAwaiter().GetResult(); + } + + public void Shutdown(IModel model) + { + // necessary evil + this.workService.Stop(model).GetAwaiter().GetResult(); + } + + public bool IsShutdown + { + get; + private set; + } + + public void HandleBasicConsumeOk(IBasicConsumer consumer, + string consumerTag) + { + ScheduleUnlessShuttingDown(new BasicConsumeOk(consumer, consumerTag)); + } + + public void HandleBasicDeliver(IBasicConsumer consumer, + string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) + { + ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body)); + } + + public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag) + { + ScheduleUnlessShuttingDown(new BasicCancelOk(consumer, consumerTag)); + } + + public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag) + { + ScheduleUnlessShuttingDown(new BasicCancel(consumer, consumerTag)); + } + + public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) + { + // the only case where we ignore the shutdown flag. + new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult(); + } + + private void ScheduleUnlessShuttingDown(TWork work) + where TWork : Work + { + if (!this.IsShutdown) + { + Schedule(work); + } + } + + private void Schedule(TWork work) + where TWork : Work + { + this.workService.Schedule(this.model, work); + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs new file mode 100644 index 0000000000..abf4d65215 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs @@ -0,0 +1,104 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Client +{ + internal class AsyncConsumerWorkService : ConsumerWorkService + { + readonly ConcurrentDictionary workPools = new ConcurrentDictionary(); + + public void Schedule(ModelBase model, TWork work) + where TWork : Work + { + // two step approach is taken, as TryGetValue does not aquire locks + // if this fails, GetOrAdd is called, which takes a lock + + WorkPool workPool; + if (workPools.TryGetValue(model, out workPool) == false) + { + var newWorkPool = new WorkPool(model); + workPool = workPools.GetOrAdd(model, newWorkPool); + + // start if it's only the workpool that has been just created + if (newWorkPool == workPool) + { + newWorkPool.Start(); + } + } + + workPool.Enqueue(work); + } + + public async Task Stop(IModel model) + { + WorkPool workPool; + if (workPools.TryRemove(model, out workPool)) + { + await workPool.Stop().ConfigureAwait(false); + } + } + + public async Task Stop() + { + foreach (var model in workPools.Keys) + { + await Stop(model).ConfigureAwait(false); + } + } + + class WorkPool + { + readonly ConcurrentQueue workQueue; + readonly CancellationTokenSource tokenSource; + readonly ModelBase model; + TaskCompletionSource messageArrived; + private Task task; + + public WorkPool(ModelBase model) + { + this.model = model; + workQueue = new ConcurrentQueue(); + messageArrived = new TaskCompletionSource(); + tokenSource = new CancellationTokenSource(); + } + + public void Start() + { + task = Task.Run(Loop, CancellationToken.None); + } + + public void Enqueue(Work work) + { + workQueue.Enqueue(work); + messageArrived.TrySetResult(true); + } + + async Task Loop() + { + using (tokenSource.Token.Register(() => messageArrived.TrySetResult(true))) + { + while (tokenSource.IsCancellationRequested == false) + { + Work work; + while (workQueue.TryDequeue(out work)) + { + await work.Execute(model).ConfigureAwait(false); + } + + await messageArrived.Task.ConfigureAwait(false); + messageArrived = new TaskCompletionSource(); + } + } + } + + public Task Stop() + { + tokenSource.Cancel(); + return task; + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs new file mode 100644 index 0000000000..c44da7206c --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ + sealed class BasicCancel : Work + { + readonly string consumerTag; + + public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer) + { + this.consumerTag = consumerTag; + } + + protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + { + try + { + await consumer.HandleBasicCancel(consumerTag).ConfigureAwait(false); + } + catch (Exception e) + { + var details = new Dictionary + { + {"consumer", consumer}, + {"context", "HandleBasicCancel"} + }; + model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs new file mode 100644 index 0000000000..29aea095b3 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ + sealed class BasicCancelOk : Work + { + readonly string consumerTag; + + public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer) + { + this.consumerTag = consumerTag; + } + + protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + { + try + { + await consumer.HandleBasicCancelOk(consumerTag).ConfigureAwait(false); + } + catch (Exception e) + { + var details = new Dictionary() + { + {"consumer", consumer}, + {"context", "HandleBasicCancelOk"} + }; + model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs new file mode 100644 index 0000000000..53f7b020fa --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ + sealed class BasicConsumeOk : Work + { + readonly string consumerTag; + + public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer) + { + this.consumerTag = consumerTag; + } + + protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + { + try + { + await consumer.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false); + } + catch (Exception e) + { + var details = new Dictionary() + { + {"consumer", consumer}, + {"context", "HandleBasicConsumeOk"} + }; + model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs new file mode 100644 index 0000000000..cd65c3582b --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ + sealed class BasicDeliver : Work + { + readonly string consumerTag; + readonly ulong deliveryTag; + readonly bool redelivered; + readonly string exchange; + readonly string routingKey; + readonly IBasicProperties basicProperties; + readonly byte[] body; + + public BasicDeliver(IBasicConsumer consumer, + string consumerTag, + ulong deliveryTag, + bool redelivered, + string exchange, + string routingKey, + IBasicProperties basicProperties, + byte[] body) : base(consumer) + { + this.consumerTag = consumerTag; + this.deliveryTag = deliveryTag; + this.redelivered = redelivered; + this.exchange = exchange; + this.routingKey = routingKey; + this.basicProperties = basicProperties; + this.body = body; + } + + protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + { + try + { + await consumer.HandleBasicDeliver(consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey, + basicProperties, + body).ConfigureAwait(false); + } + catch (Exception e) + { + var details = new Dictionary() + { + {"consumer", consumer}, + {"context", "HandleBasicDeliver"} + }; + model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs index d02c33fb64..eebec4b879 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs @@ -120,7 +120,16 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa FrameMax = 0; m_factory = factory; m_frameHandler = frameHandler; - ConsumerWorkService = new ConsumerWorkService(); + + var asyncConnectionFactory = factory as IAsyncConnectionFactory; + if (asyncConnectionFactory != null && asyncConnectionFactory.DispatchConsumersAsync) + { + ConsumerWorkService = new AsyncConsumerWorkService(); + } + else + { + ConsumerWorkService = new ConsumerWorkService(); + } m_sessionManager = new SessionManager(this, 0); m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk }; diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 91336151d0..339296aea2 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -96,7 +96,16 @@ public ModelBase(ISession session) public ModelBase(ISession session, ConsumerWorkService workService) { - ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); + var asyncConsumerWorkService = workService as AsyncConsumerWorkService; + if (asyncConsumerWorkService != null) + { + ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService); + } + else + { + ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService); + } + Initialise(session); } @@ -1159,6 +1168,17 @@ public string BasicConsume(string queue, IDictionary arguments, IBasicConsumer consumer) { + // TODO: Replace with flag + var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher; + if (asyncDispatcher != null) + { + var asyncConsumer = consumer as IAsyncBasicConsumer; + if (asyncConsumer == null) + { + // TODO: Friendly message + throw new InvalidOperationException("In the async mode you have to use an async consumer"); + } + } var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs new file mode 100644 index 0000000000..8357399c30 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ + sealed class ModelShutdown : Work + { + readonly ShutdownEventArgs reason; + + public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer) + { + this.reason = reason; + } + + protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) + { + try + { + await consumer.HandleModelShutdown(model, reason).ConfigureAwait(false); + } + catch (Exception e) + { + var details = new Dictionary() + { + { "consumer", consumer }, + { "context", "HandleModelShutdown" } + }; + model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index 7677eb6c5b..c77d35d21a 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -53,6 +53,8 @@ namespace RabbitMQ.Client.Impl { static class TaskExtensions { + public static Task CompletedTask = Task.FromResult(0); + public static async Task TimeoutAfter(this Task task, int millisecondsTimeout) { if (task == await Task.WhenAny(task, Task.Delay(millisecondsTimeout)).ConfigureAwait(false)) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Work.cs b/projects/client/RabbitMQ.Client/src/client/impl/Work.cs new file mode 100644 index 0000000000..a04e04fff8 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/Work.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; + +namespace RabbitMQ.Client.Impl +{ + internal abstract class Work + { + readonly IAsyncBasicConsumer asyncConsumer; + + protected Work(IBasicConsumer consumer) + { + asyncConsumer = (IAsyncBasicConsumer)consumer; + } + + public async Task Execute(ModelBase model) + { + try + { + await Execute(model, asyncConsumer).ConfigureAwait(false); + } + catch (Exception) + { + // intentionally caught + } + } + + protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer); + } +} \ No newline at end of file