Skip to content

Commit

Permalink
Added mutation example
Browse files Browse the repository at this point in the history
  • Loading branch information
aneshas committed Oct 10, 2024
1 parent 6cf3ff0 commit b14e844
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 143 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
.DS_Store
exampledb
example.db
accounts.json
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@

# TODO
- [ ] store tests
- [ ] alternate On method and alternate apply
- [ ] correlation from context (helper methods in aggregate to set correlation and meta)
- [ ] projections only for simple local testing simplify and document
- [ ] add postgres tests with test-containers (with flag?)
- [ ] upgrade packages
- [ ] autogenerate ID in aggregate
- [ ] complete example with echo and mutation
- [x] autogenerate ID in aggregate
- [ ] WithDB option to pass db connection - eg. for encore?
- [ ] complete example with echo and mutation
- [ ] for projections - ignore missing json types - don't throw error (this implies projection is not interested)
- [ ] alternate On method with Event and alternate apply for id
- [x] correlation from context (helper methods in aggregate to set correlation and meta)
- [ ] projections only for simple local testing simplify and document
- [x] json encoding/decoding for events types - better way?

Embeddable EventStore implementation written in Go using gorm as an underlying persistence mechanism meaning it will work
with `almost` (tested sqlite and postgres) whatever underlying database gorm will support (just use the respective gorm driver).
Expand Down
3 changes: 2 additions & 1 deletion aggregate/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregate

