Skip to content

Commit

Permalink
worker; clearer atomic decrement for dispatched routines
Browse files Browse the repository at this point in the history
add note to separate message pulling and processing contexts
  • Loading branch information
joelrebel committed May 4, 2023
1 parent 5649a62 commit 5880908
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Loop:
}

func (o *Worker) processEvents(ctx context.Context) {
// XXX: consider having a separate context for message retrieval
msgs, err := o.stream.PullMsg(ctx, 1)
if err != nil {
o.logger.WithFields(
Expand All @@ -148,7 +149,7 @@ func (o *Worker) processEvents(ctx context.Context) {
defer o.syncWG.Done()

atomic.AddInt32(&o.dispatched, 1)
defer atomic.AddInt32(&o.dispatched, ^int32(0))
defer atomic.AddInt32(&o.dispatched, -1)

o.processEvent(ctx, msg)
}(msg)
Expand Down

0 comments on commit 5880908

Please sign in to comment.