-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpool.go
117 lines (104 loc) · 2.63 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package bigopool
import (
"errors"
"sync"
)
// Sentinel errors returned by NewDispatcher when it rejects its arguments.
var (
// ErrNoWorkers is returned by NewDispatcher when maxWorkers is less than 1.
ErrNoWorkers = errors.New("Need at least one worker")
// ErrZeroQueue is returned by NewDispatcher when queueSize is less than 1.
ErrZeroQueue = errors.New("Queue capacity can't be zero")
)
type (
// Job interface allows bigopool to process anything that implements Execute()
Job interface {
// Execute returns the job's Result, or an error.
Execute() (Result, error)
}
// Result can be anything defined by the worker
Result interface{}
// Dispatcher is responsible for orchestrating jobs to workers and reporting results back
Dispatcher struct {
// jobQueue holds pending jobs. Enqueue sends to it, Run hands it to every
// worker, and Wait closes it.
jobQueue chan Job
// MaxWorkers is the number of workers Run starts.
MaxWorkers int
// wg tracks the collector goroutine started by Run.
wg *sync.WaitGroup
// workerWg is passed to each worker's Start in Run; Wait blocks on it
// before stopping the collector.
// NOTE(review): presumably each worker marks it done when it exits - confirm in the worker code.
workerWg *sync.WaitGroup
// A pool of workers channels that are registered with the dispatcher
// NOTE(review): allocated in NewDispatcher but not referenced anywhere else in this file - confirm it is still needed.
workerPool chan chan Job
// quitCh has a one-slot buffer; Wait sends on it to stop the collector goroutine.
quitCh chan bool
// Collect errors
// errorCh and resultCh are unbuffered; Run hands them to the workers and
// the collector goroutine receives from them.
errorCh chan error
resultCh chan Result
// Errors accumulates every value received on errorCh.
Errors errs
// Results accumulates every value received on resultCh, in arrival order.
Results []Result
}
)
// NewDispatcher builds an idle Dispatcher; nothing runs until Run is called.
//
// maxWorkers is the number of workers Run will start and queueSize is the
// buffer capacity of the job queue. Arguments are validated in that order:
// maxWorkers < 1 yields ErrNoWorkers, queueSize < 1 yields ErrZeroQueue, and
// in both cases the returned *Dispatcher is nil.
func NewDispatcher(maxWorkers int, queueSize int) (*Dispatcher, error) {
	switch {
	case maxWorkers < 1:
		return nil, ErrNoWorkers
	case queueSize < 1:
		return nil, ErrZeroQueue
	}

	d := &Dispatcher{
		MaxWorkers: maxWorkers,
		workerPool: make(chan chan Job, maxWorkers),
		jobQueue:   make(chan Job, queueSize),
		// Error and result channels are unbuffered.
		errorCh:  make(chan error),
		resultCh: make(chan Result),
		// One slot so the quit signal can be posted without blocking.
		quitCh:   make(chan bool, 1),
		wg:       new(sync.WaitGroup),
		workerWg: new(sync.WaitGroup),
	}
	return d, nil
}
// Enqueue pushes each given job onto the job queue, in argument order.
//
// Each push is a channel send, so the call blocks while the queue is full and
// nothing is receiving from it.
func (d *Dispatcher) Enqueue(joblist ...Job) {
	for i := range joblist {
		d.jobQueue <- joblist[i]
	}
}
// Wait blocks until workers are done with their magic
// return the results and errors
//
// It closes the job queue, waits on workerWg, signals the collector goroutine
// to stop, waits for it to exit, and then returns d.Results together with the
// address of d.Errors (so the second return value is never a nil pointer).
//
// The dispatcher's channels are closed by the time Wait returns: a second
// call to Wait, or a call to Enqueue afterwards, would hit a closed channel.
func (d *Dispatcher) Wait() ([]Result, Errors) {
// Deferred so the channels are closed only after the collector has exited.
defer d.cleanUp()
// no more work so close the channels to tell workers job is done.
close(d.jobQueue)
d.workerWg.Wait()
// workers have all finished close out dispatcher
// quitCh has a one-slot buffer (see NewDispatcher), so this send does not block.
d.quitCh <- true
// wait for all results to tally
d.wg.Wait()
return d.Results, &d.Errors
}
// cleanUp closes the error, result and quit channels. Wait defers it, so it
// runs after Wait has finished waiting on both wait groups.
func (d *Dispatcher) cleanUp() {
close(d.errorCh)
close(d.resultCh)
close(d.quitCh)
}
// Run starts MaxWorkers workers on the shared job queue and launches the
// goroutine that gathers whatever they send on the error and result channels.
//
// The gathering goroutine stores errors in d.Errors and results in d.Results,
// and exits when a value arrives on quitCh (sent by Wait).
func (d *Dispatcher) Run() {
	// Start the workers; each one is handed workerWg through Start.
	for remaining := d.MaxWorkers; remaining > 0; remaining-- {
		worker := NewWorker(d.jobQueue, d.errorCh, d.resultCh)
		worker.Start(d.workerWg)
	}

	// Register the collector before it starts so Wait can block on it.
	d.wg.Add(1)
	collect := func() {
		defer d.wg.Done()
		for {
			select {
			case <-d.quitCh:
				return
			case res := <-d.resultCh:
				// Not a thread-safe append: it relies on this goroutine
				// being the only one that writes d.Results.
				d.Results = append(d.Results, res)
			case err := <-d.errorCh:
				d.Errors.append(err)
			}
		}
	}
	go collect()
}