MyStack.DistributedMessage4RabbitMQ

An open-source, lightweight message bus library for RabbitMQ that supports publish/subscribe and RPC.

Installing MyStack.DistributedMessage4RabbitMQ

Install the package from NuGet:

Install-Package MyStack.DistributedMessage4RabbitMQ

Getting started

Register the services

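// Register the message bus together with its RabbitMQ connection settings.
services.AddDistributedMessage4RabbitMQ(configure =>
{
    configure.HostName = "localhost";
    configure.VirtualHost = "/";
    configure.Port = 5672;                             // default AMQP port
    configure.UserName = "admin";
    configure.Password = "admin";
    configure.QueueOptions.Name = "MyStack";
    configure.ExchangeOptions.Name = "MyStack";
    configure.ExchangeOptions.ExchangeType = "topic";
    configure.RPCTimeout = 2000;                       // presumably milliseconds
},
// Assembly that contains the handlers (presumably scanned for registration).
Assembly.GetExecutingAssembly());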

1. Event subscription

Define the event

public class HelloMessage : IDistributedEvent
{
    public string Message { get; set; }
}

Or, with an explicit message name:

[MessageName("HelloMessage")]
public class HelloMessage : IDistributedEvent
{
    public string Message { get; set; }
}

Subscribe to the event

public class HelloMessageHandler : IDistributedEventHandler<HelloMessage>
{
    public async Task HandleAsync(HelloMessage message, CancellationToken cancellationToken)
    {
        Console.WriteLine("Hello");
        await Task.CompletedTask;
    }
}

Publish the event

await messageBus.PublishAsync(new HelloMessage() { Message = "Hello" });

2. Event payload subscription

Define the event data

public class WrappedData 
{
    public string Message { get; set; }
}

Subscribe to the event

public class DistributedEventWrapperHandler : IDistributedEventHandler<DistributedEventWrapper<WrappedData>>
{
    public async Task HandleAsync(DistributedEventWrapper<WrappedData> eventData, CancellationToken cancellationToken = default)
    {
        await Task.CompletedTask;
        Console.WriteLine("DistributedEventWrapper");
    }
}

Publish the event

await messageBus.PublishAsync(new WrappedData());

3. Custom key subscription

Define the event data

public class SubscribeData
{
}

Subscribe to the event

[Subscribe("ABC")]
public class SubscribeDataHandler : IDistributedEventHandler
{
    public async Task HandleAsync(object eventData, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("SubscribeData");
        await Task.CompletedTask;
    }
}

Publish the event

await messageBus.PublishAsync("ABC", new SubscribeData());
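
With ExchangeOptions.ExchangeType set to "topic", RabbitMQ treats routing keys as dot-separated words, and binding keys may use the wildcards * (exactly one word) and # (zero or more words). The sketch below reimplements that matching rule in plain C# to show which published keys a given binding would receive. It assumes the key passed to [Subscribe] is used as the binding key, which this README does not confirm. TopicMatcher is a hypothetical helper for illustration, not part of the library.

```csharp
using System;

// Hypothetical helper: mimics RabbitMQ topic-exchange matching for illustration only.
public static class TopicMatcher
{
    // Returns true when routingKey satisfies bindingKey under topic-exchange rules.
    public static bool Matches(string bindingKey, string routingKey)
    {
        var pattern = bindingKey.Split('.');
        var words = routingKey.Split('.');
        return Match(pattern, 0, words, 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: succeed only if every word was consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next)) return true;
            }
            return false;
        }

        // Any other pattern segment needs a word to consume.
        if (w == words.Length) return false;

        // '*' matches exactly one word; a literal must match exactly (case-sensitive).
        if (pattern[p] == "*" || pattern[p] == words[w])
        {
            return Match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

Under that assumption, a handler marked [Subscribe("ABC")] would only receive messages published with the exact key "ABC", while a binding such as "order.*" would match "order.created" but not "order.created.eu".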

4. RPC requests

Define the request

public class Ping : IRpcRequest<Pong>
{
    public string SendBy { get; set; }
}

Define the response

public class Pong
{
    public string ReplyBy { get; set; }
}

Handle the request

public class PingHandler : IRpcRequestHandler<Ping, Pong>
{
    public Task<Pong> HandleAsync(Ping message, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("Ping");
        return Task.FromResult(new Pong() { ReplyBy = "B" });
    }
}

Send the request

var ping = new Ping { SendBy = "A" };
var pongMessage = await messageBus.SendAsync(ping);
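
The RPCTimeout = 2000 setting in the configuration suggests that an RPC call gives up when no reply arrives in time. How the library reports a timeout is not described here, so the sketch below only shows the general pattern with plain .NET tasks: wait for a reply, and fall back when the deadline passes. It assumes the timeout is expressed in milliseconds and that .NET 6 or later is available (for Task.WaitAsync). CallWithTimeoutAsync is a hypothetical helper, not part of the library.

```csharp
using System;
using System.Threading.Tasks;

public static class RpcTimeoutSketch
{
    // Hypothetical helper: awaits a reply, or returns a fallback value
    // when no reply arrives within timeoutMs milliseconds.
    public static async Task<string> CallWithTimeoutAsync(Func<Task<string>> call, int timeoutMs)
    {
        try
        {
            // WaitAsync throws TimeoutException if the task does not finish in time.
            return await call().WaitAsync(TimeSpan.FromMilliseconds(timeoutMs));
        }
        catch (TimeoutException)
        {
            return "timed out";
        }
    }

    public static async Task Main()
    {
        // Simulated handler that replies quickly.
        var fast = await CallWithTimeoutAsync(async () =>
        {
            await Task.Delay(10);
            return "pong";
        }, 2000);

        // Simulated handler that is slower than the allowed timeout.
        var slow = await CallWithTimeoutAsync(async () =>
        {
            await Task.Delay(1000);
            return "pong";
        }, 100);

        Console.WriteLine(fast); // prints "pong"
        Console.WriteLine(slow); // prints "timed out"
    }
}
```

In real code, the call passed in would be the awaited SendAsync request. Whether the library itself throws, returns null, or cancels on timeout should be checked against its source before relying on any of these behaviors.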

License

MIT
