From 558ea334e3ea533e34df30625d8a5d649642bd0f Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 2 Feb 2023 16:41:45 +0100 Subject: [PATCH] add unique id to events Signed-off-by: jkoberg --- changelog/unreleased/add-eventid.md | 5 +++++ pkg/events/events.go | 22 ++++++++++++++++--- pkg/events/example/consumer/consumer.go | 2 +- .../utils/decomposedfs/decomposedfs.go | 4 ++-- 4 files changed, 27 insertions(+), 6 deletions(-) create mode 100644 changelog/unreleased/add-eventid.md diff --git a/changelog/unreleased/add-eventid.md b/changelog/unreleased/add-eventid.md new file mode 100644 index 00000000000..a3da5ff918e --- /dev/null +++ b/changelog/unreleased/add-eventid.md @@ -0,0 +1,5 @@ +Enhancement: Add an ID to each events + +This way it is possible to uniquely identify events across services + +https://github.com/cs3org/reva/pull/3637 diff --git a/pkg/events/events.go b/pkg/events/events.go index bab8dff0b68..4c887b15e03 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -22,6 +22,7 @@ import ( "log" "reflect" + "github.com/google/uuid" "go-micro.dev/v4/events" ) @@ -34,6 +35,9 @@ var ( // MetadatakeyEventType is the key used for the eventtype in the metadata map of the event MetadatakeyEventType = "eventtype" + + // MetadatakeyEventID is the key used for the eventID in the metadata map of the event + MetadatakeyEventID = "eventid" ) type ( @@ -57,12 +61,19 @@ type ( Publish(string, interface{}, ...events.PublishOption) error Consume(string, ...events.ConsumeOption) (<-chan events.Event, error) } + + // Event is the envelope for events + Event struct { + Type string + ID string + Event interface{} + } ) // Consume returns a channel that will get all events that match the given evs // group defines the service type: One group will get exactly one copy of a event that is emitted // NOTE: uses reflect on initialization -func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, error) { +func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error) { c, err := s.Consume(MainQueueName, events.WithGroup(group)) if err != nil { return nil, err @@ -74,7 +85,7 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, registeredEvents[typ.String()] = e } - outchan := make(chan interface{}) + outchan := make(chan Event) go func() { for { e := <-c @@ -90,7 +101,11 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, continue } - outchan <- event + outchan <- Event{ + Type: et, + ID: e.Metadata[MetadatakeyEventID], + Event: event, + } } }() return outchan, nil @@ -102,5 +117,6 @@ func Publish(s Publisher, ev interface{}) error { evName := reflect.TypeOf(ev).String() return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{ MetadatakeyEventType: evName, + MetadatakeyEventID: uuid.New().String(), })) } diff --git a/pkg/events/example/consumer/consumer.go b/pkg/events/example/consumer/consumer.go index f6cf9f2f77b..829b684c8fd 100644 --- a/pkg/events/example/consumer/consumer.go +++ b/pkg/events/example/consumer/consumer.go @@ -56,7 +56,7 @@ func Example(c events.Consumer) { event := <-evChan // best to use type switch to differentiate events - switch v := event.(type) { + switch v := event.Event.(type) { case events.ShareCreated: fmt.Printf("%s) Share created: %+v\n", group, v) default: diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 215bfe200b9..9dd99e6028d 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -211,11 +211,11 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event } // Postprocessing starts the postprocessing result collector -func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { +func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { ctx := context.TODO() log := logger.New() for event := range ch { - switch ev := event.(type) { + switch ev := event.Event.(type) { case events.PostprocessingFinished: up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) if err != nil {