Skip to content

Commit

Permalink
[FIX]Event应该注册一个,收到消息后,处理多个Handler该Event的事件
Browse files Browse the repository at this point in the history
  • Loading branch information
ojdev committed May 10, 2019
1 parent 3d5d7ff commit dff7d2b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@

namespace RabbitMQ.EventBus.AspNetCore.Simple.Controllers
{
public class MessageBodyHandle00 : IEventHandler<MessageBody>, IDisposable
{
private Guid id;
private readonly ILogger<MessageBodyHandle> _logger;

public MessageBodyHandle00(ILogger<MessageBodyHandle> logger)
{
id = Guid.NewGuid();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public void Dispose()
{
Console.WriteLine("释放");
}

public Task Handle(EventHandlerArgs<MessageBody> args)
{
Console.WriteLine("==================================================");
Console.WriteLine(id + "=>" + typeof(MessageBody).Name);
Console.WriteLine(args.Event.Body);
Console.WriteLine(args.Original);
Console.WriteLine(args.Redelivered);
Console.WriteLine("==================================================");
return Task.CompletedTask;
}
}
public class MessageBodyHandle : IEventHandler<MessageBody1>, IDisposable
{
private Guid id;
Expand All @@ -20,6 +46,32 @@ public void Dispose()
Console.WriteLine("释放");
}

public Task Handle(EventHandlerArgs<MessageBody1> args)
{
Console.WriteLine("==================================================");
Console.WriteLine(id + "=>" + typeof(MessageBody1).Name);
Console.WriteLine(args.Event.Body);
Console.WriteLine(args.Original);
Console.WriteLine(args.Redelivered);
Console.WriteLine("==================================================");
return Task.CompletedTask;
}
}
public class MessageBodyHandle111 : IEventHandler<MessageBody1>, IDisposable
{
private Guid id;
private readonly ILogger<MessageBodyHandle> _logger;

public MessageBodyHandle111(ILogger<MessageBodyHandle> logger)
{
id = Guid.NewGuid();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public void Dispose()
{
Console.WriteLine("释放");
}

public Task Handle(EventHandlerArgs<MessageBody1> args)
{
Console.WriteLine("==================================================");
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.EventBus.AspNetCore.Simple/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void ConfigureServices(IServiceCollection services)
string assemblyName = typeof(Startup).GetTypeInfo().Assembly.GetName().Name;
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

services.AddRabbitMQEventBus(() => "amqp://guest:guest@192.168.0.252:5672/", eventBusOptionAction: eventBusOption =>
services.AddRabbitMQEventBus(() => "amqp://guest:guest@192.168.0.251:5672/", eventBusOptionAction: eventBusOption =>
{
eventBusOption.ClientProvidedAssembly(assemblyName);
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
Expand Down
147 changes: 75 additions & 72 deletions src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,69 +68,69 @@ public void Publish<TMessage>(TMessage message, string exchange, string routingK
_logger.WriteLog(_persistentConnection.Configuration.Level, $"{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{exchange}\t{routingKey}\t{body}");
_eventHandlerFactory?.PubliushEvent(new EventBusArgs(_persistentConnection.Endpoint, exchange, "", routingKey, type, _persistentConnection.Configuration.ClientProvidedName, body, true));
}
public void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
where TEvent : IEvent
where THandler : IEventHandler<TEvent>
{
Subscribe(typeof(TEvent), typeof(THandler));
#region MyRegion
/*object attribute = typeof(TEvent).GetCustomAttributes(typeof(EventBusAttribute), true).FirstOrDefault();
if (attribute is EventBusAttribute attr)
{
string queue = attr.Queue ?? $"{ attr.Exchange }.{ typeof(TEvent).Name }";
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
IModel channel;
#region snippet
try
{
channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
channel.QueueDeclarePassive(queue);
}
catch
{
channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
channel.QueueDeclare(queue: queue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
}
#endregion
channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null);
channel.BasicQos(0, 1, false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
string body = Encoding.UTF8.GetString(ea.Body);
bool isAck = false;
try
{
await ProcessEvent<TEvent, THandler>(body);
channel.BasicAck(ea.DeliveryTag, multiple: false);
isAck = true;
}
catch (Exception ex)
{
_logger.LogError(new EventId(ex.HResult), ex, ex.Message);
}
finally
{
_logger.Information($"RabbitMQEventBus\t{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{isAck}\t{ea.Exchange}\t{ea.RoutingKey}\t{body}");
}
};
channel.CallbackException += (sender, ex) =>
{
//public void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
// where TEvent : IEvent
// where THandler : IEventHandler<TEvent>
//{
// //Subscribe(typeof(TEvent), typeof(THandler));
// #region MyRegion
// /*object attribute = typeof(TEvent).GetCustomAttributes(typeof(EventBusAttribute), true).FirstOrDefault();
// if (attribute is EventBusAttribute attr)
// {
// string queue = attr.Queue ?? $"{ attr.Exchange }.{ typeof(TEvent).Name }";
// if (!_persistentConnection.IsConnected)
// {
// _persistentConnection.TryConnect();
// }
// IModel channel;
// #region snippet
// try
// {
// channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
// channel.QueueDeclarePassive(queue);
// }
// catch
// {
// channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
// channel.QueueDeclare(queue: queue,
// durable: true,
// exclusive: false,
// autoDelete: false,
// arguments: null);
// }
// #endregion
// channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null);
// channel.BasicQos(0, 1, false);
// EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
// consumer.Received += async (model, ea) =>
// {
// string body = Encoding.UTF8.GetString(ea.Body);
// bool isAck = false;
// try
// {
// await ProcessEvent<TEvent, THandler>(body);
// channel.BasicAck(ea.DeliveryTag, multiple: false);
// isAck = true;
// }
// catch (Exception ex)
// {
// _logger.LogError(new EventId(ex.HResult), ex, ex.Message);
// }
// finally
// {
// _logger.Information($"RabbitMQEventBus\t{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{isAck}\t{ea.Exchange}\t{ea.RoutingKey}\t{body}");
// }
// };
// channel.CallbackException += (sender, ex) =>
// {

};
channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
}*/
#endregion
}
// };
// channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
// }*/
// #endregion
//}

public void Subscribe(Type eventType, Type eventHandleType, string type = ExchangeType.Topic)
public void Subscribe(Type eventType, string type = ExchangeType.Topic)
{
var attributes = eventType.GetCustomAttributes(typeof(EventBusAttribute), true);
var millisecondsDelay = (int?)_persistentConnection?.Configuration?.ConsumerFailRetryInterval.TotalMilliseconds ?? 1000;
Expand Down Expand Up @@ -169,7 +169,7 @@ public void Subscribe(Type eventType, Type eventHandleType, string type = Exchan
bool isAck = false;
try
{
await ProcessEvent(body, eventType, eventHandleType, ea);
await ProcessEvent(body, eventType, ea);
channel.BasicAck(ea.DeliveryTag, multiple: false);
isAck = true;
}
Expand Down Expand Up @@ -222,21 +222,24 @@ public void Subscribe(Type eventType, Type eventHandleType, string type = Exchan
/// <param name="eventHandleType"></param>
/// <param name="args"></param>
/// <returns></returns>
private async Task ProcessEvent(string body, Type eventType, Type eventHandleType, BasicDeliverEventArgs args)
private async Task ProcessEvent(string body, Type eventType, BasicDeliverEventArgs args)
{
using (var scope = _serviceProvider.CreateScope())
{
object eventHandler = scope.ServiceProvider.GetRequiredService(eventHandleType);
if (eventHandler == null)
foreach (Type eventHandleType in typeof(IEventHandler<>).GetMakeGenericType(eventType))
{
throw new InvalidOperationException(eventHandleType.Name);
}
Type concreteType = typeof(IEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod(nameof(IEventHandler<IEvent>.Handle)).Invoke(
eventHandler,
new object[] {
object eventHandler = scope.ServiceProvider.GetRequiredService(eventHandleType);
if (eventHandler == null)
{
throw new InvalidOperationException(eventHandleType.Name);
}
Type concreteType = typeof(IEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod(nameof(IEventHandler<IEvent>.Handle)).Invoke(
eventHandler,
new object[] {
Activator.CreateInstance(typeof(EventHandlerArgs<>).MakeGenericType(eventType), new object[] { body, args.Redelivered, args.Exchange, args.RoutingKey })
});
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using RabbitMQ.EventBus.AspNetCore.Factories;
using RabbitMQ.EventBus.AspNetCore.Modules;
using System;
using System.Linq;

namespace Microsoft.Extensions.DependencyInjection
{
Expand Down Expand Up @@ -60,10 +61,12 @@ public static void RabbitMQEventBusAutoSubscribe(this IApplicationBuilder app)
logger.LogInformation($"=======================================================================");
foreach (Type mType in typeof(IEvent).GetAssemblies())
{
foreach (Type hType in typeof(IEventHandler<>).GetMakeGenericType(mType))
var handlesAny = typeof(IEventHandler<>).GetMakeGenericType(mType);
if (handlesAny.Any())
{
logger.LogInformation($"{mType.Name}\t=>\t{hType.Name}");
eventBus.Subscribe(mType, hType);

logger.LogInformation($"{mType.Name}\t=>\t{string.Join("", handlesAny)}");
eventBus.Subscribe(mType);
}
}
logger.LogInformation($"=======================================================================");
Expand Down
9 changes: 4 additions & 5 deletions src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ public interface IRabbitMQEventBus
/// <typeparam name="TEvent">消息体</typeparam>
/// <typeparam name="THandler">消息处理</typeparam>
/// <param name="type">消息类型</param>
void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
where TEvent : IEvent
where THandler : IEventHandler<TEvent>;
//void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
// where TEvent : IEvent
// where THandler : IEventHandler<TEvent>;
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="eventType">消息体</param>
/// <param name="eventHandleType">消息处理</param>
/// <param name="type">消息类型</param>
void Subscribe(Type eventType, Type eventHandleType, string type = ExchangeType.Topic);
void Subscribe(Type eventType, string type = ExchangeType.Topic);
}
}

0 comments on commit dff7d2b

Please sign in to comment.