Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce async consumer dispatcher #307

Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;

namespace RabbitMQ.Client
{
public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
{
/// <summary>
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
IsRunning = false;
ConsumerTag = null;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public AsyncDefaultBasicConsumer(IModel model)
{
ShutdownReason = null;
IsRunning = false;
ConsumerTag = null;
Model = model;
}

/// <summary>
/// Retrieve the consumer tag this consumer is registered as; to be used when discussing this consumer
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
/// </summary>
public string ConsumerTag { get; protected set; }

/// <summary>
/// Returns true while the consumer is registered and expecting deliveries from the broker.
/// </summary>
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; protected set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancel(string consumerTag)
{
IsRunning = false;
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelOk(string consumerTag)
{
IsRunning = false;
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicConsumeOk(string consumerTag)
{
ConsumerTag = consumerTag;
IsRunning = true;
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called each time a message arrives for this consumer.
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
public virtual Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
{
return TaskExtensions.CompletedTask;
}

/// <summary>
/// Called when the model shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
{
ShutdownReason = reason;
return TaskExtensions.CompletedTask;
}

event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
{
add { throw new InvalidOperationException("Should never be called."); }
remove { throw new InvalidOperationException("Should never be called."); }
}

void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
{
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
{
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
{
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleModelShutdown(object model, ShutdownEventArgs reason)
{
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleBasicCancel(string consumerTag)
{
throw new InvalidOperationException("Should never be called.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace RabbitMQ.Client
///"amqp://foo/" (note the trailling slash) also represent the
///default virtual host. The latter issue means that virtual
///hosts with an empty name are not addressable. </para></remarks>
public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
public class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionFactory
{
/// <summary>
/// Default value for the desired maximum channel number, with zero meaning unlimited (value: 0).
Expand Down Expand Up @@ -158,6 +158,12 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
/// </summary>
public bool AutomaticRecoveryEnabled { get; set; } = true;

/// <summary>
/// Set to true will enable a asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
/// Defaults to false.
/// </summary>
public bool DispatchConsumersAsync { get; set; } = false;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
public interface IAsyncBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancel(string consumerTag);

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelOk(string consumerTag);

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicConsumeOk(string consumerTag);

/// <summary>
/// Called each time a message arrives for this consumer.
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body);

/// <summary>
/// Called when the model shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace RabbitMQ.Client
{
internal interface IAsyncConnectionFactory : IConnectionFactory
{
bool DispatchConsumersAsync { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using System;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
{
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : EventArgs;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Threading.Tasks;
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;

namespace RabbitMQ.Client.Events
{
public class AsyncEventingBasicConsumer : AsyncDefaultBasicConsumer
{
///<summary>Constructor which sets the Model property to the
///given value.</summary>
public AsyncEventingBasicConsumer(IModel model) : base(model)
{
Received = (model1, args) => TaskExtensions.CompletedTask;
Registered = (model1, args) => TaskExtensions.CompletedTask;
Unregistered = (model1, args) => TaskExtensions.CompletedTask;
Shutdown = (model1, args) => TaskExtensions.CompletedTask;
}

public Func<IModel, BasicDeliverEventArgs, Task> Received { get; set; }

public Func<IModel, ConsumerEventArgs, Task> Registered { get; set; }
public Func<IModel, ConsumerEventArgs, Task> Unregistered { get; set; }
public Func<IModel, ShutdownEventArgs, Task> Shutdown { get; set; }

public override Task HandleBasicCancelOk(string consumerTag)
{
return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Unregistered(Model, new ConsumerEventArgs(consumerTag))).Unwrap();
}

public override Task HandleBasicConsumeOk(string consumerTag)
{
return base.HandleBasicCancelOk(consumerTag).ContinueWith(t => Registered(Model, new ConsumerEventArgs(consumerTag))).Unwrap();
}

public override Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ContinueWith(t => Received(Model, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)))
.Unwrap();
}

public override Task HandleModelShutdown(object model, ShutdownEventArgs reason)
{
return base.HandleModelShutdown(model, reason).ContinueWith(t => Shutdown(Model, reason)).Unwrap();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
namespace RabbitMQ.Client.Impl
{
internal class AsyncConsumerDispatcher : IConsumerDispatcher
{
private readonly ModelBase model;
private readonly AsyncConsumerWorkService workService;

public AsyncConsumerDispatcher(ModelBase model, AsyncConsumerWorkService ws)
{
this.model = model;
this.workService = ws;
this.IsShutdown = false;
}

public void Quiesce()
{
IsShutdown = true;
}

public void Shutdown()
{
// necessary evil
this.workService.Stop().GetAwaiter().GetResult();
}

public void Shutdown(IModel model)
{
// necessary evil
this.workService.Stop(model).GetAwaiter().GetResult();
}

public bool IsShutdown
{
get;
private set;
}

public void HandleBasicConsumeOk(IBasicConsumer consumer,
string consumerTag)
{
ScheduleUnlessShuttingDown(new BasicConsumeOk(consumer, consumerTag));
}

public void HandleBasicDeliver(IBasicConsumer consumer,
string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties basicProperties,
byte[] body)
{
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
}

public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
{
ScheduleUnlessShuttingDown(new BasicCancelOk(consumer, consumerTag));
}

public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
{
ScheduleUnlessShuttingDown(new BasicCancel(consumer, consumerTag));
}

public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
// the only case where we ignore the shutdown flag.
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
where TWork : Work
{
if (!this.IsShutdown)
{
Schedule(work);
}
}

private void Schedule<TWork>(TWork work)
where TWork : Work
{
this.workService.Schedule(this.model, work);
}
}
}
Loading