x/sync/errgroup: add TryGo and SetLimit to control concurrency #27837
Comments
/cc @bcmills who recently was thinking about some changes to this package IIRC |
In the meantime I'd suggest using a buffered channel before calling group.Go() and releasing it when the function returns, or using a package like github.com/kevinburke/semaphore to acquire resources before starting a goroutine. |
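A minimal sketch of that suggestion, using a buffered channel acquired before group.Go() and released when the function returns (the 50-task workload and maxWorkers value are placeholders, not from the original comment):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	const maxWorkers = 5
	slots := make(chan struct{}, maxWorkers) // buffered channel used as a counting semaphore

	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 50; i++ {
		if ctx.Err() != nil {
			break // a task already failed; stop launching new ones
		}
		slots <- struct{}{} // blocks while maxWorkers tasks are in flight
		i := i              // capture loop variable
		g.Go(func() error {
			defer func() { <-slots }()         // release the slot when done
			time.Sleep(100 * time.Millisecond) // do work
			fmt.Printf("finished %d\n", i)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("error: %v\n", err)
	}
}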
There is a draft API in slide 119 (in the backup slides) of my GopherCon 2018 talk, Rethinking Classical Concurrency Patterns. I agree that the I propose a new I would rather have a |
I also needed something similar and combined errgroup with x/sync/semaphore:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	const maxWorkers = 5
	sem := semaphore.NewWeighted(maxWorkers)
	g, ctx := errgroup.WithContext(context.Background())

	for i := 0; i < 50; i++ {
		i := i // capture loop variable
		fmt.Printf("executing %d\n", i)
		g.Go(func() error {
			// Acquire a semaphore slot inside the goroutine.
			err := sem.Acquire(ctx, 1)
			if err != nil {
				return err
			}
			defer sem.Release(1)
			// do work
			time.Sleep(1 * time.Second)
			fmt.Printf("finished %+v\n", i)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("g.Wait() err = %+v\n", err)
	}
	fmt.Println("done!")
}

If anything in this approach is wrong, please let me know. It seems to work fine based on the debug statements. |
@fatih I would personally put the sem.Acquire before the g.Go call rather than inside it. Look at it another way: if instead of 50 you had 1m iterations, what your code does is launch 1m goroutines. Of them, only maxWorkers make progress at any given time, while the rest sit blocked on the semaphore. All the best! |
@alexaandru thanks for the tip! You're right about that. I've fixed that actually on my end (https://twitter.com/fatih/status/1152991683870633985 and https://play.golang.org/p/h2yfBVC8IjB) but I forgot to update it here. |
You're most welcome @fatih ! Cheers! :) |
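A rough sketch of the fix being referred to (the linked playground isn't reproduced here, so the details are assumed): acquire the semaphore before calling g.Go, so the loop itself blocks once maxWorkers tasks are in flight and only a bounded number of goroutines ever exist:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	const maxWorkers = 5
	sem := semaphore.NewWeighted(maxWorkers)
	g, ctx := errgroup.WithContext(context.Background())

	for i := 0; i < 50; i++ {
		// Acquire before starting the goroutine, so at most maxWorkers
		// goroutines are alive at any time.
		if err := sem.Acquire(ctx, 1); err != nil {
			break // context cancelled because a task failed
		}
		i := i // capture loop variable
		g.Go(func() error {
			defer sem.Release(1)
			time.Sleep(1 * time.Second) // do work
			fmt.Printf("finished %d\n", i)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("g.Wait() err = %v\n", err)
	}
	fmt.Println("done!")
}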
Another subtle issue that ideally would be solved by having an errgroup with a limit is that it is very easy to write code using errgroup together with a semaphore that keeps launching new work after the group's context has been cancelled. For example, it might be non-obvious that the loop below can keep starting tasks after one of them has already failed, because sem.Acquire may still succeed without blocking even when ctx is already done:

const (
maxWorkers = 10
numTasks = 1e6
)
func work() error {
group, ctx := errgroup.WithContext(context.Background())
sem := semaphore.NewWeighted(maxWorkers)
for i := 0; i < numTasks; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
group.Go(func() error {
defer sem.Release(1)
time.Sleep(1 * time.Second)
if rand.Float64() > 0.5 {
return errors.New("important message here")
}
return nil
})
}
return group.Wait()
}

The code can be fixed with something like this, but the extra context check is easy to forget:

diff --git a/main.go b/main.go
index 7690b92..9f64dbc 100644
--- a/main.go
+++ b/main.go
@@ -21,6 +21,10 @@ func work() error {
sem := semaphore.NewWeighted(maxWorkers)
for i := 0; i < numTasks; i++ {
+ if ctx.Err() != nil {
+ break
+ }
+
if err := sem.Acquire(ctx, 1); err != nil {
return err
} |
@tschaub, note that in general anything that may produce an error as a result of cancellation should itself run within the group. So that example would probably be clearer as:

const (
maxWorkers = 10
numTasks = 1e6
)
func work() error {
group, ctx := errgroup.WithContext(context.Background())
+
+ group.Go(func() error {
sem := semaphore.NewWeighted(maxWorkers)
for i := 0; i < numTasks; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
group.Go(func() error {
defer sem.Release(1)
time.Sleep(1 * time.Second)
if rand.Float64() > 0.5 {
return errors.New("important message here")
}
return nil
})
}
+ return nil
+ })
return group.Wait()
} |
We came across this use-case today, and used a semaphore channel instead of x/sync/semaphore. But since context is heavily threaded through, we'll probably switch to using x/sync/semaphore. Regarding the proposed API, |
Hello, I just created a package for that: https://pkg.go.dev/github.com/cboudereau/errgroupsem so that we could use it as quickly as possible on our side, but feel free to discuss merging both versions :). The variant built on WaitGroup.Go does not offer the same fail-fast error handling that errgroup's Wait() currently provides. |
Some related discussion here, including several people chiming in to comment that they felt the need to implement something similar: |
@bcmills do you think there is new API that should be added to errgroup along these lines? If so, what is it? |
This proposal has been added to the active column of the proposals project |
Taking the API I drafted for my GopherCon 2018 talk and adding documentation, I suggest:

package errgroup
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (*Group) SetLimit(n int)
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be returned by Wait.
func (*Group) Go(f func() error)
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (*Group) TryGo(f func() error) bool |
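As a sketch of how this API could be used, the earlier 50-task/5-worker example might look like the following (the workload is a placeholder; SetLimit and TryGo here are the methods proposed above, which were later merged into x/sync/errgroup):

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	g.SetLimit(5) // at most 5 goroutines run at once; further Go calls block

	for i := 0; i < 50; i++ {
		i := i // capture loop variable
		g.Go(func() error {
			time.Sleep(1 * time.Second) // do work
			fmt.Printf("finished %d\n", i)
			return nil
		})
	}

	// TryGo starts the function only if a slot is free right now.
	if ok := g.TryGo(func() error { return nil }); !ok {
		fmt.Println("at capacity; not started")
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("g.Wait() err = %v\n", err)
	}
	fmt.Println("done!")
}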
With the above API, I have one open question: if the group is already cancelled, should |
Should there be a variant of |
I would rather retain 0 as a valid (and useful) limit, meaning that no new goroutines may be started. To make that work with the zero-value Group, which has no limit, the "no limit" sentinel for SetLimit needs to be a negative value rather than 0. |
Any immediate examples to show how it could be more useful in this case? |
That would require leaking a goroutine for the duration of the test, which is less ergonomic and much more error-prone. Compare:

g := new(errgroup.Group)
g.SetLimit(0)
use(g) // All calls to TryGo fail.

vs.

g := new(errgroup.Group)
g.SetLimit(1)
unblock := make(chan struct{})
g.Go(func() error {
<-unblock
return nil
})
t.Cleanup(func() {
close(unblock)
g.Wait()
})
use(g) // All calls to TryGo fail. |
Hm. It's true that in this kind of comparison the first case is simpler. However, we are comparing only this single use case of SetLimit(0); if SetLimit(0) behaved as "no limit", this comparison would not exist at all. In general, would it be more interesting for us to test that if |
So I would really rather not define SetLimit(0) to mean "no limit". (Contrast with, say, |
A barrier is a bit tricky here. Could calling
It's actually not only about implementation convenience but also about trying to align consistency with
Should we relax this requirement ("The limit must not be modified while any goroutines in the group are active"), given the potential misuse you briefly described above? |
Change https://go.dev/cl/405174 mentions this issue: |
I also pushed CL 405174 as an alternative implementation, which implements the behavior of the original proposal, i.e.:
But the behavior of calling Go after SetLimit(0) is, subjectively, not entirely desirable: https://go-review.googlesource.com/c/sync/+/405174/comments/835bf24e_0b7f9958
SetLimit(0) should set the limit to 0, to stop all future calls. That can be a useful thing to do. |
@rsc Could you also clarify what happens to a subsequent SetLimit then? Specifically
|
No change in consensus, so accepted. 🎉 |
@changkun, I think for now the second call to (The call to |
Note that we already said above "// The limit must not be modified while any goroutines in the group are active." We don't necessarily need to go out of our way to detect and report the race, although of course it's fine to do that if it is easy. |
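One cheap way to detect that misuse, assuming the limit is backed by a buffered-channel semaphore (a toy sketch; none of these names come from x/sync/errgroup), is to panic when SetLimit finds slots still held:

package main

import "fmt"

// lgroup is a toy stand-in for a limited group, with a channel-backed limit.
type lgroup struct {
	sem chan struct{} // one token per active goroutine; nil means no limit
}

func (g *lgroup) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		// Slots are still held, so goroutines are still active: the documented
		// contract ("the limit must not be modified while any goroutines in the
		// group are active") has been violated.
		panic(fmt.Sprintf("errgroup: modify limit while %d goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan struct{}, n)
}

func main() {
	g := &lgroup{}
	g.SetLimit(2)
	g.sem <- struct{}{} // simulate one active goroutine holding a slot
	g.SetLimit(4)       // panics: limit modified while a goroutine is active
}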
@bcmills @ianlancetaylor Yeah, a direct panic might be better than reporting a race. Nevertheless, I just feel subjectively uncomfortable that I could call a function and it may silently block forever (with no other way to rescue it). |
This benchmark shows the difference between the two implementations: an explicit waiter with a mutex (old, before PS3) versus a channel (new, since PS4). There is no significant difference in time per operation:

name  old time/op    new time/op    delta
Go-8  247ns ±10%     245ns ±10%     ~        (p=0.571 n=5+10)

name  old alloc/op   new alloc/op   delta
Go-8  48.0B ± 0%     40.0B ± 0%     -16.67%  (p=0.000 n=5+10)

name  old allocs/op  new allocs/op  delta
Go-8  2.00 ± 0%      2.00 ± 0%      ~        (all equal)

Fixes golang/go#27837

Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
Reviewed-on: https://go-review.googlesource.com/c/sync/+/405174
Reviewed-by: Bryan Mills <bcmills@google.com>
Reviewed-by: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Changkun Ou <mail@changkun.de>
Auto-Submit: Bryan Mills <bcmills@google.com>
The errgroup package will currently spawn a new goroutine for each invocation of Group.Go. This is usually fine, but extremely high cardinality fanout can exhaust memory or other resources. It would be neat if the errgroup interface allowed users to specify the maximum number of concurrent goroutines they want the errgroup to spawn.
Proposal
N would be copied to an unexported field on the first invocation of Go, so that subsequent modification has no effect. This preserves the validity and the behavior of the empty Group.
When calling Go, if N functions are already running, Go would block until fewer than N are running.
The behavior of Go is not otherwise modified: if a subtask returns an error, subsequent tasks are still executed, and (if WithContext was used) callers rely on subtasks observing context cancellation so that the loop falls through to the Wait() call and returns.
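As a rough sketch of the proposed blocking behavior (not the actual errgroup implementation; all names here are made up), a buffered channel can serve as the counting semaphore inside the group:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// limitedGroup is a hand-rolled illustration of the proposal: at most n
// functions run at once, and Go blocks until a slot is available.
type limitedGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
	sem     chan struct{} // buffered channel as a counting semaphore; nil means no limit
}

func newLimitedGroup(n int) *limitedGroup {
	g := &limitedGroup{}
	if n > 0 {
		g.sem = make(chan struct{}, n)
	}
	return g
}

func (g *limitedGroup) Go(f func() error) {
	if g.sem != nil {
		g.sem <- struct{}{} // blocks while n functions are already running
	}
	g.wg.Add(1)
	go func() {
		defer func() {
			if g.sem != nil {
				<-g.sem // release the slot
			}
			g.wg.Done()
		}()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err }) // record the first error only
		}
	}()
}

func (g *limitedGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

func main() {
	g := newLimitedGroup(3)
	for i := 0; i < 10; i++ {
		i := i // capture loop variable
		g.Go(func() error {
			if i == 7 {
				return errors.New("task 7 failed")
			}
			return nil
		})
	}
	fmt.Println(g.Wait()) // prints the first recorded error, if any
}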
Alternatives considered
An alternative interface would be for Go never to block, but to enqueue instead. That is an unbounded queue, and I'm not a fan.
Another alternative is that the group is context-aware, and that Go return immediately if the group's context is cancelled. This requires that Group retain a reference to the context, which it does not currently do.