Skip to content

Commit

Permalink
add unique id to events
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Feb 2, 2023
1 parent 76d9ff6 commit 558ea33
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-eventid.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add an ID to each events

This way it is possible to uniquely identify events across services

https://github.com/cs3org/reva/pull/3637
22 changes: 19 additions & 3 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"log"
"reflect"

"github.com/google/uuid"
"go-micro.dev/v4/events"
)

Expand All @@ -34,6 +35,9 @@ var (

// MetadatakeyEventType is the key used for the eventtype in the metadata map of the event
MetadatakeyEventType = "eventtype"

// MetadatakeyEventID is the key used for the eventID in the metadata map of the event
MetadatakeyEventID = "eventid"
)

type (
Expand All @@ -57,12 +61,19 @@ type (
Publish(string, interface{}, ...events.PublishOption) error
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
}

// Event is the envelope for events
Event struct {
Type string
ID string
Event interface{}
}
)

// Consume returns a channel that will get all events that match the given evs
// group defines the service type: One group will get exactly one copy of a event that is emitted
// NOTE: uses reflect on initialization
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, error) {
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error) {
c, err := s.Consume(MainQueueName, events.WithGroup(group))
if err != nil {
return nil, err
Expand All @@ -74,7 +85,7 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{},
registeredEvents[typ.String()] = e
}

outchan := make(chan interface{})
outchan := make(chan Event)
go func() {
for {
e := <-c
Expand All @@ -90,7 +101,11 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{},
continue
}

outchan <- event
outchan <- Event{
Type: et,
ID: e.Metadata[MetadatakeyEventID],
Event: event,
}
}
}()
return outchan, nil
Expand All @@ -102,5 +117,6 @@ func Publish(s Publisher, ev interface{}) error {
evName := reflect.TypeOf(ev).String()
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
MetadatakeyEventType: evName,
MetadatakeyEventID: uuid.New().String(),
}))
}
2 changes: 1 addition & 1 deletion pkg/events/example/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Example(c events.Consumer) {
event := <-evChan

// best to use type switch to differentiate events
switch v := event.(type) {
switch v := event.Event.(type) {
case events.ShareCreated:
fmt.Printf("%s) Share created: %+v\n", group, v)
default:
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
}

// Postprocessing starts the postprocessing result collector
func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
ctx := context.TODO()
log := logger.New()
for event := range ch {
switch ev := event.(type) {
switch ev := event.Event.(type) {
case events.PostprocessingFinished:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
if err != nil {
Expand Down

0 comments on commit 558ea33

Please sign in to comment.