-
Notifications
You must be signed in to change notification settings - Fork 0
/
errgroup.go
132 lines (120 loc) · 2.96 KB
/
errgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package errgroup
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
// Group coordinates a set of goroutines that each run one subtask of a
// larger job and collects the first error any of them produces.
//
// The zero value is ready to use; it has no cancel function, so an error in
// one subtask does not cancel the others.
type Group struct {
	// ctx is handed to every function started through Go; when nil,
	// context.Background is used instead.
	ctx context.Context
	// cancel, when non-nil, is called on the first error and again by Wait.
	cancel func()

	// wg counts functions submitted through Go that have not finished yet.
	wg sync.WaitGroup

	// err is the first non-nil error (or recovered panic); errOnce guards it.
	err     error
	errOnce sync.Once

	// workerOnce ensures the worker pool is created at most once.
	workerOnce sync.Once
	// ch feeds the worker pool; it stays nil until GOMAXPROCS is called.
	ch chan func(ctx context.Context) error
	// chs holds functions that did not fit into ch; Wait hands them over.
	chs []func(ctx context.Context) error

	// workerNum is the number of functions submitted since the last Wait.
	workerNum int
}
// WithContext creates a Group bound to ctx.
// Every function passed to Go receives ctx unchanged. The Group gets no
// cancel function, so a failing function does not cancel ctx.
func WithContext(ctx context.Context) *Group {
	g := &Group{ctx: ctx}
	return g
}
// WithCancel creates a Group together with a cancelable Context derived
// from ctx. Functions passed to Go receive the derived Context.
//
// The derived Context is canceled as soon as one of those functions returns
// a non-nil error, or when Wait returns, whichever happens first.
func WithCancel(ctx context.Context) *Group {
	derived, stop := context.WithCancel(ctx)
	return &Group{
		ctx:    derived,
		cancel: stop,
	}
}
// WithTimeout creates a Group together with a Context derived from ctx that
// is canceled once dur has elapsed. Functions passed to Go receive the
// derived Context.
//
// Like WithCancel, the derived Context is also canceled on the first
// non-nil error and when Wait returns.
func WithTimeout(ctx context.Context, dur time.Duration) *Group {
	derived, stop := context.WithTimeout(ctx, dur)
	return &Group{
		ctx:    derived,
		cancel: stop,
	}
}
// do runs f with the group's context (context.Background when the Group has
// none) and marks one unit of work as done on the WaitGroup when f finishes.
//
// A panic inside f is recovered and turned into an error that carries the
// panic value and the stack of the panicking goroutine. Only the first
// non-nil error is stored in g.err; storing it also calls g.cancel when the
// Group has one.
func (g *Group) do(f func(ctx context.Context) error) {
	ctx := g.ctx
	if ctx == nil {
		ctx = context.Background()
	}
	var err error
	defer func() {
		if r := recover(); r != nil {
			// Capture at most 64 KiB of the current goroutine's stack.
			buf := make([]byte, 64<<10)
			buf = buf[:runtime.Stack(buf, false)]
			// r can be any value, so format it with %v; %s would render
			// e.g. panic(42) as "%!s(int=42)".
			err = fmt.Errorf("errgroup: panic recovered: %v\n%s", r, buf)
		}
		if err != nil {
			// Keep only the first failure and cancel the remaining work.
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
		// Done runs last so Wait cannot observe g.err before it is set.
		g.wg.Done()
	}()
	err = f(ctx)
}
// GOMAXPROCS caps the number of goroutines that execute functions passed to
// Go by starting a pool of n workers fed from a channel with capacity n.
//
// Only the first call has any effect; later calls are ignored. It panics
// when n is not positive. The workers exit once the channel is closed,
// which Wait does.
func (g *Group) GOMAXPROCS(n int) {
	if n <= 0 {
		panic("errgroup: GOMAXPROCS must great than 0")
	}
	startPool := func() {
		g.ch = make(chan func(context.Context) error, n)
		for remaining := n; remaining > 0; remaining-- {
			go func() {
				for task := range g.ch {
					g.do(task)
				}
			}()
		}
	}
	g.workerOnce.Do(startPool)
}
// Go schedules f for execution.
//
// Without a worker pool (GOMAXPROCS was never called) f starts immediately
// in its own goroutine. With a pool, f is queued on the pool's channel; if
// the channel is full, f is kept in a backlog that Wait hands to the workers.
//
// The first function to return a non-nil error cancels the group; that
// error is what Wait returns.
func (g *Group) Go(f func(ctx context.Context) error) {
	g.wg.Add(1)
	g.workerNum++

	if g.ch == nil {
		// No pool configured: one goroutine per function.
		go g.do(f)
		return
	}

	// Non-blocking send: fall back to the backlog when the queue is full.
	select {
	case g.ch <- f:
	default:
		g.chs = append(g.chs, f)
	}
}
// Wait blocks until every function submitted through Go has returned and
// then reports the first non-nil error among them, or nil if none failed.
//
// When a worker pool is in use, Wait first pushes the backlog of functions
// that did not fit into the queue, and closes the queue once all work is
// done so the workers exit. It also calls the group's cancel function, if
// any, and resets the submission counter.
func (g *Group) Wait() error {
	if g.ch != nil {
		backlog := g.chs
		for i := range backlog {
			g.ch <- backlog[i]
		}
	}

	g.wg.Wait()

	if g.ch != nil {
		// Closing the queue ends every worker's receive loop.
		close(g.ch)
	}
	if g.cancel != nil {
		g.cancel()
	}

	g.workerNum = 0
	return g.err
}
// WorkNum reports how many functions have been submitted through Go since
// the last time Wait returned. The counter is read without synchronization.
func (g *Group) WorkNum() int {
	submitted := g.workerNum
	return submitted
}