import (
"fmt"
"github.com/google/uuid"
"reflect"
"time"
)
Expand Down Expand Up @@ -91,7 +92,7 @@ func (a *Root[T]) Apply(events ...any) {

for _, evt := range events {
e := Event{
ID: "", // TODO - Autogenerate
ID: uuid.Must(uuid.NewV7()).String(),
E: evt,
OccurredOn: time.Now().UTC(),
}
Expand Down
66 changes: 55 additions & 11 deletions aggregate/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@ package aggregate

import (
"context"
"errors"
"github.com/aneshas/eventstore"
)

// ErrAggregateNotFound is returned when aggregate is not found
var ErrAggregateNotFound = errors.New("aggregate not found")

type metaKey struct{}

type correlationIDKey struct{}

type causationIDKey struct{}

// NewStore constructs new event sourced aggregate store
func NewStore[T Rooter](eventStore EventStore) *Store[T] {
return &Store[T]{
Expand All @@ -25,18 +35,35 @@ type Store[T Rooter] struct {

// Save saves aggregate events to the event store
func (s *Store[T]) Save(ctx context.Context, aggregate T) error {
var events []eventstore.EventToStore
var (
events []eventstore.EventToStore
meta map[string]string
correlationID string
causationID string
)

if v, ok := ctx.Value(metaKey{}).(map[string]string); ok {
meta = v
}

if v, ok := ctx.Value(correlationIDKey{}).(string); ok {
correlationID = v
}

if v, ok := ctx.Value(causationIDKey{}).(string); ok {
causationID = v
}

for _, evt := range aggregate.Events() {
events = append(events, eventstore.EventToStore{
Event: evt.E,
ID: evt.ID,
OccurredOn: evt.OccurredOn,

// Optional - set through context
CausationEventID: "",
CorrelationEventID: "",
Meta: nil,
// Optional
CausationEventID: causationID,
CorrelationEventID: correlationID,
Meta: meta,
})
}

Expand All @@ -49,28 +76,45 @@ func (s *Store[T]) Save(ctx context.Context, aggregate T) error {
}

// FindByID finds aggregate events by its id and rehydrates the aggregate
func (s *Store[T]) FindByID(ctx context.Context, id string) (*T, error) {
func (s *Store[T]) FindByID(ctx context.Context, id string, root T) error {
storedEvents, err := s.eventStore.ReadStream(ctx, id)
if err != nil {
return nil, err
if errors.Is(err, eventstore.ErrStreamNotFound) {
return ErrAggregateNotFound
}

return err
}

var events []Event

for _, evt := range storedEvents {
events = append(events, Event{
ID: evt.ID,
E: evt,
E: evt.Event,
OccurredOn: evt.OccurredOn,
CausationEventID: evt.CausationEventID,
CorrelationEventID: evt.CorrelationEventID,
Meta: evt.Meta,
})
}

var acc T
root.Rehydrate(root, events...)

return nil
}

// CtxWithMeta returns new context with meta data
func CtxWithMeta(ctx context.Context, meta map[string]string) context.Context {
return context.WithValue(ctx, metaKey{}, meta)
}

acc.Rehydrate(&acc, events...)
// CtxWithCorrelationID returns new context with correlation ID
func CtxWithCorrelationID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, correlationIDKey{}, id)
}

return &acc, nil
// CtxWithCausationID returns new context with causation ID
func CtxWithCausationID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, causationIDKey{}, id)
}
51 changes: 0 additions & 51 deletions eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package eventstore_test
import (
"context"
"errors"
"flag"
"fmt"
"io"
"reflect"
Expand All @@ -13,17 +12,11 @@ import (
"github.com/aneshas/eventstore"
)

var integration = flag.Bool("integration", true, "perform integration tests")

type SomeEvent struct {
UserID string
}

func TestShouldReadAppendedEvents(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -72,10 +65,6 @@ func TestShouldReadAppendedEvents(t *testing.T) {
}

func TestShouldWriteToDifferentStreams(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -112,10 +101,6 @@ func TestShouldWriteToDifferentStreams(t *testing.T) {
}

func TestShouldAppendToExistingStream(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -152,10 +137,6 @@ func TestShouldAppendToExistingStream(t *testing.T) {
}

func TestOptimisticConcurrencyCheckIsPerformed(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -186,10 +167,6 @@ func TestOptimisticConcurrencyCheckIsPerformed(t *testing.T) {
}

func TestReadStreamWrapsNotFoundError(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand All @@ -201,10 +178,6 @@ func TestReadStreamWrapsNotFoundError(t *testing.T) {
}

func TestSubscribeAllWithOffsetCatchesUpToNewEvents(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -300,10 +273,6 @@ outer:
}

func TestReadAllShouldReadAllEvents(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -344,10 +313,6 @@ func TestReadAllShouldReadAllEvents(t *testing.T) {
}

func TestSubscribeAllCancelsSubscriptionOnContextCancel(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -375,10 +340,6 @@ func TestSubscribeAllCancelsSubscriptionOnContextCancel(t *testing.T) {
}

func TestSubscribeAllCancelsSubscriptionWithClose(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

es, cleanup := eventStore(t)

defer cleanup()
Expand Down Expand Up @@ -425,10 +386,6 @@ func (e enc) Decode(evt *eventstore.EncodedEvt) (interface{}, error) {
}

func TestEncoderEncodeErrorsPropagated(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

var anErr = fmt.Errorf("an error occurred")

e := enc{
Expand Down Expand Up @@ -458,10 +415,6 @@ func TestEncoderEncodeErrorsPropagated(t *testing.T) {
}

func TestEncoderDecodeErrorsPropagated(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

var anErr = fmt.Errorf("an error occurred")

e := enc{
Expand Down Expand Up @@ -504,10 +457,6 @@ func TestEncoderDecodeErrorsPropagated(t *testing.T) {
}

func TestEncoderDecodeErrorsPropagatedOnSubscribeAll(t *testing.T) {
if !*integration {
t.Skip("skipping integration tests")
}

var anErr = fmt.Errorf("an error occurred")

e := enc{
Expand Down
19 changes: 7 additions & 12 deletions example/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,17 @@ func (id ID) String() string { return string(id) }
type Account struct {
aggregate.Root[ID]

holder string
balance int
// notice how aggregate has no state until it is needed to make a decision

Balance int
}

// Deposit money
func (a *Account) Deposit(amount int) {
a.Apply(
// TODO - From aggregate we should always set the event ID
// and occurred on ?
// or
//
// Apply always sets ID and occured on internally
// Provide alternate Apply with IDs, OccuredOn - make this one accept single event (bcs of id)
// and others (correlation, meta, pass through context)
// for example: check if amount is positive or account is not closed

// if all good, do the mutation by applying the event
a.Apply(
DepositMade{
Amount: amount,
},
Expand All @@ -54,10 +50,9 @@ func (a *Account) Deposit(amount int) {
// OnNewAccountOpened handler
func (a *Account) OnNewAccountOpened(evt NewAccountOpened) {
a.SetID(ID(evt.AccountID))
a.holder = evt.Holder
}

// OnDepositMade handler
func (a *Account) OnDepositMade(evt DepositMade) {
a.balance += evt.Amount
a.Balance += evt.Amount
}
6 changes: 6 additions & 0 deletions example/account/events.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package account

// Events is a list of all domain event instances
var Events = []any{
NewAccountOpened{},
DepositMade{},
}

// NewAccountOpened domain event indicates that new
// account has been opened
type NewAccountOpened struct {
Expand Down
Loading

0 comments on commit b14e844

Please sign in to comment.