Skip to content

Commit

Permalink
flightcontrol: reduce contention between goroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Jun 22, 2021
1 parent 1c037fd commit 892f87d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
14 changes: 11 additions & 3 deletions util/flightcontrol/flightcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context)
return v, err
}
// backoff logic
if backoff >= 3*time.Second {
if backoff >= 15*time.Second {
err = errors.Wrapf(errRetryTimeout, "flightcontrol")
return v, err
}
Expand Down Expand Up @@ -132,8 +132,16 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) {
select {
case <-c.ready:
c.mu.Unlock()
<-c.cleaned
return nil, errRetry
if c.err != nil { // on error retry
<-c.cleaned
return nil, errRetry
}
pw, ok, _ := progress.FromContext(ctx)
if ok {
c.progressState.add(pw)
}
return c.result, nil

case <-c.ctx.done: // could return if no error
c.mu.Unlock()
<-c.cleaned
Expand Down
26 changes: 26 additions & 0 deletions util/flightcontrol/flightcontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flightcontrol

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -203,6 +204,31 @@ func TestCancelBoth(t *testing.T) {
assert.Equal(t, counter, int64(4))
}

func TestContention(t *testing.T) {
perthread := 1000
threads := 100

wg := sync.WaitGroup{}
wg.Add(threads)

g := &Group{}

for i := 0; i < threads; i++ {
go func(s int) {
for i := 0; i < s; i++ {
_, err := g.Do(context.TODO(), "foo", func(ctx context.Context) (interface{}, error) {
time.Sleep(time.Microsecond)
return nil, nil
})
require.NoError(t, err)
}
wg.Done()
}(perthread)
}

wg.Wait()
}

func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (interface{}, error) {
return func(ctx context.Context) (interface{}, error) {
atomic.AddInt64(counter, 1)
Expand Down

0 comments on commit 892f87d

Please sign in to comment.