This repository is no longer supported and will not receive updates or bug fixes. The code has been moved to the main Liquid repository and will be maintained there.
This repository is part of the Liquid Application Framework, a modern .NET Core application framework for building cloud-native microservices.
The main repository contains the examples and documentation on how to use Liquid.
This package contains the messaging subsystem of Liquid, used to interact with asynchronous services. It ships with several implementations (cartridges). To use it, add the main package (Liquid.Messaging) to your project, along with the specific implementation that you need.
Available Cartridges | Badges |
---|---|
Liquid.Messaging.Aws | |
Liquid.Messaging.ServiceBus | |
Liquid.Messaging.Gcp | |
Liquid.Messaging.Kafka | |
Liquid.Messaging.RabbitMq | |
Register a producer in the service provider:
//the value of the parameter must be the name of the appsettings section where the configuration of your producer is defined.
services.AddServiceBusProducer<SampleMessageEntity>("Liquid:Messaging:ServiceBus:SampleProducer");
Inject the producer through the constructor of your domain class and call its send method:
using System.Collections.Generic;
using System.Threading.Tasks;
using Liquid.Messaging;

public class MySampleProducer
{
    // The dependency must be an ILiquidProducer implementation whose message type
    // matches the type used when the producer was registered.
    private readonly ILiquidProducer<SampleMessageEntity> _producer;

    public MySampleProducer(ILiquidProducer<SampleMessageEntity> producer)
    {
        _producer = producer;
    }

    public async Task Handle()
    {
        // Create an instance of the message type defined for the producer
        var message = new SampleMessageEntity();

        // Send the message, optionally with custom headers
        await _producer.SendMessageAsync(message, new Dictionary<string, object> { { "headerTest", "value" } });
    }
}
To consume messages, implement a worker for the message type:
using System.Threading;
using System.Threading.Tasks;

public class Worker : ILiquidWorker<SampleMessageEntity>
{
    private readonly IMediator _mediator;

    public Worker(IMediator mediator)
    {
        _mediator = mediator;
    }

    // ProcessMessageAsync is invoked when a message arrives
    public async Task ProcessMessageAsync(ProcessMessageEventArgs<SampleMessageEntity> args, CancellationToken cancellationToken)
    {
        // Do what you need with the message; usually this means translating it into a command and sending it to the mediator
        await _mediator.Send(new PutCommandRequest(args.Data), cancellationToken);
    }
}
The dependency injection method registers LiquidBackgroundService{TMessage}, the Liquid implementation of IHostedService, which hosts the consumer:
// The first parameter must be the name of the appsettings section where the configuration of your consumer
// is defined; the second parameter is an array of assemblies where the domain handlers are defined.
services.AddLiquidServiceBusConsumer<Worker, SampleMessageEntity>("Liquid:Messaging:ServiceBus:SampleConsumer", typeof(PutCommandRequest).Assembly);
Sample configuration for the producer and consumer sections in appsettings.json:
"liquid": {
  "messaging": {
    "serviceBus": {
      "sampleProducer": {
        "ConnectionString": "",
        "EntityPath": "sampleentity"
      },
      "sampleConsumer": {
        "ConnectionString": "",
        "EntityPath": "samplemessage"
      }
    }
  }
}
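The section names passed to the registration methods use a different casing (`Liquid:Messaging:ServiceBus:SampleProducer`) than the JSON keys above (`liquid`, `messaging`, ...). The sketch below illustrates why that can still resolve, assuming Liquid reads the section through the standard `Microsoft.Extensions.Configuration` abstractions, whose keys are case-insensitive and use `:` as the hierarchy separator. The class name `SectionLookupSketch` and the in-memory values are hypothetical stand-ins for a real appsettings.json file; the example needs the `Microsoft.Extensions.Configuration` package.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical helper, not part of Liquid: it only demonstrates section lookup.
public static class SectionLookupSketch
{
    // Builds an in-memory configuration equivalent to the JSON fragment above.
    // Connection strings are left empty, exactly as in the sample.
    public static IConfiguration BuildSampleConfiguration()
    {
        var values = new Dictionary<string, string?>
        {
            ["liquid:messaging:serviceBus:sampleProducer:ConnectionString"] = "",
            ["liquid:messaging:serviceBus:sampleProducer:EntityPath"] = "sampleentity",
            ["liquid:messaging:serviceBus:sampleConsumer:ConnectionString"] = "",
            ["liquid:messaging:serviceBus:sampleConsumer:EntityPath"] = "samplemessage",
        };

        return new ConfigurationBuilder()
            .AddInMemoryCollection(values)
            .Build();
    }

    public static void Main()
    {
        IConfiguration configuration = BuildSampleConfiguration();

        // Same section names that are passed to the registration methods.
        // The lookup succeeds despite the different casing of the keys.
        IConfigurationSection producer =
            configuration.GetSection("Liquid:Messaging:ServiceBus:SampleProducer");
        IConfigurationSection consumer =
            configuration.GetSection("Liquid:Messaging:ServiceBus:SampleConsumer");

        Console.WriteLine(producer["EntityPath"]); // prints "sampleentity"
        Console.WriteLine(consumer["EntityPath"]); // prints "samplemessage"
    }
}
```

If a section name is misspelled, `GetSection` still returns a section object, but its values are `null`, so checking `producer.Exists()` at startup is a cheap way to catch a wrong path early.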