-
Notifications
You must be signed in to change notification settings - Fork 0
Home
- Introduction
- Core Concepts
- Architecture Overview
- Getting Started
- Framework Components
- Implementation Guide
- Advanced Features
- Future Extensions
- Best Practices
- FAQ
SourceFlow.Net is a modern, lightweight, and extensible .NET framework designed for building scalable event-sourced applications using Domain-Driven Design (DDD) principles and Command Query Responsibility Segregation (CQRS) patterns. Built from the ground up for .NET 8+ with performance and developer experience as core priorities.
SourceFlow.Net provides a complete toolkit for event sourcing, domain modeling, and command/query separation, enabling developers to build maintainable, scalable applications with a strong foundation in proven architectural patterns.
Event Sourcing is an architectural pattern where the state of an application is determined by a sequence of events. Instead of storing the current state directly, the system stores all the events that have occurred, allowing for complete state reconstruction at any point in time.
- Complete Audit Trail: Every change is recorded as an immutable event
- Time Travel: Reconstruct system state at any point in history
- Debugging: Full visibility into how the system reached its current state
- Scalability: Events can be replayed to build multiple read models
// Events are immutable records of what happened
public class AccountCreated : Event<BankAccount>
{
public AccountCreated(BankAccount payload) : base(payload) { }
}
public class MoneyDeposited : Event<BankAccount>
{
public MoneyDeposited(BankAccount payload) : base(payload) { }
}
Domain-Driven Design is a software design approach that focuses on modeling software to match the business domain. It emphasizes collaboration between technical and domain experts to create a shared understanding of the problem space.
Aggregate Roots: Domain objects that ensure consistency and encapsulate business logic
public class AccountAggregate : Aggregate<BankAccount>
{
public void CreateAccount(int accountId, string holder, decimal amount)
{
// Business logic validation
Send(new CreateAccount(new Payload
{
Id = accountId,
AccountName = holder,
InitialAmount = amount
}));
}
}
Entities: Objects with unique identity
public class BankAccount : IEntity
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal Balance { get; set; }
public bool IsClosed { get; set; }
}
Value Objects: Immutable objects representing descriptive aspects Domain Events: Meaningful occurrences within the domain
CQRS separates read and write operations, allowing for optimized data models for different purposes. Commands change state, while queries retrieve data.
- Independent Scaling: Read and write sides can scale independently
- Optimized Models: Different models for different use cases
- Simplified Queries: Read models can be denormalized for performance
- Security: Fine-grained access control
Commands: Represent intent to change state
public class CreateAccount : Command<Payload>
{
public CreateAccount(Payload payload) : base(payload) { }
}
Queries: Handled through optimized view models
public class AccountViewModel : IViewModel
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal CurrentBalance { get; set; }
public DateTime LastUpdated { get; set; }
}
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Aggregate │ │ Sagas │ │ Projections │
│ │ │ │ │ │
│ ┌─────────────────┐ │ │ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ AccountAggregate│ │ │ │ AccountSaga │ │ │ │ AccountView │ │
│ └─────────────────┘ │ │ └─────────────────┘ │ │ └─────────────────┘ │
└─────────────────────┘ <-|└─────────────────────┘ |->└─────────────────────┘
│ Events │ Events │
Commands |____ Events ____│ ViewData
▼ | ▼ │ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Command Bus │ │ Event Queue │ │ View Provider │
│ │ │ │ │ │
│ ┌─────────────────┐ │ │ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ CommandPublisher│ │ │ │ EventQueue │ │ │ │ InMemoryProvider│ │
│ │ CommandReplayer │ │ │ │ Dispatchers │ │ │ │ │ │
│ └─────────────────┘ │ │ └─────────────────┘ │ │ └─────────────────┘ │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
│ │--------------------Commands-------------------
Commands │
▼ |--Entities ▼
┌─────────────────────┐ | ┌─────────────────────┐ ┌─────────────────────┐
│ Sagas │ | │ Repository │ │ Command Store │
│ │ | │ │ │ │
│ ┌─────────────────┐ │ ->│ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ AccountSaga │ │ │ │InMemoryRepository││ │ │InMemoryCommStore│ │
│ └─────────────────┘ │ │ └─────────────────┘ │ │ └─────────────────┘ │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
- Services provide the application API and orchestrate business operations
- Aggregates encapsulate business logic and send commands
- Command Bus routes commands to appropriate sagas
- Sagas handle commands and maintain consistency across aggregates
- Event Queue publishes events to subscribers
- Projections update read models based on events
- Repository provides persistence for entities
- Command Store persists commands for replay capability
# Install the core package
dotnet add package SourceFlow.Net
# For SQL Server support (when available)
dotnet add package SourceFlow.Net.SqlServer
// Program.cs
using SourceFlow;
var services = new ServiceCollection();
// Add logging
services.AddLogging(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Information);
});
// Configure SourceFlow with automatic discovery
services.UseSourceFlow();
// Or configure manually for specific components
services.UseSourceFlow(config =>
{
config.WithAggregate<AccountAggregate>();
config.WithSaga<BankAccount, AccountSaga>();
config.WithService<AccountService>();
});
var serviceProvider = services.BuildServiceProvider();
// 1. Define your entity
public class BankAccount : IEntity
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal Balance { get; set; }
}
// 2. Create commands
public class CreateAccount : Command<Payload>
{
public CreateAccount(Payload payload) : base(payload) { }
}
// 3. Define events
public class AccountCreated : Event<BankAccount>
{
public AccountCreated(BankAccount payload) : base(payload) { }
}
// 4. Implement saga
public class AccountSaga : Saga<BankAccount>, IHandles<CreateAccount>
{
public async Task Handle(CreateAccount command)
{
var account = new BankAccount
{
Id = command.Payload.Id,
AccountName = command.Payload.AccountName,
Balance = command.Payload.InitialAmount
};
await repository.Persist(account);
await Raise(new AccountCreated(account));
}
}
// 5. Build aggregate
public class AccountAggregate : Aggregate<BankAccount>
{
public void CreateAccount(int accountId, string holder, decimal amount)
{
Send(new CreateAccount(new Payload
{
Id = accountId,
AccountName = holder,
InitialAmount = amount
}));
}
}
Aggregates are the primary building blocks that encapsulate business logic and coordinate with the command bus.
public abstract class Aggregate<TAggregate> : IAggregate
where TAggregate : class, IEntity
{
protected ICommandPublisher commandPublisher;
protected ICommandReplayer commandReplayer;
protected ILogger logger;
// Replay event stream for aggregate
public Task Replay(int AggregateId);
// Send commands to command bus
protected Task Send(ICommand command);
}
Key Features:
- Command publishing
- Event replay capability
- Logger integration
- Generic entity support
Sagas handle commands and coordinate business processes, maintaining consistency across aggregate boundaries.
public abstract class Saga<TAggregate> : ISaga<TAggregate>
where TAggregate : class, IEntity
{
protected ICommandPublisher commandPublisher;
protected IEventQueue eventQueue;
protected IRepository repository;
protected ILogger logger;
// Publish commands
protected Task Publish<TCommand>(TCommand command);
// Raise events
protected Task Raise<TEvent>(TEvent @event);
}
Key Features:
- Dynamic command handling
- Event publishing
- Repository access
- Built-in logging
The command bus routes commands to appropriate saga handlers and manages command persistence.
internal class CommandBus : ICommandBus
{
// Publish commands to sagas
Task Publish<TCommand>(TCommand command);
// Replay commands for aggregate
Task Replay(int aggregateId);
// Event dispatchers
event EventHandler<ICommand> Dispatchers;
}
The event queue manages event flow and dispatches events to subscribers.
internal class EventQueue : IEventQueue
{
// Enqueue events for processing
Task Enqueue<TEvent>(TEvent @event);
// Event dispatchers
event EventHandler<IEvent> Dispatchers;
}
Projections create and maintain read models based on events.
public interface IProjectOn<TEvent> : IProjection
where TEvent : IEvent
{
Task Apply(TEvent @event);
}
// Example implementation
public class AccountView : IProjectOn<AccountCreated>, IProjectOn<AccountUpdated>
{
public async Task Apply(AccountCreated @event)
{
var view = new AccountViewModel
{
Id = @event.Payload.Id,
AccountName = @event.Payload.AccountName,
CurrentBalance = @event.Payload.Balance
};
await provider.Push(view);
}
}
Let's implement a complete banking feature using SourceFlow.Net:
// Entity
public class BankAccount : IEntity
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal Balance { get; set; }
public bool IsClosed { get; set; }
public DateTime CreatedOn { get; set; }
}
// Command Payloads
public class CreateAccountPayload : IPayload
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal InitialAmount { get; set; }
}
public class TransactionPayload : IPayload
{
public int Id { get; set; }
public decimal Amount { get; set; }
public TransactionType Type { get; set; }
}
public class CreateAccount : Command<CreateAccountPayload>
{
public CreateAccount(CreateAccountPayload payload) : base(payload) { }
}
public class DepositMoney : Command<TransactionPayload>
{
public DepositMoney(TransactionPayload payload) : base(payload) { }
}
public class WithdrawMoney : Command<TransactionPayload>
{
public WithdrawMoney(TransactionPayload payload) : base(payload) { }
}
public class AccountCreated : Event<BankAccount>
{
public AccountCreated(BankAccount payload) : base(payload) { }
}
public class MoneyDeposited : Event<BankAccount>
{
public MoneyDeposited(BankAccount payload) : base(payload) { }
}
public class MoneyWithdrawn : Event<BankAccount>
{
public MoneyWithdrawn(BankAccount payload) : base(payload) { }
}
public class AccountSaga : Saga<BankAccount>,
IHandles<CreateAccount>,
IHandles<DepositMoney>,
IHandles<WithdrawMoney>
{
public async Task Handle(CreateAccount command)
{
// Validation
if (string.IsNullOrEmpty(command.Payload.AccountName))
throw new ArgumentException("Account name is required");
if (command.Payload.InitialAmount <= 0)
throw new ArgumentException("Initial amount must be positive");
// Create entity
var account = new BankAccount
{
Id = command.Payload.Id,
AccountName = command.Payload.AccountName,
Balance = command.Payload.InitialAmount,
CreatedOn = DateTime.UtcNow
};
// Persist
await repository.Persist(account);
// Raise event
await Raise(new AccountCreated(account));
}
public async Task Handle(DepositMoney command)
{
var account = await repository.Get<BankAccount>(command.Payload.Id);
if (account.IsClosed)
throw new InvalidOperationException("Cannot deposit to closed account");
account.Balance += command.Payload.Amount;
await repository.Persist(account);
await Raise(new MoneyDeposited(account));
}
public async Task Handle(WithdrawMoney command)
{
var account = await repository.Get<BankAccount>(command.Payload.Id);
if (account.IsClosed)
throw new InvalidOperationException("Cannot withdraw from closed account");
if (account.Balance < command.Payload.Amount)
throw new InvalidOperationException("Insufficient funds");
account.Balance -= command.Payload.Amount;
await repository.Persist(account);
await Raise(new MoneyWithdrawn(account));
}
}
public class AccountAggregate : Aggregate<BankAccount>
{
public void CreateAccount(int accountId, string holder, decimal amount)
{
Send(new CreateAccount(new CreateAccountPayload
{
Id = accountId,
AccountName = holder,
InitialAmount = amount
}));
}
public void Deposit(int accountId, decimal amount)
{
Send(new DepositMoney(new TransactionPayload
{
Id = accountId,
Amount = amount,
Type = TransactionType.Deposit
}));
}
public void Withdraw(int accountId, decimal amount)
{
Send(new WithdrawMoney(new TransactionPayload
{
Id = accountId,
Amount = amount,
Type = TransactionType.Withdrawal
}));
}
}
public class AccountViewModel : IViewModel
{
public int Id { get; set; }
public string AccountName { get; set; }
public decimal CurrentBalance { get; set; }
public DateTime CreatedDate { get; set; }
public int TransactionCount { get; set; }
public bool IsClosed { get; set; }
}
public class AccountProjection : IProjectOn<AccountCreated>,
IProjectOn<MoneyDeposited>,
IProjectOn<MoneyWithdrawn>
{
private readonly IViewProvider provider;
public AccountProjection(IViewProvider provider)
{
this.provider = provider;
}
public async Task Apply(AccountCreated @event)
{
var view = new AccountViewModel
{
Id = @event.Payload.Id,
AccountName = @event.Payload.AccountName,
CurrentBalance = @event.Payload.Balance,
CreatedDate = @event.Payload.CreatedOn,
TransactionCount = 0,
IsClosed = false
};
await provider.Push(view);
}
public async Task Apply(MoneyDeposited @event)
{
var view = await provider.Find<AccountViewModel>(@event.Payload.Id);
view.CurrentBalance = @event.Payload.Balance;
view.TransactionCount++;
await provider.Push(view);
}
public async Task Apply(MoneyWithdrawn @event)
{
var view = await provider.Find<AccountViewModel>(@event.Payload.Id);
view.CurrentBalance = @event.Payload.Balance;
view.TransactionCount++;
await provider.Push(view);
}
}
public interface IAccountService
{
Task<int> CreateAccountAsync(string holderName, decimal initialAmount);
Task DepositAsync(int accountId, decimal amount);
Task WithdrawAsync(int accountId, decimal amount);
Task ReplayHistoryAsync(int accountId);
}
public class AccountService : Service, IAccountService
{
public async Task<int> CreateAccountAsync(string holderName, decimal initialAmount)
{
var aggregate = await CreateAggregate<AccountAggregate>();
var accountId = new Random().Next();
aggregate.CreateAccount(accountId, holderName, initialAmount);
return accountId;
}
public async Task DepositAsync(int accountId, decimal amount)
{
var aggregate = await CreateAggregate<AccountAggregate>();
aggregate.Deposit(accountId, amount);
}
public async Task WithdrawAsync(int accountId, decimal amount)
{
var aggregate = await CreateAggregate<AccountAggregate>();
aggregate.Withdraw(accountId, amount);
}
public async Task ReplayHistoryAsync(int accountId)
{
var aggregate = await CreateAggregate<AccountAggregate>();
await aggregate.Replay(accountId);
}
}
SourceFlow.Net provides built-in event replay functionality for debugging and state reconstruction:
// Replay all commands for an aggregate
await accountService.ReplayHistoryAsync(accountId);
// The framework automatically handles:
// 1. Loading commands from store
// 2. Marking commands as replay
// 3. Re-executing command handlers
// 4. Updating projections
Every command and event includes rich metadata:
public class Metadata
{
public Guid EventId { get; set; }
public bool IsReplay { get; set; }
public DateTime OccurredOn { get; set; }
public int SequenceNo { get; set; }
public Dictionary<string, object> Properties { get; set; }
}
SourceFlow.Net seamlessly integrates with .NET's dependency injection:
// Automatic registration
services.UseSourceFlow();
// Manual registration with factories
services.UseSourceFlow(config =>
{
config.WithAggregate<AccountAggregate>(provider =>
new AccountAggregate(provider.GetService<ICustomService>()));
config.WithSaga<BankAccount, AccountSaga>(provider =>
new AccountSaga(provider.GetService<IExternalService>()));
});
SourceFlow.Net is well-positioned for microservices scenarios. Here are suggested extensions:
// Proposed extension for distributed scenarios
public interface IDistributedCommandBus : ICommandBus
{
Task PublishToService<TCommand>(TCommand command, string serviceName);
Task<TResponse> SendQuery<TQuery, TResponse>(TQuery query, string serviceName);
}
// Implementation using message brokers
public class RabbitMQCommandBus : IDistributedCommandBus
{
// Route commands across service boundaries
// Handle retries and dead letter queues
// Provide delivery guarantees
}
// Partition events across multiple stores
public interface IPartitionedEventStore : ICommandStore
{
Task<string> GetPartitionKey(int aggregateId);
Task<ICommandStore> GetPartitionStore(string partitionKey);
}
// Distributed saga coordinator
public abstract class DistributedSaga<TAggregate> : Saga<TAggregate>
{
protected Task PublishToService<TCommand>(TCommand command, string service);
protected Task CompensateCommand<TCommand>(TCommand command);
protected Task SetSagaTimeout(TimeSpan timeout);
}
// Proposed Azure Service Bus extension
public static class SourceFlowAzureExtensions
{
public static IServiceCollection AddAzureServiceBus(
this IServiceCollection services,
string connectionString)
{
services.AddSingleton<ICommandBus, AzureServiceBusCommandBus>();
services.AddSingleton<IEventQueue, AzureServiceBusEventQueue>();
return services;
}
}
public class AzureServiceBusCommandBus : ICommandBus
{
// Leverage Azure Service Bus topics and subscriptions
// Dead letter queue handling
// Auto-scaling based on message volume
// Session-based message ordering
}
// AWS EventBridge integration
public class AwsEventBridgeEventQueue : IEventQueue
{
// Custom event bus routing
// Cross-account event publishing
// Event replay from CloudWatch
// Schema registry integration
}
public class GoogleCloudPubSubEventQueue : IEventQueue
{
// Exactly-once delivery
// Message ordering keys
// Dead letter topics
// Flow control
}
public static class TracingExtensions
{
public static IServiceCollection AddSourceFlowTracing(
this IServiceCollection services)
{
// OpenTelemetry integration
// Command/event correlation
// Performance metrics
// Error tracking
}
}
public interface IEventAnalytics
{
Task<EventMetrics> GetAggregateMetrics(int aggregateId);
Task<IEnumerable<EventPattern>> DetectPatterns(TimeSpan window);
Task<PerformanceReport> GeneratePerformanceReport();
}
public interface ICommandAuthorizer
{
Task<bool> CanExecute<TCommand>(TCommand command, ClaimsPrincipal user);
Task LogSecurityEvent(SecurityEvent securityEvent);
}
// Usage in saga
public class SecureAccountSaga : Saga<BankAccount>, IHandles<CreateAccount>
{
private readonly ICommandAuthorizer authorizer;
public async Task Handle(CreateAccount command)
{
if (!await authorizer.CanExecute(command, currentUser))
throw new UnauthorizedAccessException();
// Process command
}
}
public interface IEventEncryption
{
Task<byte[]> EncryptEvent<TEvent>(TEvent @event);
Task<TEvent> DecryptEvent<TEvent>(byte[] encryptedData);
}
// SQL Server
services.AddSourceFlowSqlServer(connectionString);
// MongoDB
services.AddSourceFlowMongoDB(connectionString);
// PostgreSQL
services.AddSourceFlowPostgreSQL(connectionString);
// Cosmos DB
services.AddSourceFlowCosmosDB(endpoint, key);
public interface ISnapshotStore
{
Task SaveSnapshot<TAggregate>(int aggregateId, TAggregate snapshot);
Task<TAggregate> LoadSnapshot<TAggregate>(int aggregateId);
Task<bool> ShouldCreateSnapshot(int aggregateId, int eventCount);
}
public class SourceFlowTestHarness
{
public async Task<TestResult> ExecuteScenario<TCommand>(TCommand command)
{
// Execute command
// Capture events
// Verify projections
// Return results
}
public async Task<List<IEvent>> GetPublishedEvents()
{
// Return all events published during test
}
}
public interface ITimeProvider
{
DateTime UtcNow { get; }
void SetTime(DateTime time);
void AdvanceTime(TimeSpan duration);
}
// ✅ Good: Specific, intention-revealing commands
public class WithdrawMoney : Command<WithdrawPayload> { }
public class DepositMoney : Command<DepositPayload> { }
// ❌ Bad: Generic, unclear commands
public class UpdateAccount : Command<AccountPayload> { }
// ✅ Good: Fine-grained, specific events
public class AccountCreated : Event<BankAccount> { }
public class AccountCredited : Event<BankAccount> { }
public class AccountDebited : Event<BankAccount> { }
// ❌ Bad: Coarse-grained, generic events
public class AccountChanged : Event<BankAccount> { }
// ✅ Good: Single responsibility, clear boundaries
public class AccountSaga : Saga<BankAccount>,
IHandles<CreateAccount>,
IHandles<CloseAccount>
{
// Handles account lifecycle only
}
public class TransactionSaga : Saga<Transaction>,
IHandles<DepositMoney>,
IHandles<WithdrawMoney>
{
// Handles transaction processing only
}
// ❌ Bad: Multiple responsibilities
public class MegaSaga : Saga<BankAccount>,
IHandles<CreateAccount>,
IHandles<DepositMoney>,
IHandles<ProcessLoan>,
IHandles<SendEmail>
{
// Too many responsibilities
}
// ✅ Good: Purpose-built projections
public class AccountSummaryProjection : IProjectOn<AccountCreated> { }
public class TransactionHistoryProjection : IProjectOn<MoneyDeposited> { }
// ❌ Bad: One projection trying to do everything
public class EverythingProjection : IProjectOn<AccountCreated>,
IProjectOn<MoneyDeposited>,
IProjectOn<UserRegistered>,
IProjectOn<OrderPlaced>
{
// Too broad scope
}
public class AccountSaga : Saga<BankAccount>
{
public async Task Handle(WithdrawMoney command)
{
try
{
var account = await repository.Get<BankAccount>(command.Payload.Id);
// Validate business rules
if (account.IsClosed)
throw new AccountClosedException($"Account {account.Id} is closed");
if (account.Balance < command.Payload.Amount)
throw new InsufficientFundsException($"Insufficient funds in account {account.Id}");
// Process transaction
account.Balance -= command.Payload.Amount;
await repository.Persist(account);
await Raise(new MoneyWithdrawn(account));
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process withdrawal for account {AccountId}",
command.Payload.Id);
// Publish compensation event if needed
await Raise(new WithdrawalFailed(new WithdrawalFailureDetails
{
AccountId = command.Payload.Id,
Amount = command.Payload.Amount,
Reason = ex.Message
}));
throw;
}
}
}
A: SourceFlow.Net focuses on simplicity and developer experience while providing enterprise-grade features:
- vs EventStore: More .NET-centric, integrated CQRS support
- vs NEventStore: Modern async/await patterns, better DI integration
- vs Marten: Framework-agnostic, not tied to PostgreSQL
- vs Axon Framework: Native .NET, not Java-based
A: Yes! SourceFlow.Net is designed for incremental adoption:
- Start with new features using event sourcing
- Gradually migrate existing features
- Use projections to maintain backward compatibility
- Leverage anti-corruption layers to bridge old and new systems
A: SourceFlow.Net is designed with performance in mind:
Advantages:
- Write operations are append-only (very fast)
- Read models can be optimized for specific queries
- Horizontal scaling through event stream partitioning
- Caching strategies for frequently accessed aggregates
Considerations:
- Initial load time for aggregates with many events (solved with snapshots)
- Storage growth over time (managed through archiving strategies)
- Projection rebuilding time (can be done offline)
Optimization Tips:
// Use snapshots for aggregates with many events
public class OptimizedAccountSaga : Saga<BankAccount>
{
private const int SNAPSHOT_FREQUENCY = 100;
public async Task Handle(DepositMoney command)
{
var account = await GetAccountWithSnapshot(command.Payload.Id);
// Process command...
if (ShouldCreateSnapshot(account))
await CreateSnapshot(account);
}
}
A: SourceFlow.Net supports versioning strategies:
// Version 1
public class AccountCreatedV1 : Event<BankAccount>
{
// Original fields
}
// Version 2 with additional fields
public class AccountCreatedV2 : Event<BankAccount>
{
// Original fields + new fields
}
// Upcasting handler
public class AccountEventUpcaster : IEventUpcaster
{
public IEvent Upcast(IEvent @event)
{
if (@event is AccountCreatedV1 v1)
{
return new AccountCreatedV2
{
// Map old fields to new structure
// Provide defaults for new fields
};
}
return @event;
}
}
A: SourceFlow.Net promotes testable design:
[Test]
public async Task Should_Create_Account_When_Valid_Command()
{
// Arrange
var saga = new AccountSaga();
var command = new CreateAccount(new CreateAccountPayload
{
Id = 123,
AccountName = "John Doe",
InitialAmount = 1000m
});
// Act
await saga.Handle(command);
// Assert
var events = saga.GetUncommittedEvents();
Assert.That(events.Count, Is.EqualTo(1));
Assert.That(events[0], Is.TypeOf<AccountCreated>());
}
A: Use the Saga pattern for distributed coordination:
public class TransferMoneySaga : Saga<MoneyTransfer>,
IHandles<InitiateTransfer>,
IHandles<MoneyDebited>,
IHandles<MoneyCredited>,
IHandles<TransferFailed>
{
public async Task Handle(InitiateTransfer command)
{
// Step 1: Debit source account
await Publish(new DebitAccount(new DebitPayload
{
AccountId = command.Payload.FromAccountId,
Amount = command.Payload.Amount,
TransferId = command.Payload.TransferId
}));
}
public async Task Handle(MoneyDebited @event)
{
// Step 2: Credit destination account
await Publish(new CreditAccount(new CreditPayload
{
AccountId = transfer.ToAccountId,
Amount = @event.Payload.Amount,
TransferId = @event.Payload.TransferId
}));
}
public async Task Handle(TransferFailed @event)
{
// Compensate: Refund source account
await Publish(new CreditAccount(new CreditPayload
{
AccountId = transfer.FromAccountId,
Amount = @event.Payload.Amount,
TransferId = @event.Payload.TransferId
}));
}
}
A: Integrate authorization at multiple levels:
// Command level authorization
public class AuthorizedAccountSaga : Saga<BankAccount>
{
private readonly IAuthorizationService authService;
public async Task Handle(WithdrawMoney command)
{
var authResult = await authService.AuthorizeAsync(
currentUser,
command.Payload.AccountId,
"WithdrawMoney");
if (!authResult.Succeeded)
throw new UnauthorizedAccessException();
// Process command...
}
}
// Service level authorization
public class AccountService : Service, IAccountService
{
[Authorize("AccountOwner")]
public async Task WithdrawAsync(int accountId, decimal amount)
{
var aggregate = await CreateAggregate<AccountAggregate>();
aggregate.Withdraw(accountId, amount);
}
}
// Deploy new version alongside old version
// Route traffic gradually
// Event replay ensures consistency across versions
public class VersionedEventHandler
{
public async Task Handle(IEvent @event)
{
var version = GetDeploymentVersion();
await ProcessEventForVersion(@event, version);
}
}
// Update instances one at a time
// Events ensure eventual consistency
// Use health checks to verify deployment
public class HealthCheckExtensions
{
public static IServiceCollection AddSourceFlowHealthChecks(
this IServiceCollection services)
{
services.AddHealthChecks()
.AddCheck<CommandStoreHealthCheck>("commandstore")
.AddCheck<EventQueueHealthCheck>("eventqueue")
.AddCheck<ProjectionHealthCheck>("projections");
return services;
}
}
public class MetricsCollector
{
public void RecordCommandProcessed(string commandType, TimeSpan duration)
{
Metrics.CreateCounter("sourceflow_commands_total")
.WithTag("command_type", commandType)
.Increment();
Metrics.CreateHistogram("sourceflow_command_duration")
.WithTag("command_type", commandType)
.Record(duration.TotalMilliseconds);
}
}
public class TracingCommandBus : ICommandBus
{
public async Task Publish<TCommand>(TCommand command) where TCommand : ICommand
{
using var activity = ActivitySource.StartActivity($"Command.{typeof(TCommand).Name}");
activity?.SetTag("command.id", command.Metadata.EventId.ToString());
activity?.SetTag("aggregate.id", command.Payload.Id.ToString());
try
{
await commandBus.Publish(command);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
}
public interface IEventArchiver
{
Task ArchiveEvents(int aggregateId, DateTime beforeDate);
Task<IEnumerable<ICommand>> RetrieveArchivedEvents(int aggregateId);
Task DeleteArchivedEvents(int aggregateId, DateTime beforeDate);
}
public class S3EventArchiver : IEventArchiver
{
public async Task ArchiveEvents(int aggregateId, DateTime beforeDate)
{
var events = await commandStore.LoadBefore(aggregateId, beforeDate);
var archived = await UploadToS3(events);
await commandStore.MarkAsArchived(events);
}
}
public class ProjectionRecoveryService
{
public async Task RebuildProjection<TProjection>() where TProjection : IProjection
{
// 1. Clear existing projection data
await viewProvider.ClearViews<TProjection>();
// 2. Replay all events
var allEvents = await eventStore.LoadAllEvents();
// 3. Apply events to projection
var projection = serviceProvider.GetService<TProjection>();
foreach (var @event in allEvents)
{
await ApplyEventToProjection(projection, @event);
}
}
}
We welcome contributions! Here's how to get involved:
# Fork the repository
git fork https://github.com/CodeShayk/SourceFlow.Net
# Create a feature branch
git checkout -b feature/your-feature-name
# Make your changes and add tests
# Ensure all tests pass
dotnet test
# Submit a pull request
- Improve existing documentation
- Add code examples
- Create tutorials and guides
- Translate documentation
- Answer questions on GitHub Issues
- Help with troubleshooting
- Share your experiences
- Write blog posts and articles
- GitHub Repository: https://github.com/CodeShayk/SourceFlow.Net
- Documentation: https://sourceflow.net/docs
- Quick Start Guide: https://sourceflow.net/quick-start
- Examples: https://github.com/CodeShayk/SourceFlow.Net/tree/master/examples
SourceFlow.Net is released under the MIT License, making it free for both commercial and open-source use.
SourceFlow.Net provides a robust, scalable foundation for building event-sourced applications with .NET. By combining the power of Event Sourcing, Domain-Driven Design, and CQRS patterns, it enables developers to create maintainable, auditable, and performant systems.
The framework's emphasis on clean architecture, dependency injection integration, and developer experience makes it an excellent choice for both greenfield projects and incremental adoption in existing applications.
Whether you're building a simple CRUD application that could benefit from an audit trail, or a complex distributed system requiring eventual consistency and high scalability, SourceFlow.Net provides the tools and patterns you need to succeed.
- Start Small: Begin with the quick start guide and simple examples
- Learn the Patterns: Understand Event Sourcing, DDD, and CQRS concepts
- Build a Prototype: Create a small application using SourceFlow.Net
- Scale Up: Apply the patterns to larger, more complex scenarios
- Contribute: Join the community and help improve the framework
Start your journey with SourceFlow.Net today and build better software with events as your foundation!