Skip to content

Commit

Permalink
add ConsumeAll function
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Feb 2, 2023
1 parent 558ea33 commit db01d99
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,27 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error
return outchan, nil
}

// ConsumeAll allows consuming all events. Note that unmarshalling must be done manually in this case, therefore Event.Event will always be of type []byte
func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
c, err := s.Consume(MainQueueName, events.WithGroup(group))
if err != nil {
return nil, err
}

outchan := make(chan Event)
go func() {
for {
e := <-c
outchan <- Event{
Type: e.Metadata[MetadatakeyEventType],
ID: e.Metadata[MetadatakeyEventID],
Event: e.Payload,
}
}
}()
return outchan, nil
}

// Publish publishes the ev to the MainQueue from where it is distributed to all subscribers
// NOTE: needs to use reflect on runtime
func Publish(s Publisher, ev interface{}) error {
Expand Down

0 comments on commit db01d99

Please sign in to comment.