Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow the UserID to be injected into events #683

Merged
merged 17 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion libs/common/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func authFunc(ctx context.Context) (context.Context, error) {
}

// attach userID to context, so we can get it in a handler using GetUserID()
ctx = context.WithValue(ctx, userIDKey{}, userID)
ctx = ContextWithUserID(ctx, userID)

// attach userID to current span (should be the auth interceptor span)
telemetry.SetSpanStr(ctx, "user.id", userID.String())
Expand All @@ -200,6 +200,10 @@ func authFunc(ctx context.Context) (context.Context, error) {
return ctx, nil
}

func ContextWithUserID(ctx context.Context, userID uuid.UUID) context.Context {
return context.WithValue(ctx, userIDKey{}, userID)
}

// handleOrganizationIDForAuthFunc is a part of our auth middleware.
// The claims are signed. Therefore, we can match the user provided
// organizationID from the headers against the organizationIDs inside the claim.
Expand Down
3 changes: 1 addition & 2 deletions libs/hwes/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ func (a *AggregateBase) HandleEvent(event Event) error {
log.Debug().
Str("aggregateID", event.GetAggregateID().String()).
Str("aggregateType", string(event.GetAggregateType())).
Str("eventType", event.EventType).
Uint64("eventVersion", event.GetVersion()).
Dict("event", event.GetZerologDict()).
Msg("handle event")

eventHandler, found := a.eventHandlers[event.EventType]
Expand Down
111 changes: 98 additions & 13 deletions libs/hwes/event.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package hwes

import (
"common"
"context"
"encoding/json"
"fmt"
"github.com/EventStore/EventStore-Client-Go/esdb"
"github.com/google/uuid"
"github.com/rs/zerolog"
"strings"
"telemetry"
"time"
)

Expand All @@ -18,6 +21,12 @@ type Event struct {
Data []byte
Timestamp time.Time
Version uint64
UserID *uuid.UUID
MaxSchaefer marked this conversation as resolved.
Show resolved Hide resolved
}

type metadata struct {
// CommitterId represents some sort of optional identity that is directly responsible for this event
MaxSchaefer marked this conversation as resolved.
Show resolved Hide resolved
CommitterID string `json:"committer_id"`
}

func NewEvent(aggregate Aggregate, eventType string) Event {
Expand All @@ -27,9 +36,19 @@ func NewEvent(aggregate Aggregate, eventType string) Event {
AggregateID: aggregate.GetID(),
AggregateType: aggregate.GetType(),
Timestamp: time.Now().UTC(),
UserID: nil,
}
}

// NewEventWithUser will call hwes.NewEvent() and injects the UserID afterward into the event
func NewEventWithUser(ctx context.Context, aggregate Aggregate, eventType string) (Event, error) {
event, err := InjectUserIDInEventFromContext(ctx, NewEvent(aggregate, eventType))
if err != nil {
return Event{}, err
}
return event, nil
}

// NewEventWithData will call hwes.NewEvent() with the passed aggregate and eventType
// to marshall the data to json via hwes.Event.SetJsonData().
func NewEventWithData(aggregate Aggregate, eventType string, data interface{}) (Event, error) {
Expand All @@ -40,10 +59,47 @@ func NewEventWithData(aggregate Aggregate, eventType string, data interface{}) (
return event, nil
}

// NewEventWithUserAndData will call hwes.NewEventWithData() and injects the UserID afterward into the event
func NewEventWithUserAndData(ctx context.Context, aggregate Aggregate, eventType string, data interface{}) (Event, error) {
event, err := NewEventWithData(aggregate, eventType, data)
if err != nil {
return Event{}, err
}

event, err = InjectUserIDInEventFromContext(ctx, event)
if err != nil {
return Event{}, err
}

return event, err
}

// InjectUserIDInEventFromContext injects the UserID from the passed context via common.GetUserID().
// If no UserID was injected, prior to this function call, an error will be returned.
// Make sure to inject the UserID via a Middleware in the API layer.
func InjectUserIDInEventFromContext(ctx context.Context, event Event) (Event, error) {
MaxSchaefer marked this conversation as resolved.
Show resolved Hide resolved
ctx, span, _ := telemetry.StartSpan(ctx, "hwes.EventWithUserID")
defer span.End()

userID, err := common.GetUserID(ctx)
if err != nil {
return Event{}, err
}
event.UserID = &userID

// Just to make sure we are actually dealing with a valid UUID
if _, err := uuid.Parse(event.UserID.String()); err != nil {
return Event{}, err
}

telemetry.SetSpanStr(ctx, "userID", event.UserID.String())
return event, nil
}

// resolveAggregateIDAndTypeFromStreamID extracts the aggregateType and aggregateID of a given streamID
// See aggregate.GetTypeID
//
// # Example
// Example:
//
// StreamID: task-d9027be3-d00f-4eec-b50e-5f489df20433
// AggregateType: task
Expand Down Expand Up @@ -75,26 +131,38 @@ func resolveAggregateIDAndTypeFromStreamID(streamID string) (aggregateType Aggre
// NewEventFromRecordedEvent is a helper function for EventStore.
// This function transforms esdb.RecordedEvent to hwes.Event.
// We expect that the StreamID of the aggregate is in the format of "[aggregateType]-[aggregateID]".
func NewEventFromRecordedEvent(event *esdb.RecordedEvent) (Event, error) {
id, err := uuid.Parse(event.EventID.String())
func NewEventFromRecordedEvent(esdbEvent *esdb.RecordedEvent) (Event, error) {
id, err := uuid.Parse(esdbEvent.EventID.String())
if err != nil {
return Event{}, err
}

aggregateType, aggregateID, err := resolveAggregateIDAndTypeFromStreamID(event.StreamID)
aggregateType, aggregateID, err := resolveAggregateIDAndTypeFromStreamID(esdbEvent.StreamID)
if err != nil {
return Event{}, err
}

return Event{
md := metadata{}
if err := json.Unmarshal(esdbEvent.UserMetadata, &md); err != nil {
return Event{}, err
}

event := Event{
EventID: id,
EventType: event.EventType,
EventType: esdbEvent.EventType,
AggregateID: aggregateID,
AggregateType: aggregateType,
Data: event.Data,
Timestamp: event.CreatedDate,
Version: event.EventNumber,
}, nil
Data: esdbEvent.Data,
Timestamp: esdbEvent.CreatedDate,
Version: esdbEvent.EventNumber,
}
MaxSchaefer marked this conversation as resolved.
Show resolved Hide resolved

eventCommitterID, err := uuid.Parse(md.CommitterID)
if err == nil {
event.UserID = &eventCommitterID
}

return event, nil
}

func (e *Event) GetAggregateID() uuid.UUID {
Expand All @@ -119,12 +187,23 @@ func (e *Event) GetVersion() uint64 {
return e.Version
}

func (e *Event) ToEventData() esdb.EventData {
func (e *Event) ToEventData() (esdb.EventData, error) {
md := metadata{}
if e.UserID != nil {
md.CommitterID = e.UserID.String()
}

mdBytes, err := json.Marshal(md)
if err != nil {
return esdb.EventData{}, err
}

return esdb.EventData{
EventType: e.EventType,
ContentType: esdb.JsonContentType,
Data: e.Data,
}
Metadata: mdBytes,
}, nil
}

func (e *Event) SetData(data []byte) *Event {
Expand All @@ -151,8 +230,14 @@ func (e *Event) GetJsonData(data interface{}) error {
//
// zerolog.Ctx(ctx).Debug().Dict("event", event.GetZerologDict()).Msg("process event")
func (e *Event) GetZerologDict() *zerolog.Event {
return zerolog.Dict().
dict := zerolog.Dict().
Str("eventId", e.EventID.String()).
Str("eventType", e.EventType).
Uint64("eventVersion", e.Version)

if e.UserID != nil {
dict.Str("userID", e.UserID.String())
}

return dict
}
26 changes: 26 additions & 0 deletions libs/hwes/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package hwes_test

import (
"common"
"context"
"github.com/google/uuid"
"hwes"
"testing"
)

func TestEventWithUserID(t *testing.T) {
ctx := context.Background()
u := uuid.New()
e := hwes.Event{}

ctx = common.ContextWithUserID(ctx, u)

e, err := hwes.InjectUserIDInEventFromContext(ctx, e)
if err != nil {
t.Error(err)
}

if *e.UserID != u {
t.Error("event does not have the correct UserID")
}
}
5 changes: 4 additions & 1 deletion libs/hwes/eventstoredb/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
return nil
}

eventsData := hwutil.Map(aggregate.GetUncommittedEvents(), func(event hwes.Event) esdb.EventData {
eventsData, err := hwutil.MapWithErr(aggregate.GetUncommittedEvents(), func(event hwes.Event) (esdb.EventData, error) {
return event.ToEventData()
})
if err != nil {
return err
}

// If AppliedEvents are empty, we imply that this entity was not loaded from an event store and therefore non-existing.
if len(aggregate.GetAppliedEvents()) == 0 {
Expand Down
33 changes: 30 additions & 3 deletions libs/hwes/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ module hwes
go 1.22

replace (
common => ../common
hwlocale => ../hwlocale
hwutil => ../hwutil
telemetry => ../telemetry
)

require (
common v0.0.0
github.com/EventStore/EventStore-Client-Go v1.0.2
github.com/google/uuid v1.6.0
github.com/rs/zerolog v1.32.0
Expand All @@ -16,6 +19,11 @@ require (
)

require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/dapr/dapr v1.12.0-rc.4 // indirect
github.com/dapr/go-sdk v1.9.1 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand All @@ -25,18 +33,37 @@ require (
github.com/go-playground/validator/v10 v10.16.0 // indirect
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/nicksnyder/go-i18n/v2 v2.4.0 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/pquerna/cachecontrol v0.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.opentelemetry.io/otel/sdk v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70 // indirect
google.golang.org/grpc v1.35.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
hwlocale v0.0.0 // indirect
)
Loading
Loading