Najaf Shaikh edited this page Aug 12, 2025 · 12 revisions

SourceFlow.Net - Complete Developer Guide

Table of Contents

  1. Introduction
  2. Core Concepts
  3. Architecture Overview
  4. Getting Started
  5. Framework Components
  6. Implementation Guide
  7. Advanced Features
  8. Future Extensions
  9. Best Practices
  10. FAQ
  11. Production Considerations

Introduction

SourceFlow.Net is a modern, lightweight, and extensible .NET framework for building scalable event-sourced applications using Domain-Driven Design (DDD) principles and Command Query Responsibility Segregation (CQRS) patterns. It is built from the ground up for .NET 8+, with performance and developer experience as core priorities.

What Makes SourceFlow.Net Special?

SourceFlow.Net provides a complete toolkit for event sourcing, domain modeling, and command/query separation, enabling developers to build maintainable, scalable applications with a strong foundation in proven architectural patterns.


Core Concepts

Event Sourcing

Event Sourcing is an architectural pattern where the state of an application is determined by a sequence of events. Instead of storing the current state directly, the system stores all the events that have occurred, allowing for complete state reconstruction at any point in time.

Key Benefits:

  • Complete Audit Trail: Every change is recorded as an immutable event
  • Time Travel: Reconstruct system state at any point in history
  • Debugging: Full visibility into how the system reached its current state
  • Scalability: Events can be replayed to build multiple read models
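
State reconstruction from an event log can be sketched without any framework. The following is a minimal illustration, not SourceFlow.Net's API: the `Deposited` and `Withdrawn` records and the `Ledger.BalanceAt` helper are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event log for one account, ordered by sequence number.
var log = new List<AccountEvent>
{
    new Deposited(1, 100m),
    new Withdrawn(2, 30m),
    new Deposited(3, 50m),
};

// Current state: fold over the whole log.
Console.WriteLine(Ledger.BalanceAt(log, int.MaxValue)); // 120
// "Time travel": fold only the events up to sequence 2.
Console.WriteLine(Ledger.BalanceAt(log, 2));            // 70

public abstract record AccountEvent(int Sequence, decimal Amount);
public record Deposited(int Sequence, decimal Amount) : AccountEvent(Sequence, Amount);
public record Withdrawn(int Sequence, decimal Amount) : AccountEvent(Sequence, Amount);

public static class Ledger
{
    // Rebuilds the balance by applying every event with Sequence <= upTo.
    public static decimal BalanceAt(IEnumerable<AccountEvent> events, int upTo) =>
        events.Where(e => e.Sequence <= upTo)
              .Aggregate(0m, (balance, e) => e switch
              {
                  Deposited d => balance + d.Amount,
                  Withdrawn w => balance - w.Amount,
                  _ => balance,
              });
}
```

Because the log is never modified, the same fold can feed any number of read models or answer "what was the balance after event N" questions.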

Example in SourceFlow.Net:

// Events are immutable records of what happened
public class AccountCreated : Event<BankAccount>
{
    public AccountCreated(BankAccount payload) : base(payload) { }
}

public class MoneyDeposited : Event<BankAccount>
{
    public MoneyDeposited(BankAccount payload) : base(payload) { }
}

Domain-Driven Design (DDD)

Domain-Driven Design is a software design approach that focuses on modeling software to match the business domain. It emphasizes collaboration between technical and domain experts to create a shared understanding of the problem space.

Core DDD Elements in SourceFlow.Net:

Aggregate Roots: Domain objects that ensure consistency and encapsulate business logic

public class AccountAggregate : Aggregate<BankAccount>
{
    public void CreateAccount(int accountId, string holder, decimal amount)
    {
        // Business logic validation
        Send(new CreateAccount(new Payload
        {
            Id = accountId,
            AccountName = holder,
            InitialAmount = amount
        }));
    }
}

Entities: Objects with unique identity

public class BankAccount : IEntity
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal Balance { get; set; }
    public bool IsClosed { get; set; }
}

Value Objects: Immutable objects representing descriptive aspects

Domain Events: Meaningful occurrences within the domain

Command Query Responsibility Segregation (CQRS)

CQRS separates read and write operations, allowing for optimized data models for different purposes. Commands change state, while queries retrieve data.

Benefits:

  • Independent Scaling: Read and write sides can scale independently
  • Optimized Models: Different models for different use cases
  • Simplified Queries: Read models can be denormalized for performance
  • Security: Fine-grained access control

CQRS in SourceFlow.Net:

Commands: Represent intent to change state

public class CreateAccount : Command<Payload>
{
    public CreateAccount(Payload payload) : base(payload) { }
}

Queries: Handled through optimized view models

public class AccountViewModel : IViewModel
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal CurrentBalance { get; set; }
    public DateTime LastUpdated { get; set; }
}

Architecture Overview

High-Level Architecture

┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│     Aggregate       │    │        Sagas        │    │    Projections      │
│                     │    │                     │    │                     │
│ ┌─────────────────┐ │    │ ┌─────────────────┐ │    │ ┌─────────────────┐ │
│ │ AccountAggregate│ │    │ │ AccountSaga     │ │    │ │   AccountView   │ │
│ └─────────────────┘ │    │ └─────────────────┘ │    │ └─────────────────┘ │
└─────────────────────┘ <-|└─────────────────────┘ |->└─────────────────────┘
           │            Events        │          Events          │
        Commands          |____     Events     ____│          ViewData   
           ▼                  |       ▼       │                  ▼
┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│   Command Bus       │    │     Event Queue     │    │   View Provider     │
│                     │    │                     │    │                     │
│ ┌─────────────────┐ │    │ ┌─────────────────┐ │    │ ┌─────────────────┐ │
│ │ CommandPublisher│ │    │ │  EventQueue     │ │    │ │ InMemoryProvider│ │
│ │ CommandReplayer │ │    │ │  Dispatchers    │ │    │ │                 │ │
│ └─────────────────┘ │    │ └─────────────────┘ │    │ └─────────────────┘ │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘
           │     │--------------------Commands-------------------
        Commands                                                │
           ▼     |--Entities                                    ▼
┌─────────────────────┐  | ┌─────────────────────┐    ┌─────────────────────┐
│       Sagas         │  | │     Repository      │    │   Command Store     │
│                     │  | │                     │    │                     │
│ ┌─────────────────┐ │  ->│ ┌─────────────────┐ │    │ ┌─────────────────┐ │
│ │  AccountSaga    │ │    │ │InMemoryRepository││    │ │InMemoryCommStore│ │
│ └─────────────────┘ │    │ └─────────────────┘ │    │ └─────────────────┘ │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘

Component Interactions

  1. Services provide the application API and orchestrate business operations
  2. Aggregates encapsulate business logic and send commands
  3. Command Bus routes commands to appropriate sagas
  4. Sagas handle commands and maintain consistency across aggregates
  5. Event Queue publishes events to subscribers
  6. Projections update read models based on events
  7. Repository provides persistence for entities
  8. Command Store persists commands for replay capability
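
The flow described above (command bus, saga, event queue, projection, command store) can be sketched in a few lines of plain C#. This is an in-memory illustration under simplifying assumptions, not the framework's implementation: `MiniCommandBus`, `MiniSaga`, `MiniEventQueue`, `OpenAccount`, and `AccountOpened` are hypothetical names, and everything runs synchronously.

```csharp
using System;
using System.Collections.Generic;

var views = new Dictionary<int, decimal>();             // read model: account id -> balance
var events = new MiniEventQueue();
events.Subscribe(e => views[e.AccountId] = e.Balance);  // projection updates the read model

var store = new List<OpenAccount>();                    // stands in for the command store
var bus = new MiniCommandBus(store, new MiniSaga(events));

bus.Publish(new OpenAccount(1, 250m));                  // what an aggregate would send
Console.WriteLine(views[1]);                            // 250

public record OpenAccount(int AccountId, decimal InitialAmount);  // command
public record AccountOpened(int AccountId, decimal Balance);      // event

public class MiniEventQueue
{
    private readonly List<Action<AccountOpened>> subscribers = new();

    public void Subscribe(Action<AccountOpened> handler) => subscribers.Add(handler);

    // Dispatches the event to every subscriber in registration order.
    public void Enqueue(AccountOpened @event)
    {
        foreach (var handler in subscribers) handler(@event);
    }
}

public class MiniSaga
{
    private readonly MiniEventQueue events;

    public MiniSaga(MiniEventQueue events) => this.events = events;

    // Handles the command and raises the resulting event.
    public void Handle(OpenAccount command) =>
        events.Enqueue(new AccountOpened(command.AccountId, command.InitialAmount));
}

public class MiniCommandBus
{
    private readonly List<OpenAccount> store;
    private readonly MiniSaga saga;

    public MiniCommandBus(List<OpenAccount> store, MiniSaga saga)
    {
        this.store = store;
        this.saga = saga;
    }

    public void Publish(OpenAccount command)
    {
        store.Add(command);   // persist the command for later replay
        saga.Handle(command); // route it to the handler
    }
}
```

The write side only ever sees commands and the read side only ever sees events, which is what lets the two be changed or scaled separately.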

Getting Started

Installation

# Install the core package
dotnet add package SourceFlow.Net

# For SQL Server support (when available)
dotnet add package SourceFlow.Net.SqlServer

Basic Setup

// Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SourceFlow;

var services = new ServiceCollection();

// Add logging
services.AddLogging(builder =>
{
    builder.AddConsole();
    builder.SetMinimumLevel(LogLevel.Information);
});

// Configure SourceFlow with automatic discovery
services.UseSourceFlow();

// Or, instead of automatic discovery, register specific components manually
services.UseSourceFlow(config =>
{
    config.WithAggregate<AccountAggregate>();
    config.WithSaga<BankAccount, AccountSaga>();
    config.WithService<AccountService>();
});

var serviceProvider = services.BuildServiceProvider();

Quick Example

// 1. Define your entity
public class BankAccount : IEntity
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal Balance { get; set; }
}

// 2. Create commands
public class CreateAccount : Command<Payload>
{
    public CreateAccount(Payload payload) : base(payload) { }
}

// 3. Define events
public class AccountCreated : Event<BankAccount>
{
    public AccountCreated(BankAccount payload) : base(payload) { }
}

// 4. Implement saga
public class AccountSaga : Saga<BankAccount>, IHandles<CreateAccount>
{
    public async Task Handle(CreateAccount command)
    {
        var account = new BankAccount
        {
            Id = command.Payload.Id,
            AccountName = command.Payload.AccountName,
            Balance = command.Payload.InitialAmount
        };

        await repository.Persist(account);
        await Raise(new AccountCreated(account));
    }
}

// 5. Build aggregate
public class AccountAggregate : Aggregate<BankAccount>
{
    public void CreateAccount(int accountId, string holder, decimal amount)
    {
        Send(new CreateAccount(new Payload
        {
            Id = accountId,
            AccountName = holder,
            InitialAmount = amount
        }));
    }
}

Framework Components

1. Aggregates

Aggregates are the primary building blocks that encapsulate business logic and coordinate with the command bus.

public abstract class Aggregate<TAggregate> : IAggregate
    where TAggregate : class, IEntity
{
    protected ICommandPublisher commandPublisher;
    protected ICommandReplayer commandReplayer;
    protected ILogger logger;

    // Replay the stored command stream for an aggregate
    public Task Replay(int aggregateId);
    
    // Send commands to command bus
    protected Task Send(ICommand command);
}

Key Features:

  • Command publishing
  • Event replay capability
  • Logger integration
  • Generic entity support

2. Sagas

Sagas handle commands and coordinate business processes, maintaining consistency across aggregate boundaries.

public abstract class Saga<TAggregate> : ISaga<TAggregate>
    where TAggregate : class, IEntity
{
    protected ICommandPublisher commandPublisher;
    protected IEventQueue eventQueue;
    protected IRepository repository;
    protected ILogger logger;

    // Publish commands
    protected Task Publish<TCommand>(TCommand command);
    
    // Raise events
    protected Task Raise<TEvent>(TEvent @event);
}

Key Features:

  • Dynamic command handling
  • Event publishing
  • Repository access
  • Built-in logging

3. Command Bus

The command bus routes commands to appropriate saga handlers and manages command persistence.

internal class CommandBus : ICommandBus
{
    // Publish commands to sagas
    Task Publish<TCommand>(TCommand command);
    
    // Replay commands for aggregate
    Task Replay(int aggregateId);
    
    // Event dispatchers
    event EventHandler<ICommand> Dispatchers;
}

4. Event Queue

The event queue manages event flow and dispatches events to subscribers.

internal class EventQueue : IEventQueue
{
    // Enqueue events for processing
    Task Enqueue<TEvent>(TEvent @event);
    
    // Event dispatchers
    event EventHandler<IEvent> Dispatchers;
}

5. Projections

Projections create and maintain read models based on events.

public interface IProjectOn<TEvent> : IProjection
    where TEvent : IEvent
{
    Task Apply(TEvent @event);
}

// Example implementation
public class AccountView : IProjectOn<AccountCreated>
{
    private readonly IViewProvider provider;

    public AccountView(IViewProvider provider)
    {
        this.provider = provider;
    }

    public async Task Apply(AccountCreated @event)
    {
        var view = new AccountViewModel
        {
            Id = @event.Payload.Id,
            AccountName = @event.Payload.AccountName,
            CurrentBalance = @event.Payload.Balance
        };
        await provider.Push(view);
    }
}

Implementation Guide

Creating a Complete Feature

Let's implement a complete banking feature using SourceFlow.Net:

1. Define Domain Objects

// Entity
public class BankAccount : IEntity
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal Balance { get; set; }
    public bool IsClosed { get; set; }
    public DateTime CreatedOn { get; set; }
}

// Command Payloads
public class CreateAccountPayload : IPayload
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal InitialAmount { get; set; }
}

public class TransactionPayload : IPayload
{
    public int Id { get; set; }
    public decimal Amount { get; set; }
    public TransactionType Type { get; set; }
}

2. Create Commands

public class CreateAccount : Command<CreateAccountPayload>
{
    public CreateAccount(CreateAccountPayload payload) : base(payload) { }
}

public class DepositMoney : Command<TransactionPayload>
{
    public DepositMoney(TransactionPayload payload) : base(payload) { }
}

public class WithdrawMoney : Command<TransactionPayload>
{
    public WithdrawMoney(TransactionPayload payload) : base(payload) { }
}

3. Define Events

public class AccountCreated : Event<BankAccount>
{
    public AccountCreated(BankAccount payload) : base(payload) { }
}

public class MoneyDeposited : Event<BankAccount>
{
    public MoneyDeposited(BankAccount payload) : base(payload) { }
}

public class MoneyWithdrawn : Event<BankAccount>
{
    public MoneyWithdrawn(BankAccount payload) : base(payload) { }
}

4. Implement Saga

public class AccountSaga : Saga<BankAccount>,
                           IHandles<CreateAccount>,
                           IHandles<DepositMoney>,
                           IHandles<WithdrawMoney>
{
    public async Task Handle(CreateAccount command)
    {
        // Validation
        if (string.IsNullOrEmpty(command.Payload.AccountName))
            throw new ArgumentException("Account name is required");

        if (command.Payload.InitialAmount <= 0)
            throw new ArgumentException("Initial amount must be positive");

        // Create entity
        var account = new BankAccount
        {
            Id = command.Payload.Id,
            AccountName = command.Payload.AccountName,
            Balance = command.Payload.InitialAmount,
            CreatedOn = DateTime.UtcNow
        };

        // Persist
        await repository.Persist(account);

        // Raise event
        await Raise(new AccountCreated(account));
    }

    public async Task Handle(DepositMoney command)
    {
        // A non-positive deposit would silently act as a withdrawal
        if (command.Payload.Amount <= 0)
            throw new ArgumentException("Deposit amount must be positive");

        var account = await repository.Get<BankAccount>(command.Payload.Id);

        if (account.IsClosed)
            throw new InvalidOperationException("Cannot deposit to closed account");

        account.Balance += command.Payload.Amount;
        await repository.Persist(account);
        await Raise(new MoneyDeposited(account));
    }

    public async Task Handle(WithdrawMoney command)
    {
        // A non-positive withdrawal would silently act as a deposit
        if (command.Payload.Amount <= 0)
            throw new ArgumentException("Withdrawal amount must be positive");

        var account = await repository.Get<BankAccount>(command.Payload.Id);

        if (account.IsClosed)
            throw new InvalidOperationException("Cannot withdraw from closed account");

        if (account.Balance < command.Payload.Amount)
            throw new InvalidOperationException("Insufficient funds");

        account.Balance -= command.Payload.Amount;
        await repository.Persist(account);
        await Raise(new MoneyWithdrawn(account));
    }
}

5. Create Aggregate

public class AccountAggregate : Aggregate<BankAccount>
{
    public void CreateAccount(int accountId, string holder, decimal amount)
    {
        Send(new CreateAccount(new CreateAccountPayload
        {
            Id = accountId,
            AccountName = holder,
            InitialAmount = amount
        }));
    }

    public void Deposit(int accountId, decimal amount)
    {
        Send(new DepositMoney(new TransactionPayload
        {
            Id = accountId,
            Amount = amount,
            Type = TransactionType.Deposit
        }));
    }

    public void Withdraw(int accountId, decimal amount)
    {
        Send(new WithdrawMoney(new TransactionPayload
        {
            Id = accountId,
            Amount = amount,
            Type = TransactionType.Withdrawal
        }));
    }
}

6. Build Read Models

public class AccountViewModel : IViewModel
{
    public int Id { get; set; }
    public string AccountName { get; set; }
    public decimal CurrentBalance { get; set; }
    public DateTime CreatedDate { get; set; }
    public int TransactionCount { get; set; }
    public bool IsClosed { get; set; }
}

public class AccountProjection : IProjectOn<AccountCreated>,
                                 IProjectOn<MoneyDeposited>,
                                 IProjectOn<MoneyWithdrawn>
{
    private readonly IViewProvider provider;

    public AccountProjection(IViewProvider provider)
    {
        this.provider = provider;
    }

    public async Task Apply(AccountCreated @event)
    {
        var view = new AccountViewModel
        {
            Id = @event.Payload.Id,
            AccountName = @event.Payload.AccountName,
            CurrentBalance = @event.Payload.Balance,
            CreatedDate = @event.Payload.CreatedOn,
            TransactionCount = 0,
            IsClosed = false
        };

        await provider.Push(view);
    }

    public async Task Apply(MoneyDeposited @event)
    {
        var view = await provider.Find<AccountViewModel>(@event.Payload.Id);
        view.CurrentBalance = @event.Payload.Balance;
        view.TransactionCount++;
        await provider.Push(view);
    }

    public async Task Apply(MoneyWithdrawn @event)
    {
        var view = await provider.Find<AccountViewModel>(@event.Payload.Id);
        view.CurrentBalance = @event.Payload.Balance;
        view.TransactionCount++;
        await provider.Push(view);
    }
}

7. Create Service Layer

public interface IAccountService
{
    Task<int> CreateAccountAsync(string holderName, decimal initialAmount);
    Task DepositAsync(int accountId, decimal amount);
    Task WithdrawAsync(int accountId, decimal amount);
    Task ReplayHistoryAsync(int accountId);
}

public class AccountService : Service, IAccountService
{
    public async Task<int> CreateAccountAsync(string holderName, decimal initialAmount)
    {
        var aggregate = await CreateAggregate<AccountAggregate>();
        // Demo only: random IDs can collide; use a collision-free ID source in production
        var accountId = new Random().Next();
        
        aggregate.CreateAccount(accountId, holderName, initialAmount);
        return accountId;
    }

    public async Task DepositAsync(int accountId, decimal amount)
    {
        var aggregate = await CreateAggregate<AccountAggregate>();
        aggregate.Deposit(accountId, amount);
    }

    public async Task WithdrawAsync(int accountId, decimal amount)
    {
        var aggregate = await CreateAggregate<AccountAggregate>();
        aggregate.Withdraw(accountId, amount);
    }

    public async Task ReplayHistoryAsync(int accountId)
    {
        var aggregate = await CreateAggregate<AccountAggregate>();
        await aggregate.Replay(accountId);
    }
}

Advanced Features

Event Replay

SourceFlow.Net provides built-in replay for debugging and state reconstruction. Replay re-executes the commands persisted in the command store for an aggregate:

// Replay all commands for an aggregate
await accountService.ReplayHistoryAsync(accountId);

// The framework automatically handles:
// 1. Loading commands from store
// 2. Marking commands as replay
// 3. Re-executing command handlers
// 4. Updating projections

Metadata and Auditing

Every command and event includes rich metadata:

public class Metadata
{
    public Guid EventId { get; set; }
    public bool IsReplay { get; set; }
    public DateTime OccurredOn { get; set; }
    public int SequenceNo { get; set; }
    public Dictionary<string, object> Properties { get; set; }
}

Dependency Injection Integration

SourceFlow.Net seamlessly integrates with .NET's dependency injection:

// Automatic registration
services.UseSourceFlow();

// Manual registration with factories
services.UseSourceFlow(config =>
{
    config.WithAggregate<AccountAggregate>(provider => 
        new AccountAggregate(provider.GetService<ICustomService>()));
    
    config.WithSaga<BankAccount, AccountSaga>(provider =>
        new AccountSaga(provider.GetService<IExternalService>()));
});

Future Extensions

Microservices Architecture Integration

SourceFlow.Net is well-positioned for microservices scenarios. Here are suggested extensions:

1. Distributed Command Bus

// Proposed extension for distributed scenarios
public interface IDistributedCommandBus : ICommandBus
{
    Task PublishToService<TCommand>(TCommand command, string serviceName);
    Task<TResponse> SendQuery<TQuery, TResponse>(TQuery query, string serviceName);
}

// Implementation using message brokers
public class RabbitMQCommandBus : IDistributedCommandBus
{
    // Route commands across service boundaries
    // Handle retries and dead letter queues
    // Provide delivery guarantees
}

2. Event Store Partitioning

// Partition events across multiple stores
public interface IPartitionedEventStore : ICommandStore
{
    Task<string> GetPartitionKey(int aggregateId);
    Task<ICommandStore> GetPartitionStore(string partitionKey);
}
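
One simple way to derive a partition key is to map the aggregate ID onto a fixed number of partitions. The sketch below assumes that scheme; `Partitioner`, the `partitionCount` parameter, and the `partition-N` key format are hypothetical and not part of SourceFlow.Net.

```csharp
using System;

Console.WriteLine(Partitioner.GetPartitionKey(42, partitionCount: 4)); // partition-2
Console.WriteLine(Partitioner.GetPartitionKey(-1, partitionCount: 4)); // partition-3

public static class Partitioner
{
    // Maps an aggregate ID to one of partitionCount stable partition keys.
    public static string GetPartitionKey(int aggregateId, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        // C#'s % keeps the sign of the dividend, so normalize into [0, partitionCount).
        var index = ((aggregateId % partitionCount) + partitionCount) % partitionCount;
        return $"partition-{index}";
    }
}
```

Modulo partitioning keeps all commands for one aggregate in the same store, but changing `partitionCount` later moves most aggregates to a different partition, so the count should be chosen with growth in mind.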

3. Cross-Service Sagas

// Distributed saga coordinator
public abstract class DistributedSaga<TAggregate> : Saga<TAggregate>
{
    protected Task PublishToService<TCommand>(TCommand command, string service);
    protected Task CompensateCommand<TCommand>(TCommand command);
    protected Task SetSagaTimeout(TimeSpan timeout);
}

Cloud Messaging Integration

1. Azure Service Bus Integration

// Proposed Azure Service Bus extension
public static class SourceFlowAzureExtensions
{
    public static IServiceCollection AddAzureServiceBus(
        this IServiceCollection services, 
        string connectionString)
    {
        services.AddSingleton<ICommandBus, AzureServiceBusCommandBus>();
        services.AddSingleton<IEventQueue, AzureServiceBusEventQueue>();
        return services;
    }
}

public class AzureServiceBusCommandBus : ICommandBus
{
    // Leverage Azure Service Bus topics and subscriptions
    // Dead letter queue handling
    // Auto-scaling based on message volume
    // Session-based message ordering
}

2. AWS EventBridge Integration

// AWS EventBridge integration
public class AwsEventBridgeEventQueue : IEventQueue
{
    // Custom event bus routing
    // Cross-account event publishing
    // Event replay from CloudWatch
    // Schema registry integration
}

3. Google Cloud Pub/Sub Integration

public class GoogleCloudPubSubEventQueue : IEventQueue
{
    // Exactly-once delivery
    // Message ordering keys
    // Dead letter topics
    // Flow control
}

Observability Extensions

1. Distributed Tracing

public static class TracingExtensions
{
    public static IServiceCollection AddSourceFlowTracing(
        this IServiceCollection services)
    {
        // OpenTelemetry integration
        // Command/event correlation
        // Performance metrics
        // Error tracking
        return services;
    }
}

2. Event Store Analytics

public interface IEventAnalytics
{
    Task<EventMetrics> GetAggregateMetrics(int aggregateId);
    Task<IEnumerable<EventPattern>> DetectPatterns(TimeSpan window);
    Task<PerformanceReport> GeneratePerformanceReport();
}

Security Extensions

1. Command Authorization

public interface ICommandAuthorizer
{
    Task<bool> CanExecute<TCommand>(TCommand command, ClaimsPrincipal user);
    Task LogSecurityEvent(SecurityEvent securityEvent);
}

// Usage in saga
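public class SecureAccountSaga : Saga<BankAccount>, IHandles<CreateAccount>
{
    private readonly ICommandAuthorizer authorizer;
    private readonly ClaimsPrincipal currentUser;

    // Both dependencies are supplied by the caller (for example, via a DI factory)
    public SecureAccountSaga(ICommandAuthorizer authorizer, ClaimsPrincipal currentUser)
    {
        this.authorizer = authorizer;
        this.currentUser = currentUser;
    }

    public async Task Handle(CreateAccount command)
    {
        if (!await authorizer.CanExecute(command, currentUser))
            throw new UnauthorizedAccessException();

        // Process command
    }
}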

2. Event Encryption

public interface IEventEncryption
{
    Task<byte[]> EncryptEvent<TEvent>(TEvent @event);
    Task<TEvent> DecryptEvent<TEvent>(byte[] encryptedData);
}

Persistence Extensions

1. Multi-Database Support

// SQL Server
services.AddSourceFlowSqlServer(connectionString);

// MongoDB
services.AddSourceFlowMongoDB(connectionString);

// PostgreSQL
services.AddSourceFlowPostgreSQL(connectionString);

// Cosmos DB
services.AddSourceFlowCosmosDB(endpoint, key);

2. Event Store Snapshots

public interface ISnapshotStore
{
    Task SaveSnapshot<TAggregate>(int aggregateId, TAggregate snapshot);
    Task<TAggregate> LoadSnapshot<TAggregate>(int aggregateId);
    Task<bool> ShouldCreateSnapshot(int aggregateId, int eventCount);
}
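
The idea behind snapshots is to start from a saved state and apply only the events recorded after it. The following is a minimal sketch under assumed data shapes (a tuple-based log and snapshot); `SnapshotPolicy` and its every-N-events rule are hypothetical, chosen only to show what `ShouldCreateSnapshot` might decide.

```csharp
using System;
using System.Linq;

// Event log: 250 deposits of 1 each, numbered 1..250.
var log = Enumerable.Range(1, 250).Select(i => (Sequence: i, Amount: 1m)).ToList();

// Snapshot taken after event 200: the balance so far and the last sequence it covers.
var snapshot = (Balance: 200m, LastSequence: 200);

// Rebuild: start from the snapshot and apply only the events after it.
var tail = log.Where(e => e.Sequence > snapshot.LastSequence).ToList();
var balance = tail.Aggregate(snapshot.Balance, (sum, e) => sum + e.Amount);

Console.WriteLine(tail.Count); // 50
Console.WriteLine(balance);    // 250
Console.WriteLine(SnapshotPolicy.ShouldCreateSnapshot(eventCount: 300, frequency: 100)); // True

public static class SnapshotPolicy
{
    // Hypothetical rule: take a snapshot every `frequency` events.
    public static bool ShouldCreateSnapshot(int eventCount, int frequency) =>
        eventCount > 0 && eventCount % frequency == 0;
}
```

Here only 50 of the 250 events are applied on load; the result matches a full replay as long as the snapshot was taken from a correct state.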

Testing Extensions

1. Test Harness

public class SourceFlowTestHarness
{
    public async Task<TestResult> ExecuteScenario<TCommand>(TCommand command)
    {
        // Execute command
        // Capture events
        // Verify projections
        // Return results
    }

    public async Task<List<IEvent>> GetPublishedEvents()
    {
        // Return all events published during test
    }
}

2. Time Travel Testing

public interface ITimeProvider
{
    DateTime UtcNow { get; }
    void SetTime(DateTime time);
    void AdvanceTime(TimeSpan duration);
}
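
A controllable implementation of this interface might look like the sketch below. `ManualClock` is a hypothetical name; the interface is repeated only so the example compiles on its own.

```csharp
using System;
using System.Globalization;

var clock = new ManualClock(new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc));
clock.AdvanceTime(TimeSpan.FromDays(30));
Console.WriteLine(clock.UtcNow.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)); // 2025-01-31

public interface ITimeProvider
{
    DateTime UtcNow { get; }
    void SetTime(DateTime time);
    void AdvanceTime(TimeSpan duration);
}

// Test double: time only moves when the test says so.
public sealed class ManualClock : ITimeProvider
{
    public ManualClock(DateTime start) => UtcNow = start;

    public DateTime UtcNow { get; private set; }

    public void SetTime(DateTime time) => UtcNow = time;

    public void AdvanceTime(TimeSpan duration) => UtcNow = UtcNow.Add(duration);
}
```

Code that reads the time through `ITimeProvider` instead of `DateTime.UtcNow` can then be tested for time-dependent rules without waiting in real time.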

Best Practices

1. Command Design

// ✅ Good: Specific, intention-revealing commands
public class WithdrawMoney : Command<WithdrawPayload> { }
public class DepositMoney : Command<DepositPayload> { }

// ❌ Bad: Generic, unclear commands
public class UpdateAccount : Command<AccountPayload> { }

2. Event Granularity

// ✅ Good: Fine-grained, specific events
public class AccountCreated : Event<BankAccount> { }
public class AccountCredited : Event<BankAccount> { }
public class AccountDebited : Event<BankAccount> { }

// ❌ Bad: Coarse-grained, generic events
public class AccountChanged : Event<BankAccount> { }

3. Saga Design

// ✅ Good: Single responsibility, clear boundaries
public class AccountSaga : Saga<BankAccount>,
                           IHandles<CreateAccount>,
                           IHandles<CloseAccount>
{
    // Handles account lifecycle only
}

public class TransactionSaga : Saga<Transaction>,
                               IHandles<DepositMoney>,
                               IHandles<WithdrawMoney>
{
    // Handles transaction processing only
}

// ❌ Bad: Multiple responsibilities
public class MegaSaga : Saga<BankAccount>,
                        IHandles<CreateAccount>,
                        IHandles<DepositMoney>,
                        IHandles<ProcessLoan>,
                        IHandles<SendEmail>
{
    // Too many responsibilities
}

4. Projection Design

// ✅ Good: Purpose-built projections
public class AccountSummaryProjection : IProjectOn<AccountCreated> { }
public class TransactionHistoryProjection : IProjectOn<MoneyDeposited> { }

// ❌ Bad: One projection trying to do everything
public class EverythingProjection : IProjectOn<AccountCreated>,
                                    IProjectOn<MoneyDeposited>,
                                    IProjectOn<UserRegistered>,
                                    IProjectOn<OrderPlaced>
{
    // Too broad scope
}

5. Error Handling

public class AccountSaga : Saga<BankAccount>, IHandles<WithdrawMoney>
{
    public async Task Handle(WithdrawMoney command)
    {
        try
        {
            var account = await repository.Get<BankAccount>(command.Payload.Id);
            
            // Validate business rules
            if (account.IsClosed)
                throw new AccountClosedException($"Account {account.Id} is closed");
            
            if (account.Balance < command.Payload.Amount)
                throw new InsufficientFundsException($"Insufficient funds in account {account.Id}");
            
            // Process transaction
            account.Balance -= command.Payload.Amount;
            await repository.Persist(account);
            await Raise(new MoneyWithdrawn(account));
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to process withdrawal for account {AccountId}", 
                command.Payload.Id);
            
            // Raise a failure event so other components can react or compensate
            await Raise(new WithdrawalFailed(new WithdrawalFailureDetails
            {
                AccountId = command.Payload.Id,
                Amount = command.Payload.Amount,
                Reason = ex.Message
            }));
            
            throw;
        }
    }
}

FAQ

Q: How does SourceFlow.Net compare to other event sourcing frameworks?

A: SourceFlow.Net focuses on simplicity and developer experience while providing enterprise-grade features:

  • vs EventStore: More .NET-centric, integrated CQRS support
  • vs NEventStore: Modern async/await patterns, better DI integration
  • vs Marten: Framework-agnostic, not tied to PostgreSQL
  • vs Axon Framework: Native .NET, not Java-based

Q: Can I use SourceFlow.Net in existing applications?

A: Yes! SourceFlow.Net is designed for incremental adoption:

  1. Start with new features using event sourcing
  2. Gradually migrate existing features
  3. Use projections to maintain backward compatibility
  4. Leverage anti-corruption layers to bridge old and new systems

Q: What are the performance implications of event sourcing?

A: SourceFlow.Net is designed with performance in mind:

Advantages:

  • Write operations are append-only (very fast)
  • Read models can be optimized for specific queries
  • Horizontal scaling through event stream partitioning
  • Caching strategies for frequently accessed aggregates

Considerations:

  • Initial load time for aggregates with many events (mitigated by snapshots; see Event Store Snapshots under Future Extensions)
  • Storage growth over time (managed through archiving strategies)
  • Projection rebuilding time (can be done offline)

Optimization Tips:

// Use snapshots for aggregates with many events
public class OptimizedAccountSaga : Saga<BankAccount>
{
    private const int SNAPSHOT_FREQUENCY = 100;

    public async Task Handle(DepositMoney command)
    {
        var account = await GetAccountWithSnapshot(command.Payload.Id);
        // Process command...
        
        if (ShouldCreateSnapshot(account))
            await CreateSnapshot(account);
    }
}

Q: How do I handle schema evolution?

A: SourceFlow.Net supports versioning strategies:

// Version 1
public class AccountCreatedV1 : Event<BankAccount>
{
    // Original fields
}

// Version 2 with additional fields
public class AccountCreatedV2 : Event<BankAccount>
{
    // Original fields + new fields
}

// Upcasting handler
public class AccountEventUpcaster : IEventUpcaster
{
    public IEvent Upcast(IEvent @event)
    {
        if (@event is AccountCreatedV1 v1)
        {
            return new AccountCreatedV2
            {
                // Map old fields to new structure
                // Provide defaults for new fields
            };
        }
        return @event;
    }
}
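
The upcasting step itself can be shown in a self-contained form. This sketch uses plain records rather than the framework's `Event<T>` base class; the `Currency` field and its `"USD"` default are hypothetical examples of a field added in version 2.

```csharp
using System;

object stored = new AccountCreatedV1(7, "Jane Doe");
var current = (AccountCreatedV2)Upcaster.Upcast(stored);
Console.WriteLine($"{current.Id} {current.AccountName} {current.Currency}"); // 7 Jane Doe USD

public record AccountCreatedV1(int Id, string AccountName);
public record AccountCreatedV2(int Id, string AccountName, string Currency);

public static class Upcaster
{
    // Converts old event versions to the current shape; newer events pass through unchanged.
    public static object Upcast(object @event) => @event switch
    {
        AccountCreatedV1 v1 => new AccountCreatedV2(v1.Id, v1.AccountName, Currency: "USD"),
        _ => @event,
    };
}
```

Running the upcaster when events are read means handlers and projections only need to understand the latest version, while the stored history stays untouched.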

Q: How do I test event-sourced systems?

A: SourceFlow.Net promotes testable design:

[Test]
public async Task Should_Create_Account_When_Valid_Command()
{
    // Arrange
    var saga = new AccountSaga();
    var command = new CreateAccount(new CreateAccountPayload
    {
        Id = 123,
        AccountName = "John Doe",
        InitialAmount = 1000m
    });

    // Act
    await saga.Handle(command);

    // Assert
    var events = saga.GetUncommittedEvents();
    Assert.That(events.Count, Is.EqualTo(1));
    Assert.That(events[0], Is.TypeOf<AccountCreated>());
}

Q: How do I handle distributed transactions?

A: Use the Saga pattern for distributed coordination:

public class TransferMoneySaga : Saga<MoneyTransfer>,
                                IHandles<InitiateTransfer>,
                                IHandles<MoneyDebited>,
                                IHandles<MoneyCredited>,
                                IHandles<TransferFailed>
{
    public async Task Handle(InitiateTransfer command)
    {
        // Step 1: Debit source account
        await Publish(new DebitAccount(new DebitPayload
        {
            AccountId = command.Payload.FromAccountId,
            Amount = command.Payload.Amount,
            TransferId = command.Payload.TransferId
        }));
    }

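    public async Task Handle(MoneyDebited @event)
    {
        // Load the transfer this event belongs to
        // (assumes it was persisted under TransferId when the transfer was initiated)
        var transfer = await repository.Get<MoneyTransfer>(@event.Payload.TransferId);

        // Step 2: Credit destination account
        await Publish(new CreditAccount(new CreditPayload
        {
            AccountId = transfer.ToAccountId,
            Amount = @event.Payload.Amount,
            TransferId = @event.Payload.TransferId
        }));
    }

    public Task Handle(MoneyCredited @event)
    {
        // Step 3: Both legs succeeded; mark the transfer as completed here
        return Task.CompletedTask;
    }

    public async Task Handle(TransferFailed @event)
    {
        var transfer = await repository.Get<MoneyTransfer>(@event.Payload.TransferId);

        // Compensate: Refund source account
        await Publish(new CreditAccount(new CreditPayload
        {
            AccountId = transfer.FromAccountId,
            Amount = @event.Payload.Amount,
            TransferId = @event.Payload.TransferId
        }));
    }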
}
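The compensation handler above refunds the source account whenever a failure arrives. If the failure can occur before the debit has happened, the saga needs to know which step it reached so that it refunds only money that was actually taken. One way to sketch that is a small state machine; `TransferState` and `TransferStep` below are hypothetical names for illustration, not SourceFlow.Net types.

```csharp
using System;

var state = new TransferState(fromAccountId: 1, toAccountId: 2, amount: 50m);
state.OnDebited();
var refund = state.OnFailed();   // the credit failed after the debit succeeded
Console.WriteLine(refund.HasValue
    ? $"Refund {refund.Value.Amount} to account {refund.Value.AccountId}"
    : "Nothing to refund");

public enum TransferStep { Started, Debited, Completed, Compensated, Failed }

public sealed class TransferState
{
    public TransferState(int fromAccountId, int toAccountId, decimal amount)
    {
        FromAccountId = fromAccountId;
        ToAccountId = toAccountId;
        Amount = amount;
    }

    public int FromAccountId { get; }
    public int ToAccountId { get; }
    public decimal Amount { get; }
    public TransferStep Step { get; private set; } = TransferStep.Started;

    public void OnDebited() => Move(TransferStep.Started, TransferStep.Debited);
    public void OnCredited() => Move(TransferStep.Debited, TransferStep.Completed);

    // Returns the refund to issue, or null when no money has left the source.
    public (int AccountId, decimal Amount)? OnFailed()
    {
        switch (Step)
        {
            case TransferStep.Debited:
                Step = TransferStep.Compensated;
                return (FromAccountId, Amount);
            case TransferStep.Started:
                Step = TransferStep.Failed;
                return null;
            default:
                // Already completed, compensated, or failed: ignore duplicates.
                return null;
        }
    }

    private void Move(TransferStep expected, TransferStep next)
    {
        if (Step != expected)
            throw new InvalidOperationException($"Cannot move from {Step} to {next}.");
        Step = next;
    }
}
```

Returning `null` for repeated failures keeps the compensation idempotent, which matters when messages can be delivered more than once.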

Q: How do I implement authorization?

A: Integrate authorization at multiple levels:

// Command level authorization
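public class AuthorizedAccountSaga : Saga<BankAccount>,
                                     IHandles<WithdrawMoney>
{
    private readonly IAuthorizationService authService;
    private readonly ClaimsPrincipal currentUser;

    // Both dependencies are injected. How the current user is resolved
    // (for example via IHttpContextAccessor) depends on the host application.
    public AuthorizedAccountSaga(IAuthorizationService authService, ClaimsPrincipal currentUser)
    {
        this.authService = authService;
        this.currentUser = currentUser;
    }

    public async Task Handle(WithdrawMoney command)
    {
        // Resource-based check: user, resource, policy name
        var authResult = await authService.AuthorizeAsync(
            currentUser,
            command.Payload.AccountId,
            "WithdrawMoney");

        if (!authResult.Succeeded)
            throw new UnauthorizedAccessException();

        // Process command...
    }
}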

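// Service level authorization
// Note: [Authorize] is only enforced where the host evaluates it (for example
// ASP.NET Core endpoints); on a plain service class it has no effect by itself.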
public class AccountService : Service, IAccountService
{
    [Authorize("AccountOwner")]
    public async Task WithdrawAsync(int accountId, decimal amount)
    {
        var aggregate = await CreateAggregate<AccountAggregate>();
        aggregate.Withdraw(accountId, amount);
    }
}
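Authorization can also be applied once, in front of every handler, instead of being repeated inside each one. The following is a minimal sketch of that decorator idea. `ICommandHandler<TCommand>`, `AuthorizingHandler<TCommand>`, `WithdrawHandler`, and `WithdrawMoneyCommand` are hypothetical types invented for the example, and the string user id stands in for whatever identity the host application provides.

```csharp
using System;
using System.Threading.Tasks;

var handler = new AuthorizingHandler<WithdrawMoneyCommand>(
    new WithdrawHandler(),
    isAllowed: (user, cmd) => user == cmd.OwnerId);

// The owner is allowed through to the inner handler.
await handler.Handle("alice", new WithdrawMoneyCommand("alice", 25m));

// Anyone else is rejected before the inner handler runs.
try { await handler.Handle("mallory", new WithdrawMoneyCommand("alice", 25m)); }
catch (UnauthorizedAccessException) { Console.WriteLine("Denied"); }

public record WithdrawMoneyCommand(string OwnerId, decimal Amount);

public interface ICommandHandler<TCommand>
{
    Task Handle(string userId, TCommand command);
}

public sealed class WithdrawHandler : ICommandHandler<WithdrawMoneyCommand>
{
    public Task Handle(string userId, WithdrawMoneyCommand command)
    {
        Console.WriteLine($"{userId} withdrew {command.Amount}");
        return Task.CompletedTask;
    }
}

// Wraps any handler and checks a policy before delegating to it.
public sealed class AuthorizingHandler<TCommand> : ICommandHandler<TCommand>
{
    private readonly ICommandHandler<TCommand> inner;
    private readonly Func<string, TCommand, bool> isAllowed;

    public AuthorizingHandler(
        ICommandHandler<TCommand> inner, Func<string, TCommand, bool> isAllowed)
    {
        this.inner = inner;
        this.isAllowed = isAllowed;
    }

    public Task Handle(string userId, TCommand command)
    {
        if (!isAllowed(userId, command))
            throw new UnauthorizedAccessException(
                $"{userId} may not run {typeof(TCommand).Name}.");
        return inner.Handle(userId, command);
    }
}
```

Keeping the policy as a delegate makes the rule easy to unit test separately from the handler it protects.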

Production Considerations

Deployment Strategies

1. Blue-Green Deployment

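// Deploy the new version (green) alongside the old version (blue)
// Switch traffic to green once it is verified; keep blue available for rollback
// Replaying events lets the new version rebuild its state before taking traffic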

public class VersionedEventHandler
{
    public async Task Handle(IEvent @event)
    {
        var version = GetDeploymentVersion();
        await ProcessEventForVersion(@event, version);
    }
}

2. Rolling Updates

// Update instances one at a time
// Events ensure eventual consistency
// Use health checks to verify deployment

public static class HealthCheckExtensions
{
    public static IServiceCollection AddSourceFlowHealthChecks(
        this IServiceCollection services)
    {
        services.AddHealthChecks()
                .AddCheck<CommandStoreHealthCheck>("commandstore")
                .AddCheck<EventQueueHealthCheck>("eventqueue")
                .AddCheck<ProjectionHealthCheck>("projections");
        
        return services;
    }
}

Monitoring and Observability

1. Metrics Collection

public class MetricsCollector
{
    public void RecordCommandProcessed(string commandType, TimeSpan duration)
    {
        Metrics.CreateCounter("sourceflow_commands_total")
               .WithTag("command_type", commandType)
               .Increment();

        Metrics.CreateHistogram("sourceflow_command_duration")
               .WithTag("command_type", commandType)
               .Record(duration.TotalMilliseconds);
    }
}
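The `Metrics` helper used above is not defined in this guide. As one possible concrete form, the same two measurements can be recorded with the `System.Diagnostics.Metrics` types built into .NET. This is a sketch under assumptions: the meter name `SourceFlow.Sample` is made up, and the `MeterListener` only stands in for a real exporter so that the example runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Create the meter and its instruments once and reuse them for every command.
var meter = new Meter("SourceFlow.Sample");   // meter name is an assumption
var commandsTotal = meter.CreateCounter<long>("sourceflow_commands_total");
var commandDuration = meter.CreateHistogram<double>("sourceflow_command_duration", unit: "ms");

// A listener stands in for a real exporter and counts what it observes.
long observedCommands = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "SourceFlow.Sample")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>(
    (instrument, value, tags, state) => observedCommands += value);
listener.Start();

void RecordCommandProcessed(string commandType, TimeSpan duration)
{
    var tag = new KeyValuePair<string, object?>("command_type", commandType);
    commandsTotal.Add(1, tag);
    commandDuration.Record(duration.TotalMilliseconds, tag);
}

RecordCommandProcessed("CreateAccount", TimeSpan.FromMilliseconds(12));
RecordCommandProcessed("WithdrawMoney", TimeSpan.FromMilliseconds(7));
Console.WriteLine($"Commands observed: {observedCommands}");
```

Creating instruments once matters because each `CreateCounter` call registers a new instrument; creating them per command, as a literal reading of the snippet above would do, adds overhead on every call.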

2. Distributed Tracing

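public class TracingCommandBus : ICommandBus
{
    // Source name is illustrative; listeners subscribe to it by name.
    private static readonly ActivitySource ActivitySource = new("SourceFlow.Net");

    // The real bus being decorated
    private readonly ICommandBus commandBus;

    public TracingCommandBus(ICommandBus commandBus) => this.commandBus = commandBus;
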
    public async Task Publish<TCommand>(TCommand command) where TCommand : ICommand
    {
        using var activity = ActivitySource.StartActivity($"Command.{typeof(TCommand).Name}");
        activity?.SetTag("command.id", command.Metadata.EventId.ToString());
        activity?.SetTag("aggregate.id", command.Payload.Id.ToString());

        try
        {
            await commandBus.Publish(command);
            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
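The null-conditional calls on `activity` above are needed because `ActivitySource.StartActivity` returns `null` when nothing is listening to the source. The sketch below shows that behaviour with the built-in `System.Diagnostics` types; the source name `SourceFlow.Sample` is an assumption, and the console-writing listener stands in for a real tracing exporter.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("SourceFlow.Sample");   // source name is an assumption

// No listener is registered yet, so no activity is created.
Console.WriteLine(source.StartActivity("Command.Unobserved") is null);

// Register a listener that records every activity from this source.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "SourceFlow.Sample",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllData,
    ActivityStopped = a =>
        Console.WriteLine($"{a.OperationName} finished with status {a.Status}")
};
ActivitySource.AddActivityListener(listener);

// With a listener in place, StartActivity returns a real Activity.
using (var activity = source.StartActivity("Command.CreateAccount"))
{
    activity?.SetTag("aggregate.id", "123");
    activity?.SetStatus(ActivityStatusCode.Ok);
}
```

In a hosted application the listener is normally registered by the tracing library (for example an OpenTelemetry SDK) and not written by hand.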

Data Management

1. Event Store Archiving

public interface IEventArchiver
{
    Task ArchiveEvents(int aggregateId, DateTime beforeDate);
    Task<IEnumerable<ICommand>> RetrieveArchivedEvents(int aggregateId);
    Task DeleteArchivedEvents(int aggregateId, DateTime beforeDate);
}

public class S3EventArchiver : IEventArchiver
{
    public async Task ArchiveEvents(int aggregateId, DateTime beforeDate)
    {
        var events = await commandStore.LoadBefore(aggregateId, beforeDate);

        // Upload first; mark as archived only after the upload succeeds
        await UploadToS3(events);
        await commandStore.MarkAsArchived(events);
    }

    // RetrieveArchivedEvents and DeleteArchivedEvents omitted for brevity
}

2. Projection Recovery

public class ProjectionRecoveryService
{
    public async Task RebuildProjection<TProjection>() where TProjection : IProjection
    {
        // 1. Clear existing projection data
        await viewProvider.ClearViews<TProjection>();

        // 2. Replay all events
        var allEvents = await eventStore.LoadAllEvents();
        
        // 3. Apply events to projection
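        // GetRequiredService throws if the projection is not registered,
        // instead of returning null and failing later in the loop
        var projection = serviceProvider.GetRequiredService<TProjection>();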
        foreach (var @event in allEvents)
        {
            await ApplyEventToProjection(projection, @event);
        }
    }
}
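Loading every event into memory before replaying can become expensive for large stores. A rebuild can instead stream events in order and remember the last sequence number applied, so an interrupted run can resume. The sketch below illustrates the idea with hypothetical types (`BalanceChanged`, `EventLog`, `ProjectionRebuilder`) and an in-memory event list; none of these are SourceFlow.Net APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// The read model being rebuilt: account id -> balance.
var balances = new Dictionary<int, decimal>();

long checkpoint = await ProjectionRebuilder.ReplayAsync(
    EventLog.ReadFrom(0),
    e => balances[e.AccountId] = balances.GetValueOrDefault(e.AccountId) + e.Delta);

Console.WriteLine($"Replayed up to sequence {checkpoint}");

// Hypothetical event shape: a global sequence number plus a balance change.
public record BalanceChanged(long Sequence, int AccountId, decimal Delta);

public static class EventLog
{
    // Stand-in for a store that streams events in sequence order.
    public static async IAsyncEnumerable<BalanceChanged> ReadFrom(long afterSequence)
    {
        var all = new[]
        {
            new BalanceChanged(1, 1, 100m),
            new BalanceChanged(2, 2, 50m),
            new BalanceChanged(3, 1, -30m)
        };
        foreach (var e in all)
        {
            if (e.Sequence <= afterSequence) continue;
            await Task.Yield();   // simulate asynchronous I/O
            yield return e;
        }
    }
}

public static class ProjectionRebuilder
{
    // Applies events one at a time and returns the last sequence applied,
    // so an interrupted rebuild can resume instead of starting over.
    public static async Task<long> ReplayAsync(
        IAsyncEnumerable<BalanceChanged> events, Action<BalanceChanged> apply)
    {
        long last = 0;
        await foreach (var e in events)
        {
            apply(e);
            last = e.Sequence;
        }
        return last;
    }
}
```

Persisting the returned checkpoint alongside the projection data is what makes resuming safe; how that is stored is left open here.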

Community and Support

Contributing to SourceFlow.Net

We welcome contributions! Here's how to get involved:

1. Code Contributions

# Fork the repository on GitHub, then clone your fork
git clone https://github.com/<your-username>/SourceFlow.Net.git
cd SourceFlow.Net

# Create a feature branch
git checkout -b feature/your-feature-name

# Make your changes and add tests
# Ensure all tests pass
dotnet test

# Push the branch to your fork and open a pull request
git push origin feature/your-feature-name

2. Documentation

  • Improve existing documentation
  • Add code examples
  • Create tutorials and guides
  • Translate documentation

3. Community Support

  • Answer questions on GitHub Issues
  • Help with troubleshooting
  • Share your experiences
  • Write blog posts and articles

Resources

License

SourceFlow.Net is released under the MIT License, making it free for both commercial and open-source use.


Conclusion

SourceFlow.Net provides a robust, scalable foundation for building event-sourced applications with .NET. By combining the power of Event Sourcing, Domain-Driven Design, and CQRS patterns, it enables developers to create maintainable, auditable, and performant systems.

The framework's emphasis on clean architecture, dependency injection integration, and developer experience makes it an excellent choice for both greenfield projects and incremental adoption in existing applications.

Whether you're building a simple CRUD application that could benefit from an audit trail, or a complex distributed system requiring eventual consistency and high scalability, SourceFlow.Net provides the tools and patterns you need to succeed.

Next Steps

  1. Start Small: Begin with the quick start guide and simple examples
  2. Learn the Patterns: Understand Event Sourcing, DDD, and CQRS concepts
  3. Build a Prototype: Create a small application using SourceFlow.Net
  4. Scale Up: Apply the patterns to larger, more complex scenarios
  5. Contribute: Join the community and help improve the framework

Start your journey with SourceFlow.Net today and build better software with events as your foundation!
