diff --git a/src/RabbitMQ.EventBus.AspNetCore/Configurations/QueuePrefixType.cs b/src/RabbitMQ.EventBus.AspNetCore/Configurations/QueuePrefixType.cs new file mode 100644 index 0000000..ee607e4 --- /dev/null +++ b/src/RabbitMQ.EventBus.AspNetCore/Configurations/QueuePrefixType.cs @@ -0,0 +1,17 @@ +namespace RabbitMQ.EventBus.AspNetCore.Configurations +{ + /// + /// 队列名前缀 + /// + public enum QueuePrefixType + { + /// + /// 交换机名 + /// + ExchangeName, + /// + /// + /// + ClientProvidedName + } +} diff --git a/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfiguration.cs b/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfiguration.cs index 77724f3..3b9fe9f 100644 --- a/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfiguration.cs +++ b/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfiguration.cs @@ -33,6 +33,10 @@ public sealed class RabbitMQEventBusConnectionConfiguration /// public LogLevel Level { get; set; } /// + /// 队列名前缀(默认交换机名) + /// + public QueuePrefixType Prefix { get; set; } + /// /// /// public RabbitMQEventBusConnectionConfiguration() diff --git a/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfigurationBuild.cs b/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfigurationBuild.cs index dbaa5c4..5e25944 100644 --- a/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfigurationBuild.cs +++ b/src/RabbitMQ.EventBus.AspNetCore/Configurations/RabbitMQEventBusConnectionConfigurationBuild.cs @@ -54,5 +54,13 @@ public void LoggingWriteLevel(LogLevel level) { Configuration.Level = level; } + /// + /// 队列名前缀 + /// + /// + public void QueuePrefix(QueuePrefixType queuePrefix = QueuePrefixType.ExchangeName) + { + Configuration.Prefix = queuePrefix; + } } } diff --git a/src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs b/src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs index e7af834..10febab 100644 --- a/src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs +++ b/src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBus.cs @@ -4,6 +4,7 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.EventBus.AspNetCore.Attributes; +using RabbitMQ.EventBus.AspNetCore.Configurations; using RabbitMQ.EventBus.AspNetCore.Events; using RabbitMQ.EventBus.AspNetCore.Factories; using RabbitMQ.EventBus.AspNetCore.Modules; @@ -68,68 +69,6 @@ public void Publish(TMessage message, string exchange, string routingK _logger.WriteLog(_persistentConnection.Configuration.Level, $"{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{exchange}\t{routingKey}\t{body}"); _eventHandlerFactory?.PubliushEvent(new EventBusArgs(_persistentConnection.Endpoint, exchange, "", routingKey, type, _persistentConnection.Configuration.ClientProvidedName, body, true)); } - //public void Subscribe(string type = ExchangeType.Topic) - // where TEvent : IEvent - // where THandler : IEventHandler - //{ - // //Subscribe(typeof(TEvent), typeof(THandler)); - // #region MyRegion - // /*object attribute = typeof(TEvent).GetCustomAttributes(typeof(EventBusAttribute), true).FirstOrDefault(); - // if (attribute is EventBusAttribute attr) - // { - // string queue = attr.Queue ?? $"{ attr.Exchange }.{ typeof(TEvent).Name }"; - // if (!_persistentConnection.IsConnected) - // { - // _persistentConnection.TryConnect(); - // } - // IModel channel; - // #region snippet - // try - // { - // channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type); - // channel.QueueDeclarePassive(queue); - // } - // catch - // { - // channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type); - // channel.QueueDeclare(queue: queue, - // durable: true, - // exclusive: false, - // autoDelete: false, - // arguments: null); - // } - // #endregion - // channel.QueueBind(queue, attr.Exchange, attr.RoutingKey, null); - // channel.BasicQos(0, 1, false); - // EventingBasicConsumer consumer = new EventingBasicConsumer(channel); - // consumer.Received += async (model, ea) => - // { - // string body = Encoding.UTF8.GetString(ea.Body); - // bool isAck = false; - // try - // { - // await ProcessEvent(body); - // channel.BasicAck(ea.DeliveryTag, multiple: false); - // isAck = true; - // } - // catch (Exception ex) - // { - // _logger.LogError(new EventId(ex.HResult), ex, ex.Message); - // } - // finally - // { - // _logger.Information($"RabbitMQEventBus\t{DateTimeOffset.Now.ToString("yyyy-MM-dd HH:mm:ss")}\t{isAck}\t{ea.Exchange}\t{ea.RoutingKey}\t{body}"); - // } - // }; - // channel.CallbackException += (sender, ex) => - // { - - // }; - // channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); - // }*/ - // #endregion - //} - public void Subscribe(Type eventType, string type = ExchangeType.Topic) { var attributes = eventType.GetCustomAttributes(typeof(EventBusAttribute), true); @@ -138,7 +77,7 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic) { if (attribute is EventBusAttribute attr) { - string queue = attr.Queue ?? $"{ attr.Exchange }.{ eventType.Name }"; + string queue = attr.Queue ?? (_persistentConnection.Configuration.Prefix == QueuePrefixType.ExchangeName ? $"{ attr.Exchange }.{ eventType.Name }" : _persistentConnection.Configuration.ClientProvidedName); if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); @@ -152,8 +91,9 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic) } catch { + channel = _persistentConnection.ExchangeDeclare(exchange: attr.Exchange, type: type); - channel.QueueDeclare(queue: queue, + channel.QueueDeclare(queue: queue,//_persistentConnection.Configuration.ClientProvidedName durable: true, exclusive: false, autoDelete: false, @@ -199,24 +139,6 @@ public void Subscribe(Type eventType, string type = ExchangeType.Topic) /// /// /// - /// - /// - /// - /// - //private async Task ProcessEvent(string body) - // where TEvent : IEvent - // where TEventHandle : IEventHandler - //{ - // using (var scope = _serviceProvider.CreateScope()) - // { - // TEventHandle eventHandler = scope.ServiceProvider.GetRequiredService(); - // TEvent integrationEvent = JsonConvert.DeserializeObject(body); - // await eventHandler.Handle(integrationEvent/*, new MessageEventArgs(body, false)*/); - // } - //} - /// - /// - /// /// /// /// diff --git a/src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs b/src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs index 73a6261..d3f8853 100644 --- a/src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs +++ b/src/RabbitMQ.EventBus.AspNetCore/IRabbitMQEventBus.cs @@ -1,5 +1,4 @@ using RabbitMQ.Client; -using RabbitMQ.EventBus.AspNetCore.Events; using System; namespace RabbitMQ.EventBus.AspNetCore @@ -22,15 +21,6 @@ public interface IRabbitMQEventBus /// /// 订阅消息 /// - /// 消息体 - /// 消息处理 - /// 消息类型 - //void Subscribe(string type = ExchangeType.Topic) - // where TEvent : IEvent - // where THandler : IEventHandler; - /// - /// 订阅消息 - /// /// 消息体 /// 消息类型 void Subscribe(Type eventType, string type = ExchangeType.Topic);