diff --git a/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs
new file mode 100644
index 0000000000..51a1f41942
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs
@@ -0,0 +1,144 @@
+using System;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
+
+namespace RabbitMQ.Client
+{
+ public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
+ {
+ ///
+ /// Creates a new instance of an .
+ ///
+ public AsyncDefaultBasicConsumer()
+ {
+ ShutdownReason = null;
+ Model = null;
+ IsRunning = false;
+ ConsumerTag = null;
+ }
+
+ ///
+ /// Constructor which sets the Model property to the given value.
+ ///
+ /// Common AMQP model.
+ public AsyncDefaultBasicConsumer(IModel model)
+ {
+ ShutdownReason = null;
+ IsRunning = false;
+ ConsumerTag = null;
+ Model = model;
+ }
+
+ ///
+ /// Retrieve the consumer tag this consumer is registered as; to be used when discussing this consumer
+ /// with the server, for instance with .
+ ///
+ public string ConsumerTag { get; protected set; }
+
+ ///
+ /// Returns true while the consumer is registered and expecting deliveries from the broker.
+ ///
+ public bool IsRunning { get; protected set; }
+
+ ///
+ /// If our shuts down, this property will contain a description of the reason for the
+ /// shutdown. Otherwise it will contain null. See .
+ ///
+ public ShutdownEventArgs ShutdownReason { get; protected set; }
+
+ ///
+ /// Retrieve the this consumer is associated with,
+ /// for use in acknowledging received messages, for instance.
+ ///
+ public IModel Model { get; protected set; }
+
+ ///
+ /// Called when the consumer is cancelled for reasons other than by a basicCancel:
+ /// e.g. the queue has been deleted (either by this channel or by any other channel).
+ /// See for notification of consumer cancellation due to basicCancel
+ ///
+ /// Consumer tag this consumer is registered.
+ public virtual Task HandleBasicCancel(string consumerTag)
+ {
+ IsRunning = false;
+ return TaskExtensions.CompletedTask;
+ }
+
+ ///
+ /// Called upon successful deregistration of the consumer from the broker.
+ ///
+ /// Consumer tag this consumer is registered.
+ public virtual Task HandleBasicCancelOk(string consumerTag)
+ {
+ IsRunning = false;
+ return TaskExtensions.CompletedTask;
+ }
+
+ ///
+ /// Called upon successful registration of the consumer with the broker.
+ ///
+ /// Consumer tag this consumer is registered.
+ public virtual Task HandleBasicConsumeOk(string consumerTag)
+ {
+ ConsumerTag = consumerTag;
+ IsRunning = true;
+ return TaskExtensions.CompletedTask;
+ }
+
+ ///
+ /// Called each time a message arrives for this consumer.
+ ///
+ ///
+ /// Does nothing with the passed in information.
+ /// Note that in particular, some delivered messages may require acknowledgement via .
+ /// The implementation of this method in this class does NOT acknowledge such messages.
+ ///
+ public virtual Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
+ {
+ return TaskExtensions.CompletedTask;
+ }
+
+ ///
+ /// Called when the model shuts down.
+ ///
+ /// Common AMQP model.
+ /// Information about the reason why a particular model, session, or connection was destroyed.
+ public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
+ {
+ ShutdownReason = reason;
+ return TaskExtensions.CompletedTask;
+ }
+
+ event EventHandler IBasicConsumer.ConsumerCancelled
+ {
+ add { throw new InvalidOperationException("Should never be called."); }
+ remove { throw new InvalidOperationException("Should never be called."); }
+ }
+
+ void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
+ {
+ throw new InvalidOperationException("Should never be called.");
+ }
+
+ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
+ {
+ throw new InvalidOperationException("Should never be called.");
+ }
+
+ void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
+ {
+ throw new InvalidOperationException("Should never be called.");
+ }
+
+ void IBasicConsumer.HandleModelShutdown(object model, ShutdownEventArgs reason)
+ {
+ throw new InvalidOperationException("Should never be called.");
+ }
+
+ void IBasicConsumer.HandleBasicCancel(string consumerTag)
+ {
+ throw new InvalidOperationException("Should never be called.");
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs
index ccd48f04be..35b6cac52e 100644
--- a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs
+++ b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs
@@ -98,7 +98,7 @@ namespace RabbitMQ.Client
///"amqp://foo/" (note the trailling slash) also represent the
///default virtual host. The latter issue means that virtual
///hosts with an empty name are not addressable.
- public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
+ public class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionFactory
{
///
/// Default value for the desired maximum channel number, with zero meaning unlimited (value: 0).
@@ -158,6 +158,12 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
///
public bool AutomaticRecoveryEnabled { get; set; } = true;
+ ///
+ /// Set to true will enable a asynchronous consumer dispatcher which is compatible with .
+ /// Defaults to false.
+ ///
+ public bool DispatchConsumersAsync { get; set; } = false;
+
/// The host to connect to.
public string HostName { get; set; } = "localhost";
diff --git a/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs
new file mode 100644
index 0000000000..1389b28420
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs
@@ -0,0 +1,57 @@
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client
+{
+ public interface IAsyncBasicConsumer
+ {
+ ///
+ /// Retrieve the this consumer is associated with,
+ /// for use in acknowledging received messages, for instance.
+ ///
+ IModel Model { get; }
+
+ ///
+ /// Called when the consumer is cancelled for reasons other than by a basicCancel:
+ /// e.g. the queue has been deleted (either by this channel or by any other channel).
+ /// See for notification of consumer cancellation due to basicCancel
+ ///
+ /// Consumer tag this consumer is registered.
+ Task HandleBasicCancel(string consumerTag);
+
+ ///
+ /// Called upon successful deregistration of the consumer from the broker.
+ ///
+ /// Consumer tag this consumer is registered.
+ Task HandleBasicCancelOk(string consumerTag);
+
+ ///
+ /// Called upon successful registration of the consumer with the broker.
+ ///
+ /// Consumer tag this consumer is registered.
+ Task HandleBasicConsumeOk(string consumerTag);
+
+ ///
+ /// Called each time a message arrives for this consumer.
+ ///
+ ///
+ /// Does nothing with the passed in information.
+ /// Note that in particular, some delivered messages may require acknowledgement via .
+ /// The implementation of this method in this class does NOT acknowledge such messages.
+ ///
+ Task HandleBasicDeliver(string consumerTag,
+ ulong deliveryTag,
+ bool redelivered,
+ string exchange,
+ string routingKey,
+ IBasicProperties properties,
+ byte[] body);
+
+ ///
+ /// Called when the model shuts down.
+ ///
+ /// Common AMQP model.
+ /// Information about the reason why a particular model, session, or connection was destroyed.
+ Task HandleModelShutdown(object model, ShutdownEventArgs reason);
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs
new file mode 100644
index 0000000000..313e1b5cdf
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/api/IAsyncConnectionFactory.cs
@@ -0,0 +1,7 @@
+namespace RabbitMQ.Client
+{
+ internal interface IAsyncConnectionFactory : IConnectionFactory
+ {
+ bool DispatchConsumersAsync { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs
new file mode 100644
index 0000000000..7ab70fbefc
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventHandler.cs
@@ -0,0 +1,7 @@
+using System;
+using System.Threading.Tasks;
+
+namespace RabbitMQ.Client.Events
+{
+ public delegate Task AsyncEventHandler(object sender, TEvent @event) where TEvent : EventArgs;
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs
new file mode 100644
index 0000000000..f57ed53060
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/events/AsyncEventingBasicConsumer.cs
@@ -0,0 +1,53 @@
+using System;
+using System.Threading.Tasks;
+using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
+
+namespace RabbitMQ.Client.Events
+{
+ public class AsyncEventingBasicConsumer : AsyncDefaultBasicConsumer
+ {
+ ///Constructor which sets the Model property to the
+ ///given value.
+ public AsyncEventingBasicConsumer(IModel model) : base(model)
+ {
+ Received = (model1, args) => TaskExtensions.CompletedTask;
+ Registered = (model1, args) => TaskExtensions.CompletedTask;
+ Unregistered = (model1, args) => TaskExtensions.CompletedTask;
+ Shutdown = (model1, args) => TaskExtensions.CompletedTask;
+ }
+
+ public Func Received { get; set; }
+
+ public Func Registered { get; set; }
+ public Func Unregistered { get; set; }
+ public Func Shutdown { get; set; }
+
+ public override Task HandleBasicCancelOk(string consumerTag)
+ {
+ return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Unregistered(Model, new ConsumerEventArgs(consumerTag))).Unwrap();
+ }
+
+ public override Task HandleBasicConsumeOk(string consumerTag)
+ {
+ return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Registered(Model, new ConsumerEventArgs(consumerTag))).Unwrap();
+ }
+
+ public override Task HandleBasicDeliver(string consumerTag,
+ ulong deliveryTag,
+ bool redelivered,
+ string exchange,
+ string routingKey,
+ IBasicProperties properties,
+ byte[] body)
+ {
+ return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
+ .ContinueWith(t => Received(Model, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)))
+ .Unwrap();
+ }
+
+ public override Task HandleModelShutdown(object model, ShutdownEventArgs reason)
+ {
+ return base.HandleModelShutdown(model, reason).ContinueWith(t => Shutdown(Model, reason)).Unwrap();
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs
new file mode 100644
index 0000000000..705532a4e0
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs
@@ -0,0 +1,87 @@
+namespace RabbitMQ.Client.Impl
+{
+ internal class AsyncConsumerDispatcher : IConsumerDispatcher
+ {
+ private readonly ModelBase model;
+ private readonly AsyncConsumerWorkService workService;
+
+ public AsyncConsumerDispatcher(ModelBase model, AsyncConsumerWorkService ws)
+ {
+ this.model = model;
+ this.workService = ws;
+ this.IsShutdown = false;
+ }
+
+ public void Quiesce()
+ {
+ IsShutdown = true;
+ }
+
+ public void Shutdown()
+ {
+ // necessary evil
+ this.workService.Stop().GetAwaiter().GetResult();
+ }
+
+ public void Shutdown(IModel model)
+ {
+ // necessary evil
+ this.workService.Stop(model).GetAwaiter().GetResult();
+ }
+
+ public bool IsShutdown
+ {
+ get;
+ private set;
+ }
+
+ public void HandleBasicConsumeOk(IBasicConsumer consumer,
+ string consumerTag)
+ {
+ ScheduleUnlessShuttingDown(new BasicConsumeOk(consumer, consumerTag));
+ }
+
+ public void HandleBasicDeliver(IBasicConsumer consumer,
+ string consumerTag,
+ ulong deliveryTag,
+ bool redelivered,
+ string exchange,
+ string routingKey,
+ IBasicProperties basicProperties,
+ byte[] body)
+ {
+ ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
+ }
+
+ public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
+ {
+ ScheduleUnlessShuttingDown(new BasicCancelOk(consumer, consumerTag));
+ }
+
+ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
+ {
+ ScheduleUnlessShuttingDown(new BasicCancel(consumer, consumerTag));
+ }
+
+ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
+ {
+ // the only case where we ignore the shutdown flag.
+ new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
+ }
+
+ private void ScheduleUnlessShuttingDown(TWork work)
+ where TWork : Work
+ {
+ if (!this.IsShutdown)
+ {
+ Schedule(work);
+ }
+ }
+
+ private void Schedule(TWork work)
+ where TWork : Work
+ {
+ this.workService.Schedule(this.model, work);
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs
new file mode 100644
index 0000000000..abf4d65215
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs
@@ -0,0 +1,104 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Impl;
+
+namespace RabbitMQ.Client
+{
+ internal class AsyncConsumerWorkService : ConsumerWorkService
+ {
+ readonly ConcurrentDictionary workPools = new ConcurrentDictionary();
+
+ public void Schedule(ModelBase model, TWork work)
+ where TWork : Work
+ {
+ // two step approach is taken, as TryGetValue does not aquire locks
+ // if this fails, GetOrAdd is called, which takes a lock
+
+ WorkPool workPool;
+ if (workPools.TryGetValue(model, out workPool) == false)
+ {
+ var newWorkPool = new WorkPool(model);
+ workPool = workPools.GetOrAdd(model, newWorkPool);
+
+ // start if it's only the workpool that has been just created
+ if (newWorkPool == workPool)
+ {
+ newWorkPool.Start();
+ }
+ }
+
+ workPool.Enqueue(work);
+ }
+
+ public async Task Stop(IModel model)
+ {
+ WorkPool workPool;
+ if (workPools.TryRemove(model, out workPool))
+ {
+ await workPool.Stop().ConfigureAwait(false);
+ }
+ }
+
+ public async Task Stop()
+ {
+ foreach (var model in workPools.Keys)
+ {
+ await Stop(model).ConfigureAwait(false);
+ }
+ }
+
+ class WorkPool
+ {
+ readonly ConcurrentQueue workQueue;
+ readonly CancellationTokenSource tokenSource;
+ readonly ModelBase model;
+ TaskCompletionSource messageArrived;
+ private Task task;
+
+ public WorkPool(ModelBase model)
+ {
+ this.model = model;
+ workQueue = new ConcurrentQueue();
+ messageArrived = new TaskCompletionSource();
+ tokenSource = new CancellationTokenSource();
+ }
+
+ public void Start()
+ {
+ task = Task.Run(Loop, CancellationToken.None);
+ }
+
+ public void Enqueue(Work work)
+ {
+ workQueue.Enqueue(work);
+ messageArrived.TrySetResult(true);
+ }
+
+ async Task Loop()
+ {
+ using (tokenSource.Token.Register(() => messageArrived.TrySetResult(true)))
+ {
+ while (tokenSource.IsCancellationRequested == false)
+ {
+ Work work;
+ while (workQueue.TryDequeue(out work))
+ {
+ await work.Execute(model).ConfigureAwait(false);
+ }
+
+ await messageArrived.Task.ConfigureAwait(false);
+ messageArrived = new TaskCompletionSource();
+ }
+ }
+ }
+
+ public Task Stop()
+ {
+ tokenSource.Cancel();
+ return task;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs
new file mode 100644
index 0000000000..c44da7206c
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancel.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client.Impl
+{
+ sealed class BasicCancel : Work
+ {
+ readonly string consumerTag;
+
+ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
+ {
+ this.consumerTag = consumerTag;
+ }
+
+ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
+ {
+ try
+ {
+ await consumer.HandleBasicCancel(consumerTag).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var details = new Dictionary
+ {
+ {"consumer", consumer},
+ {"context", "HandleBasicCancel"}
+ };
+ model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs
new file mode 100644
index 0000000000..29aea095b3
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicCancelOk.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client.Impl
+{
+ sealed class BasicCancelOk : Work
+ {
+ readonly string consumerTag;
+
+ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
+ {
+ this.consumerTag = consumerTag;
+ }
+
+ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
+ {
+ try
+ {
+ await consumer.HandleBasicCancelOk(consumerTag).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var details = new Dictionary()
+ {
+ {"consumer", consumer},
+ {"context", "HandleBasicCancelOk"}
+ };
+ model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs
new file mode 100644
index 0000000000..53f7b020fa
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicConsumeOk.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client.Impl
+{
+ sealed class BasicConsumeOk : Work
+ {
+ readonly string consumerTag;
+
+ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
+ {
+ this.consumerTag = consumerTag;
+ }
+
+ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
+ {
+ try
+ {
+ await consumer.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var details = new Dictionary()
+ {
+ {"consumer", consumer},
+ {"context", "HandleBasicConsumeOk"}
+ };
+ model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs
new file mode 100644
index 0000000000..cd65c3582b
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicDeliver.cs
@@ -0,0 +1,59 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client.Impl
+{
+ sealed class BasicDeliver : Work
+ {
+ readonly string consumerTag;
+ readonly ulong deliveryTag;
+ readonly bool redelivered;
+ readonly string exchange;
+ readonly string routingKey;
+ readonly IBasicProperties basicProperties;
+ readonly byte[] body;
+
+ public BasicDeliver(IBasicConsumer consumer,
+ string consumerTag,
+ ulong deliveryTag,
+ bool redelivered,
+ string exchange,
+ string routingKey,
+ IBasicProperties basicProperties,
+ byte[] body) : base(consumer)
+ {
+ this.consumerTag = consumerTag;
+ this.deliveryTag = deliveryTag;
+ this.redelivered = redelivered;
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.basicProperties = basicProperties;
+ this.body = body;
+ }
+
+ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
+ {
+ try
+ {
+ await consumer.HandleBasicDeliver(consumerTag,
+ deliveryTag,
+ redelivered,
+ exchange,
+ routingKey,
+ basicProperties,
+ body).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var details = new Dictionary()
+ {
+ {"consumer", consumer},
+ {"context", "HandleBasicDeliver"}
+ };
+ model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
index d02c33fb64..eebec4b879 100644
--- a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
+++ b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
@@ -120,7 +120,16 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
FrameMax = 0;
m_factory = factory;
m_frameHandler = frameHandler;
- ConsumerWorkService = new ConsumerWorkService();
+
+ var asyncConnectionFactory = factory as IAsyncConnectionFactory;
+ if (asyncConnectionFactory != null && asyncConnectionFactory.DispatchConsumersAsync)
+ {
+ ConsumerWorkService = new AsyncConsumerWorkService();
+ }
+ else
+ {
+ ConsumerWorkService = new ConsumerWorkService();
+ }
m_sessionManager = new SessionManager(this, 0);
m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
index 91336151d0..339296aea2 100644
--- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
+++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
@@ -96,7 +96,16 @@ public ModelBase(ISession session)
public ModelBase(ISession session, ConsumerWorkService workService)
{
- ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService);
+ var asyncConsumerWorkService = workService as AsyncConsumerWorkService;
+ if (asyncConsumerWorkService != null)
+ {
+ ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService);
+ }
+ else
+ {
+ ConsumerDispatcher = new ConcurrentConsumerDispatcher(this, workService);
+ }
+
Initialise(session);
}
@@ -1159,6 +1168,17 @@ public string BasicConsume(string queue,
IDictionary arguments,
IBasicConsumer consumer)
{
+ // TODO: Replace with flag
+ var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher;
+ if (asyncDispatcher != null)
+ {
+ var asyncConsumer = consumer as IAsyncBasicConsumer;
+ if (asyncConsumer == null)
+ {
+ // TODO: Friendly message
+ throw new InvalidOperationException("In the async mode you have to use an async consumer");
+ }
+ }
var k = new BasicConsumerRpcContinuation { m_consumer = consumer };
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs
new file mode 100644
index 0000000000..8357399c30
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelShutdown.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.Client.Events;
+
+namespace RabbitMQ.Client.Impl
+{
+ sealed class ModelShutdown : Work
+ {
+ readonly ShutdownEventArgs reason;
+
+ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer)
+ {
+ this.reason = reason;
+ }
+
+ protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
+ {
+ try
+ {
+ await consumer.HandleModelShutdown(model, reason).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var details = new Dictionary()
+ {
+ { "consumer", consumer },
+ { "context", "HandleModelShutdown" }
+ };
+ model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs
index 7677eb6c5b..c77d35d21a 100644
--- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs
+++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs
@@ -53,6 +53,8 @@ namespace RabbitMQ.Client.Impl
{
static class TaskExtensions
{
+ public static Task CompletedTask = Task.FromResult(0);
+
public static async Task TimeoutAfter(this Task task, int millisecondsTimeout)
{
if (task == await Task.WhenAny(task, Task.Delay(millisecondsTimeout)).ConfigureAwait(false))
diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Work.cs b/projects/client/RabbitMQ.Client/src/client/impl/Work.cs
new file mode 100644
index 0000000000..a04e04fff8
--- /dev/null
+++ b/projects/client/RabbitMQ.Client/src/client/impl/Work.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Threading.Tasks;
+
+namespace RabbitMQ.Client.Impl
+{
+ internal abstract class Work
+ {
+ readonly IAsyncBasicConsumer asyncConsumer;
+
+ protected Work(IBasicConsumer consumer)
+ {
+ asyncConsumer = (IAsyncBasicConsumer)consumer;
+ }
+
+ public async Task Execute(ModelBase model)
+ {
+ try
+ {
+ await Execute(model, asyncConsumer).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // intentionally caught
+ }
+ }
+
+ protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer);
+ }
+}
\ No newline at end of file