Aggregator provides base classes and interfaces for building a CQRS/ES-based application.
Supports .NET Standard 2.0 and 2.1 (.NET Core 2.2 and 3.1).
The main package:
PM> Install-Package Aggregator
EventStore persistence integration package:
PM> Install-Package Aggregator.Persistence.EventStore
Microsoft DI integration package:
PM> Install-Package Aggregator.Microsoft.DependencyInjection
Autofac DI integration package:
PM> Install-Package Aggregator.Autofac
Testing package:
PM> Install-Package Aggregator.Testing
Registering the dependencies in an ASP.NET Core application that uses Microsoft.Extensions.DependencyInjection takes two steps:
- Install the Aggregator.Microsoft.DependencyInjection package
- Call app.AddAggregator(); inside the Configure method in Startup.cs
If you prefer using Autofac, you can use the Aggregator.Autofac package which contains an Autofac module to ease the registration process.
Need support for a different container? Feel free to open an issue.
The AggregateRoot or AggregateRoot<TEventBase> class is an abstract base class that should be used as a base for aggregate roots. It allows registering event handlers, initializing the aggregate by replaying events, and keeping track of changes applied to the aggregate root.
sealed class User : AggregateRoot
{
private string _givenName;
private string _surname;
public User()
{
Register<CreatedUserEvent>(OnCreated);
Register<UpdatedUserGivenNameEvent>(OnGivenNameUpdated);
// ...
}
// When having multiple constructors, always call the one that registers the event handlers
private User(CreatedUserEvent @event) : this()
{
Apply(@event);
}
public static User Create(string givenName, string surname)
=> new User(new CreatedUserEvent
{
GivenName = givenName,
Surname = surname,
// ...
});
public void SetGivenName(string givenName)
{
// Null-safe comparison: nothing to do when the given name is unchanged
if (string.Equals(_givenName, givenName)) return;
Apply(new UpdatedUserGivenNameEvent
{
GivenName = UpdatedInfo.From(_givenName).To(givenName),
Surname = _surname,
// ...
});
}
private void OnCreated(CreatedUserEvent @event)
{
_givenName = @event.GivenName;
_surname = @event.Surname;
}
private void OnGivenNameUpdated(UpdatedUserGivenNameEvent @event)
{
_givenName = @event.GivenName.NewValue;
}
// ...
}
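To make the register/apply/replay mechanics concrete, here is a minimal, self-contained sketch of how such a base class could work. It is not Aggregator's implementation: MiniAggregateRoot, Initialize, Changes, Counter and CounterIncremented are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an event-sourced base class; NOT Aggregator's AggregateRoot.
abstract class MiniAggregateRoot
{
    private readonly Dictionary<Type, Action<object>> _handlers = new Dictionary<Type, Action<object>>();
    private readonly List<object> _changes = new List<object>();

    // Events applied since the aggregate was created or loaded.
    public IReadOnlyList<object> Changes => _changes;

    // Maps an event type to the method that mutates state for it.
    protected void Register<TEvent>(Action<TEvent> handler)
        => _handlers[typeof(TEvent)] = e => handler((TEvent)e);

    // Rebuilds state from history without recording the events as new changes.
    public void Initialize(IEnumerable<object> history)
    {
        foreach (var @event in history) Handle(@event);
    }

    // Mutates state and records the event as a pending change.
    protected void Apply(object @event)
    {
        Handle(@event);
        _changes.Add(@event);
    }

    private void Handle(object @event)
    {
        if (!_handlers.TryGetValue(@event.GetType(), out var handler))
            throw new InvalidOperationException($"No handler registered for {@event.GetType().Name}");
        handler(@event);
    }
}

sealed class CounterIncremented { public int By { get; set; } }

sealed class Counter : MiniAggregateRoot
{
    public int Value { get; private set; }

    public Counter() => Register<CounterIncremented>(e => Value += e.By);

    public void Increment(int by) => Apply(new CounterIncremented { By = by });
}
```

In this sketch, replaying one CounterIncremented { By = 2 } through Initialize and then calling Increment(3) leaves Value at 5 with a single pending change, because only newly applied events are tracked.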
The generic Repository<TAggregateRoot> class is responsible for creating new aggregate roots, loading aggregate roots from the event store and keeping track of changes applied to new or loaded aggregate roots.
When one of the DI integration packages is used, this generic repository is registered with a scoped lifetime, because an instance should only exist for the lifetime of a single command being processed.
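The sketch below illustrates the idea of a repository that loads aggregates by replaying events and tracks the instances it hands out. It is an assumption-laden illustration rather than the library's Repository: IEventSourced, MiniRepository, Note and the in-memory stream dictionary are all hypothetical, and the real repository is asynchronous.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical contract for an event-sourced aggregate; not part of Aggregator.
interface IEventSourced
{
    IReadOnlyList<object> Changes { get; }
    void Initialize(IEnumerable<object> history);
}

// Hypothetical repository: loads aggregates by replaying stored events and
// remembers every instance it handed out so pending changes can be collected later.
sealed class MiniRepository<TAggregate> where TAggregate : IEventSourced, new()
{
    private readonly IReadOnlyDictionary<string, List<object>> _streams;
    private readonly Dictionary<string, TAggregate> _tracked = new Dictionary<string, TAggregate>();

    public MiniRepository(IReadOnlyDictionary<string, List<object>> streams) => _streams = streams;

    public TAggregate Get(string id)
    {
        // Return the tracked instance so one command sees a single copy per aggregate.
        if (_tracked.TryGetValue(id, out var tracked)) return tracked;
        if (!_streams.TryGetValue(id, out var history))
            throw new KeyNotFoundException($"Aggregate '{id}' was not found.");

        var aggregate = new TAggregate();
        aggregate.Initialize(history);
        _tracked.Add(id, aggregate);
        return aggregate;
    }

    public void Add(string id, TAggregate aggregate)
    {
        if (_streams.ContainsKey(id) || _tracked.ContainsKey(id))
            throw new InvalidOperationException($"Aggregate '{id}' already exists.");
        _tracked.Add(id, aggregate);
    }

    // Pending events per aggregate id, i.e. what a unit of work would persist.
    public IEnumerable<KeyValuePair<string, IReadOnlyList<object>>> GetChanges()
    {
        foreach (var pair in _tracked)
            if (pair.Value.Changes.Count > 0)
                yield return new KeyValuePair<string, IReadOnlyList<object>>(pair.Key, pair.Value.Changes);
    }
}

// Minimal aggregate used only to exercise the sketch; events are plain strings to keep it short.
sealed class Note : IEventSourced
{
    private readonly List<object> _changes = new List<object>();
    public string Text { get; private set; } = "";
    public IReadOnlyList<object> Changes => _changes;

    public void Initialize(IEnumerable<object> history)
    {
        foreach (var e in history) Text = (string)e;
    }

    public void Rewrite(string text)
    {
        Text = text;
        _changes.Add(text);
    }
}
```

Because the tracking dictionary lives inside the repository instance, keeping that instance scoped to one command keeps pending changes from leaking between commands.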
Commands and events can be defined as simple POCOs. If you prefer stronger type safety, every Aggregator class also has a generic variant that lets you define a base type for commands and events.
Example command:
class CreateUserCommand
{
public string Id { get; set; }
public string EmailAddress { get; set; }
public string GivenName { get; set; }
public string Surname { get; set; }
}
Example event:
class UserCreatedEvent
{
public string Id { get; set; }
public string EmailAddress { get; set; }
public string GivenName { get; set; }
public string Surname { get; set; }
public DateTimeOffset CreatedAtUtc { get; set; }
}
The library contains a generic interface definition ICommandHandler<TCommand> that identifies command handlers.
class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
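// No "override" here: the method implements the interface, it does not override a base method
public Task Handle(CreateUserCommand command, CancellationToken cancellationToken)
{
// Process the command...
return Task.CompletedTask;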
}
}
Command handlers can be registered one-by-one or in one go using Scrutor for Microsoft.Extensions.DependencyInjection or AsClosedTypesOf when using Autofac.
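As a rough sketch of both bulk-registration options, the extension methods below scan an assembly for handler implementations. The names CommandHandlerRegistration, AddCommandHandlers and RegisterCommandHandlers are hypothetical, the scoped lifetime is an assumption, and the code needs the Scrutor and Autofac NuGet packages. The open generic type is passed in as a parameter, so callers would supply typeof(ICommandHandler<>).

```csharp
using System;
using System.Reflection;
using Autofac;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper class; not shipped with Aggregator.
static class CommandHandlerRegistration
{
    // Microsoft.Extensions.DependencyInjection + Scrutor:
    // registers every concrete class assignable to the open generic handler type.
    public static IServiceCollection AddCommandHandlers(
        this IServiceCollection services, Assembly assembly, Type openHandlerType)
        => services.Scan(scan => scan
            .FromAssemblies(assembly)
            .AddClasses(classes => classes.AssignableTo(openHandlerType))
            .AsImplementedInterfaces()
            .WithScopedLifetime()); // assumption: one handler instance per scope

    // Autofac: registers every type in the assembly that closes the open generic handler type.
    public static void RegisterCommandHandlers(
        this ContainerBuilder builder, Assembly assembly, Type openHandlerType)
        => builder.RegisterAssemblyTypes(assembly)
            .AsClosedTypesOf(openHandlerType)
            .InstancePerLifetimeScope(); // assumption: one handler instance per scope
}
```

A call could then look like services.AddCommandHandlers(typeof(CreateUserCommandHandler).Assembly, typeof(ICommandHandler<>)).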
It's a good practice to validate commands before executing them. A great way to do this is to create a base class that uses FluentValidation.
abstract class SafeCommandHandler<TCommand>
: AbstractValidator<TCommand>
, ICommandHandler<TCommand>
{
public async Task Handle(TCommand command, CancellationToken cancellationToken)
{
DefineRules();
this.ValidateAndThrow(command);
await HandleValidatedCommand(command, cancellationToken).ConfigureAwait(false);
}
protected abstract void DefineRules();
protected abstract Task HandleValidatedCommand(TCommand command, CancellationToken cancellationToken);
}
Since most command handlers will manipulate an aggregate root, you can also create a base class that requires a repository.
abstract class PersistentCommandHandler<TCommand, TAggregateRoot>
: SafeCommandHandler<TCommand>
where TAggregateRoot : AggregateRoot, new()
{
protected readonly IRepository<TAggregateRoot> Repository;
protected PersistentCommandHandler(IRepository<TAggregateRoot> repository)
{
Repository = repository;
}
}
Usage:
sealed class UpdateTodoTitleCommandHandler
: PersistentCommandHandler<UpdateTodoTitleCommand, TodoAggregateRoot>
{
public UpdateTodoTitleCommandHandler(IRepository<TodoAggregateRoot> repository)
: base(repository)
{
}
protected override void DefineRules()
{
RuleFor(x => x.Id).NotEmpty();
RuleFor(x => x.Title).NotEmpty();
}
protected override async Task HandleValidatedCommand(UpdateTodoTitleCommand command, CancellationToken cancellationToken)
{
var aggregateRoot = await Repository.Get(command.Id.ToString());
aggregateRoot.UpdateTitle(command.Title);
}
}
The CommandProcessor or CommandProcessor<TIdentifier, TCommandBase, TEventBase> class is responsible for processing commands, which consists of these steps:
- Request a new CommandHandlingContext from the DI container, which will maintain the internal unit-of-work
- Execute one or more command handlers that implement the ICommandHandler<TCommand> interface
- Store events that were tracked by the unit-of-work
- Dispatch events that were tracked by the unit-of-work in the CommandHandlingContext
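The four steps above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the library's CommandProcessor: UnitOfWork, ITransaction, InMemoryTransaction and MiniCommandProcessor are hypothetical types, and handlers are passed in as delegates instead of being resolved from a DI scope.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// All types below are hypothetical stand-ins, not Aggregator's API.
sealed class UnitOfWork
{
    public List<object> Events { get; } = new List<object>();
}

interface ITransaction : IDisposable
{
    Task Store(IReadOnlyList<object> events);
    Task Commit();
    Task Rollback();
}

// In-memory transaction used only to exercise the sketch.
sealed class InMemoryTransaction : ITransaction
{
    private readonly List<object> _pending = new List<object>();
    private readonly List<object> _committed;

    public InMemoryTransaction(List<object> committed) => _committed = committed;

    public Task Store(IReadOnlyList<object> events) { _pending.AddRange(events); return Task.CompletedTask; }
    public Task Commit() { _committed.AddRange(_pending); _pending.Clear(); return Task.CompletedTask; }
    public Task Rollback() { _pending.Clear(); return Task.CompletedTask; }
    public void Dispose() => _pending.Clear();
}

sealed class MiniCommandProcessor
{
    private readonly Func<ITransaction> _beginTransaction;
    private readonly Func<object, Task> _dispatch;

    public MiniCommandProcessor(Func<ITransaction> beginTransaction, Func<object, Task> dispatch)
    {
        _beginTransaction = beginTransaction;
        _dispatch = dispatch;
    }

    public async Task Process<TCommand>(
        TCommand command, IEnumerable<Func<TCommand, UnitOfWork, Task>> handlers)
    {
        // 1. Start with a fresh unit of work for this command.
        var unitOfWork = new UnitOfWork();

        // 2. Execute the command handlers; they record events in the unit of work.
        foreach (var handler in handlers)
            await handler(command, unitOfWork).ConfigureAwait(false);

        if (unitOfWork.Events.Count == 0) return;

        using (var transaction = _beginTransaction())
        {
            try
            {
                // 3. Store the tracked events.
                await transaction.Store(unitOfWork.Events).ConfigureAwait(false);

                // 4. Dispatch the tracked events, then commit.
                foreach (var @event in unitOfWork.Events)
                    await _dispatch(@event).ConfigureAwait(false);
                await transaction.Commit().ConfigureAwait(false);
            }
            catch
            {
                // A failure while storing or dispatching undoes the whole transaction.
                await transaction.Rollback().ConfigureAwait(false);
                throw;
            }
        }
    }
}
```

Committing only after dispatching is what lets a failing event handler undo the stored events in this sketch.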
The implementation of the IServiceScopeFactory interface is responsible for creating a temporary child scope from which the CommandHandlingContext, command handlers and event handlers are resolved.
The child scope, which implements IServiceScope, is only valid for the lifetime of a single command or event being processed/dispatched.
These dedicated interfaces do not depend on a specific DI container. There's an integration NuGet package available for Microsoft.Extensions.DependencyInjection (Aggregator.Microsoft.DependencyInjection) and Autofac (Aggregator.Autofac) that can be used out of the box or serve as an example.
The CommandHandlingContext is resolved from the IServiceScope instance by the CommandProcessor for the lifetime of one single command being processed. It's a property bag that can be used to store and retrieve properties during the processing of a single command. Internally, this context is also used to store the unit-of-work that is required by the Repository class to keep track of changes (events) generated by aggregate root entities.
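A property bag of this kind can be pictured with the small sketch below. MiniContext and its Set, Get and TryGet members are hypothetical and only illustrate the concept; the actual members of CommandHandlingContext may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical property bag; not Aggregator's CommandHandlingContext.
sealed class MiniContext
{
    private readonly Dictionary<string, object> _properties = new Dictionary<string, object>();

    // Stores or overwrites a value under the given key.
    public void Set<T>(string key, T value) => _properties[key] = value;

    // Returns the value, or throws when the key is missing or holds another type.
    public T Get<T>(string key)
        => (_properties.TryGetValue(key, out var value) && value is T typed)
            ? typed
            : throw new KeyNotFoundException($"No property '{key}' of type {typeof(T).Name}.");

    // Non-throwing variant of Get.
    public bool TryGet<T>(string key, out T value)
    {
        if (_properties.TryGetValue(key, out var raw) && raw is T typed)
        {
            value = typed;
            return true;
        }

        value = default;
        return false;
    }
}
```

Creating one such instance per command keeps values stored by one handler from leaking into the processing of another command.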
The CommandProcessor depends on an implementation of IEventStore<TIdentifier, TEventBase> which will be used to store one or more events that were generated during the processing of a single command in a transactional manner. When something goes wrong while storing (and dispatching) the event(s), the complete transaction will get rolled back and the exception will bubble up to the caller.
There's an integration NuGet package available for EventStore (Aggregator.Persistence.EventStore) that can be used as is or serve as an example for a custom event store.
The CommandProcessor also depends on an implementation of IEventDispatcher<TEventBase> which will be used to dispatch one or more events that were generated during the processing of a single command inside the command domain. The implementation of the IEventDispatcher<TEventBase> interface is responsible for forwarding events to classes that implement the IEventHandler<TEvent> interface inside the command domain. A typical example of classes that listen to one or more events are Process Managers (sometimes referred to as Sagas) that act on events, keep track of some kind of long-running state and send out commands depending on that state. Since the work and state of process managers is important, the CommandProcessor will also roll back the event store transaction in case something goes wrong during event dispatching.
EventDispatcher<TEventBase> is a default implementation that can be used for dispatching events.
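The forwarding idea can be sketched as below. This is not the library's EventDispatcher: IMiniEventHandler, MiniEventDispatcher, UserRegistered and WelcomeMailQueue are hypothetical, and handlers are subscribed explicitly here, whereas the source describes them as being resolved from the service scope.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler contract; the real IEventHandler<TEvent> may differ.
interface IMiniEventHandler<in TEvent>
{
    Task Handle(TEvent @event, CancellationToken cancellationToken);
}

// Hypothetical dispatcher that forwards each event to the handlers subscribed for its type.
sealed class MiniEventDispatcher
{
    private readonly Dictionary<Type, List<Func<object, CancellationToken, Task>>> _handlers
        = new Dictionary<Type, List<Func<object, CancellationToken, Task>>>();

    public void Subscribe<TEvent>(IMiniEventHandler<TEvent> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
            _handlers[typeof(TEvent)] = list = new List<Func<object, CancellationToken, Task>>();
        list.Add((e, ct) => handler.Handle((TEvent)e, ct));
    }

    // Awaits handlers one by one so the first failure propagates to the caller,
    // which can then roll back the event store transaction.
    public async Task Dispatch(IEnumerable<object> events, CancellationToken cancellationToken = default)
    {
        foreach (var @event in events)
        {
            if (!_handlers.TryGetValue(@event.GetType(), out var list)) continue;
            foreach (var handle in list)
                await handle(@event, cancellationToken).ConfigureAwait(false);
        }
    }
}

// Sample event and handler used only to exercise the sketch.
sealed class UserRegistered { public string Email { get; set; } }

sealed class WelcomeMailQueue : IMiniEventHandler<UserRegistered>
{
    public List<string> Recipients { get; } = new List<string>();

    public Task Handle(UserRegistered @event, CancellationToken cancellationToken)
    {
        Recipients.Add(@event.Email);
        return Task.CompletedTask;
    }
}
```

Events without a subscribed handler are skipped silently in this sketch; whether the library behaves the same way is not stated in this document.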
The Aggregator.Testing package can be used to write unit tests against your aggregate roots.
Scenario
.ForConstructor(() => User.Create("John", "Doe"))
.Then(new CreatedUserEvent
{
GivenName = "John",
Surname = "Doe",
})
.Assert();
Scenario
.ForCommand(User.Factory)
.Given(
new CreatedUserEvent
{
GivenName = "John",
Surname = "Doe",
})
.When(user => user.SetGivenName("Jon"))
.Then(
new UpdatedUserGivenNameEvent
{
GivenName = UpdatedInfo.From("John").To("Jon"),
Surname = "Doe",
})
.Assert();
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
This project is licensed under the MIT License - see the LICENSE.txt file for details.