Skip to content

Commit

Permalink
split draining methods
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Sep 27, 2023
1 parent 4a3cd72 commit 3209de1
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,24 +693,28 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}

var wg sync.WaitGroup
// drain the channel and set all items to done we have added.
// Otherwise the caller will wait forever on the waitgroup
drainVisitInput := func(shutdown bool) {
if shutdown {
for range pp.visitInput {
wg.Done()
}
} else {
for {
select {
case _, ok := <-pp.visitInput:
if !ok {
return
}
wg.Done()
default:

// drains the channel and drops out when closed.
// This is done when the processor shuts down during visit
// and makes sure the waitgroup is fully counted down.
drainUntilClose := func() {
for range pp.visitInput {
wg.Done()
}
}

// drains the input channel until there are no more items.
// does not wait for close, because the channel stays open for the next visit
drainUntilEmpty := func() {
for {
select {
case _, ok := <-pp.visitInput:
if !ok {
return
}
wg.Done()
default:
return
}
}
}
Expand All @@ -726,11 +730,11 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
wg.Add(1)
select {
case <-stopping:
drainVisitInput(true)
drainUntilClose()
wg.Done()
return ErrVisitAborted
case <-ctx.Done():
drainVisitInput(false)
drainUntilEmpty()
wg.Done()
return ctx.Err()
// enqueue the visit
Expand All @@ -756,10 +760,10 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}()
select {
case <-stopping:
drainVisitInput(true)
drainUntilClose()
return ErrVisitAborted
case <-ctx.Done():
drainVisitInput(false)
drainUntilEmpty()
return ctx.Err()
case <-wgDone:
}
Expand Down

0 comments on commit 3209de1

Please sign in to comment.