Skip to content

Commit

Permalink
taskgroup: add an explicit Throttle type (#6)
Browse files Browse the repository at this point in the history
This generalizes the functionality of the Group.Limit method, which has been
rewritten to use the new Throttle type internally. A caller wishing to share
capacity among multiple Groups can explicitly construct a throttle and use its
Limit method to produce start functions for each Group separately.
  • Loading branch information
creachadair authored Oct 5, 2024
1 parent 2be6922 commit 3dbd8f6
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 19 deletions.
25 changes: 9 additions & 16 deletions taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,17 @@ func Listen(f func(error)) ErrorFunc { return func(e error) error { f(e); return
// NoError adapts f to a Task that executes f and reports a nil error.
func NoError(f func()) Task { return func() error { f(); return nil } }

// Limit returns g and a function that starts each task passed to it in g,
// allowing no more than n tasks to be active concurrently. If n ≤ 0, the
// start function is equivalent to g.Go, which enforces no limit.
// Limit returns g and a "start" function that starts each task passed to it in
// g, allowing no more than n tasks to be active concurrently. If n ≤ 0, no
// limit is enforced.
//
// The limiting mechanism is optional, and the underlying group is not
// restricted. A call to the start function will block until a slot is
// available, but calling g.Go directly will add a task unconditionally and
// will not take up a limiter slot.
func (g *Group) Limit(n int) (*Group, func(Task) *Group) {
if n <= 0 {
return g, g.Go
}
adm := make(chan struct{}, n)
return g, func(task Task) *Group {
adm <- struct{}{}
return g.Go(func() error {
defer func() { <-adm }()
return task()
})
}
}
//
// This is a shorthand for constructing a [Throttle] with capacity n and
// calling its Limit method. If n ≤ 0, the start function is equivalent to
// g.Go, which enforces no limit. To share a throttle among multiple groups,
// construct the throttle separately.
func (g *Group) Limit(n int) (*Group, func(Task)) { t := NewThrottle(n); return g, t.Limit(g) }
17 changes: 14 additions & 3 deletions taskgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,21 @@ func TestCapacity(t *testing.T) {

const maxCapacity = 25
const numTasks = 1492
g, start := taskgroup.New(nil).Limit(maxCapacity)

// Verify that multiple groups sharing a throttle respect the combined
// capacity limit.
throttle := taskgroup.NewThrottle(maxCapacity)
var g1, g2 taskgroup.Group
start1 := throttle.Limit(&g1)
start2 := throttle.Limit(&g2)

var p peakValue
var n int32
for i := 0; i < numTasks; i++ {
for i := range numTasks {
start := start1
if i%2 == 1 {
start = start2
}
start(func() error {
p.inc()
defer p.dec()
Expand All @@ -129,7 +139,8 @@ func TestCapacity(t *testing.T) {
return nil
})
}
g.Wait()
g1.Wait()
g2.Wait()
t.Logf("Total tasks completed: %d", n)
if p.max > maxCapacity {
t.Errorf("Exceeded maximum capacity: got %d, want %d", p.max, maxCapacity)
Expand Down
56 changes: 56 additions & 0 deletions throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package taskgroup

import "sync/atomic"

// A Throttle rate-limits the number of concurrent goroutines that can execute
// in parallel to some fixed number. A zero Throttle is ready for use, but
// imposes no limit on parallel execution. See [Throttle.Enter] for use.
type Throttle struct {
adm chan struct{}
}

// NewThrottle constructs a [Throttle] with a capacity of n goroutines.
// If n ≤ 0, the resulting Throttle imposes no limit.
func NewThrottle(n int) Throttle {
if n <= 0 {
return Throttle{}
}
return Throttle{adm: make(chan struct{}, n)}
}

// Enter blocks until a slot is available in t, then returns a [Leaver] that
// the caller must execute to return the slot when it is no longer in use.
func (t Throttle) Enter() Leaver {
if t.adm == nil {
return func() {}
}
t.adm <- struct{}{}
var done atomic.Bool
return func() {
if done.CompareAndSwap(false, true) {
<-t.adm
}
}
}

// A Leaver returns an in-use throttle slot to its underlying [Throttle].
// It is safe to call a Leaver multiple times; the slot will only be returned
// once.
type Leaver func()

// Leave returns the slot to its [Throttle]. This is a legibility alias for
// calling f.
func (f Leaver) Leave() { f() }

// Limit returns a function that starts each [Task] passed to it in g,
// respecting the rate limit imposed by t. Each call to Limit yields a fresh
// start function, and all the functions returned share the capacity of t.
func (t Throttle) Limit(g *Group) func(Task) {
return func(task Task) {
slot := t.Enter()
g.Go(func() error {
defer slot.Leave()
return task()
})
}
}

0 comments on commit 3dbd8f6

Please sign in to comment.