Skip to content

Commit

Permalink
可设置队列名前缀
Browse files Browse the repository at this point in the history
  • Loading branch information
ojdev committed May 10, 2019
1 parent dff7d2b commit 1ef9798
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 92 deletions.
17 changes: 17 additions & 0 deletions src/RabbitMQ.EventBus.AspNetCore/Configurations/QueuePrefixType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace RabbitMQ.EventBus.AspNetCore.Configurations
{
/// <summary>
/// 队列名前缀
/// </summary>
public enum QueuePrefixType
{
/// <summary>
/// 交换机名
/// </summary>
ExchangeName,
/// <summary>
///
/// </summary>
ClientProvidedName
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public sealed class RabbitMQEventBusConnectionConfiguration
/// </summary>
public LogLevel Level { get; set; }
/// <summary>
/// 队列名前缀(默认交换机名)
/// </summary>
public QueuePrefixType Prefix { get; set; }
/// <summary>
///
/// </summary>
public RabbitMQEventBusConnectionConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,13 @@ public void LoggingWriteLevel(LogLevel level)
{
Configuration.Level = level;
}
/// <summary>
/// 队列名前缀
/// </summary>
/// <param name="queuePrefix"><see cref="QueuePrefixType"/></param>
public void QueuePrefix(QueuePrefixType queuePrefix = QueuePrefixType.ExchangeName)
{
Configuration.Prefix = queuePrefix;
}
}
}
86 changes: 4 additions & 82 deletions src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.EventBus.AspNetCore.Attributes;
using RabbitMQ.EventBus.AspNetCore.Configurations;
using RabbitMQ.EventBus.AspNetCore.Events;
using RabbitMQ.EventBus.AspNetCore.Factories;
using RabbitMQ.EventBus.AspNetCore.Modules;
Expand Down Expand Up @@ -68,68 +69,6 @@ public void Publish<TMessage>(TMessage message, string exchange, string routingK
_logger.WriteLog(_persistentConnection.Configuration.Level, $"{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{exchange}\t{routingKey}\t{body}");
_eventHandlerFactory?.PubliushEvent(new EventBusArgs(_persistentConnection.Endpoint, exchange, "", routingKey, type, _persistentConnection.Configuration.ClientProvidedName, body, true));
}
//public void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
// where TEvent : IEvent
// where THandler : IEventHandler<TEvent>
//{
// //Subscribe(typeof(TEvent), typeof(THandler));
// #region MyRegion
// /*object attribute = typeof(TEvent).GetCustomAttributes(typeof(EventBusAttribute), true).FirstOrDefault();
// if (attribute is EventBusAttribute attr)
// {
// string queue = attr.Queue ?? $"{ attr.Exchange }.{ typeof(TEvent).Name }";
// if (!_persistentConnection.IsConnected)
// {
// _persistentConnection.TryConnect();
// }
// IModel channel;
// #region snippet
// try
// {
// channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
// channel.QueueDeclarePassive(queue);
// }
// catch
// {
// channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
// channel.QueueDeclare(queue: queue,
// durable: true,
// exclusive: false,
// autoDelete: false,
// arguments: null);
// }
// #endregion
// channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null);
// channel.BasicQos(0, 1, false);
// EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
// consumer.Received += async (model, ea) =>
// {
// string body = Encoding.UTF8.GetString(ea.Body);
// bool isAck = false;
// try
// {
// await ProcessEvent<TEvent, THandler>(body);
// channel.BasicAck(ea.DeliveryTag, multiple: false);
// isAck = true;
// }
// catch (Exception ex)
// {
// _logger.LogError(new EventId(ex.HResult), ex, ex.Message);
// }
// finally
// {
// _logger.Information($"RabbitMQEventBus\t{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{isAck}\t{ea.Exchange}\t{ea.RoutingKey}\t{body}");
// }
// };
// channel.CallbackException += (sender, ex) =>
// {

// };
// channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
// }*/
// #endregion
//}

public void Subscribe(Type eventType, string type = ExchangeType.Topic)
{
var attributes = eventType.GetCustomAttributes(typeof(EventBusAttribute), true);
Expand All @@ -138,7 +77,7 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
{
if (attribute is EventBusAttribute attr)
{
string queue = attr.Queue ?? $"{ attr.Exchange }.{ eventType.Name }";
string queue = attr.Queue ?? (_persistentConnection.Configuration.Prefix == QueuePrefixType.ExchangeName ? $"{ attr.Exchange }.{ eventType.Name }" : _persistentConnection.Configuration.ClientProvidedName);
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
Expand All @@ -152,8 +91,9 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
}
catch
{

channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type);
channel.QueueDeclare(queue: queue,
channel.QueueDeclare(queue: queue,//_persistentConnection.Configuration.ClientProvidedName
durable: true,
exclusive: false,
autoDelete: false,
Expand Down Expand Up @@ -199,24 +139,6 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic)
/// <summary>
///
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TEventHandle"></typeparam>
/// <param name="body"></param>
/// <returns></returns>
//private async Task ProcessEvent<TEvent, TEventHandle>(string body)
// where TEvent : IEvent
// where TEventHandle : IEventHandler<TEvent>
//{
// using (var scope = _serviceProvider.CreateScope())
// {
// TEventHandle eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandle>();
// TEvent integrationEvent = JsonConvert.DeserializeObject<TEvent>(body);
// await eventHandler.Handle(integrationEvent/*, new MessageEventArgs(body, false)*/);
// }
//}
/// <summary>
///
/// </summary>
/// <param name="body"></param>
/// <param name="eventType"></param>
/// <param name="eventHandleType"></param>
Expand Down
10 changes: 0 additions & 10 deletions src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using RabbitMQ.Client;
using RabbitMQ.EventBus.AspNetCore.Events;
using System;

namespace RabbitMQ.EventBus.AspNetCore
Expand All @@ -22,15 +21,6 @@ public interface IRabbitMQEventBus
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TEvent">消息体</typeparam>
/// <typeparam name="THandler">消息处理</typeparam>
/// <param name="type">消息类型</param>
//void Subscribe<TEvent, THandler>(string type = ExchangeType.Topic)
// where TEvent : IEvent
// where THandler : IEventHandler<TEvent>;
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="eventType">消息体</param>
/// <param name="type">消息类型</param>
void Subscribe(Type eventType, string type = ExchangeType.Topic);
Expand Down

0 comments on commit 1ef9798

Please sign in to comment.