Skip to content

Commit

Permalink
Added propper correlation ids handling
Browse files Browse the repository at this point in the history
  • Loading branch information
aneshas committed Nov 18, 2024
1 parent 0acf8ab commit 1892f63
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 14 deletions.
20 changes: 20 additions & 0 deletions aggregate/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Rooter interface {
Events() []Event
Version() int
Rehydrate(acc any, events ...Event)
FirstEventID() string
LastEventID() string
}

// Root represents reusable DDD Event Sourcing friendly Aggregate
Expand All @@ -36,9 +38,22 @@ type Root[T fmt.Stringer] struct {
version int
domainEvents []Event

firstEventID string
lastEventID string

ptr reflect.Value
}

// LastEventID returns last event ID
func (a *Root[T]) LastEventID() string {
return a.lastEventID
}

// FirstEventID returns first event ID
func (a *Root[T]) FirstEventID() string {
return a.firstEventID
}

// StringID returns aggregate ID string
func (a *Root[T]) StringID() string {
return a.ID.String()
Expand All @@ -54,6 +69,7 @@ func (a *Root[T]) Rehydrate(aggregatePtr any, events ...Event) {

for _, evt := range events {
a.mutate(evt)
a.lastEventID = evt.ID

a.version++
}
Expand Down Expand Up @@ -127,6 +143,10 @@ func (a *Root[T]) mutate(evt Event) {
panic(ErrMissingAggregateEventHandler)
}

if a.firstEventID == "" {
a.firstEventID = evt.ID
}

if h.Type().NumIn() == 2 {
h.Call([]reflect.Value{
reflect.ValueOf(evt.E),
Expand Down
22 changes: 17 additions & 5 deletions aggregate/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,29 @@ func (s *Store[T]) Save(ctx context.Context, aggregate T) error {
causationID = v
}

if correlationID == "" {
correlationID = aggregate.FirstEventID()
}

if causationID == "" {
causationID = aggregate.LastEventID()
}

for _, evt := range aggregate.Events() {
events = append(events, eventstore.EventToStore{
Event: evt.E,
ID: evt.ID,
OccurredOn: evt.OccurredOn,
if causationID == "" {
causationID = evt.ID
}

// Optional
events = append(events, eventstore.EventToStore{
Event: evt.E,
ID: evt.ID,
OccurredOn: evt.OccurredOn,
CausationEventID: causationID,
CorrelationEventID: correlationID,
Meta: meta,
})

causationID = evt.ID
}

return s.eventStore.AppendStream(
Expand Down
55 changes: 49 additions & 6 deletions aggregate/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ func TestShould_Save_Aggregate_Events(t *testing.T) {
}

ctx := aggregate.CtxWithMeta(context.Background(), meta)
ctx = aggregate.CtxWithCausationID(ctx, "some-causation-event-id")
ctx = aggregate.CtxWithCorrelationID(ctx, "some-correlation-event-id")

var f foo

Expand All @@ -98,8 +96,6 @@ func TestShould_Save_Aggregate_Events(t *testing.T) {

assert.NoError(t, err)

assert.Equal(t, "some-causation-event-id", es.eventsToStore[0].CausationEventID)
assert.Equal(t, "some-correlation-event-id", es.eventsToStore[0].CorrelationEventID)
assert.Equal(t, meta, es.eventsToStore[0].Meta)

assert.Equal(t, ctx, es.ctx)
Expand All @@ -108,6 +104,54 @@ func TestShould_Save_Aggregate_Events(t *testing.T) {

events := f.Events()

assert.Equal(t, []eventstore.EventToStore{
{
Event: fooEvent{
Foo: "foo-1",
},
ID: events[0].ID,
CausationEventID: events[0].ID,
CorrelationEventID: events[0].ID,
Meta: meta,
OccurredOn: time.Time{},
},
{
Event: fooEvent{
Foo: "foo-2",
},
ID: events[1].ID,
CausationEventID: events[0].ID,
CorrelationEventID: events[0].ID,
Meta: meta,
OccurredOn: time.Time{},
},
}, es.eventsToStore)
}

func TestShould_Save_Aggregate_Events_With_Explicit_CorrelationIDs(t *testing.T) {
var es eventStore

store := aggregate.NewStore[*foo](&es)

meta := map[string]string{
"foo": "bar",
}

ctx := aggregate.CtxWithMeta(context.Background(), meta)
ctx = aggregate.CtxWithCausationID(ctx, "some-causation-event-id")
ctx = aggregate.CtxWithCorrelationID(ctx, "some-correlation-event-id")

var f foo

f.Rehydrate(&f)
f.doStuff()

err := store.Save(ctx, &f)

assert.NoError(t, err)

events := f.Events()

assert.Equal(t, []eventstore.EventToStore{
{
Event: fooEvent{
Expand All @@ -124,13 +168,12 @@ func TestShould_Save_Aggregate_Events(t *testing.T) {
Foo: "foo-2",
},
ID: events[1].ID,
CausationEventID: "some-causation-event-id",
CausationEventID: events[0].ID,
CorrelationEventID: "some-correlation-event-id",
Meta: meta,
OccurredOn: time.Time{},
},
}, es.eventsToStore)

}

func TestShould_Return_AggregateNotFound_Error_If_No_Events(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions ambar/ambar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/aneshas/eventstore"
"github.com/relvacode/iso8601"
"net/http"
"time"
)

var (
Expand Down Expand Up @@ -111,9 +112,13 @@ func (a *Ambar) Project(r *http.Request, projection Projection, data []byte) err
return err
}

occurredOn, err := iso8601.ParseString(event.Payload.OccurredOn)
if err != nil {
return err
var occurredOn time.Time

if event.Payload.OccurredOn != "" {
occurredOn, err = iso8601.ParseString(event.Payload.OccurredOn)
if err != nil {
return err
}
}

var meta map[string]string
Expand Down

0 comments on commit 1892f63

Please sign in to comment.