workerpool.go
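
// Package workerpool implements a fixed-size pool of goroutines that
// executes submitted Workers, with optional per-worker timeouts and a
// growable pending buffer (DynamicBuffer, defined elsewhere in this
// package).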
package workerpool

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// Worker couples a task with the context it runs under. TimeOut is non-nil
// only when the worker was created with a timeout; in that case Cancel holds
// the function that releases the timeout's resources.
type Worker struct {
	Ctx     context.Context
	Cancel  context.CancelFunc
	TimeOut *time.Duration
	Task    func(ctx context.Context)
}

// NewWorker creates a Worker that runs task under ctx. An optional timeout
// may be supplied; if present, ctx is wrapped with context.WithTimeout and
// the cancel function is stored so Kill can release it.
func NewWorker(ctx context.Context, task func(ctx context.Context), timeOut ...time.Duration) *Worker {
	var (
		kill    context.CancelFunc
		timeout *time.Duration
	)
	if len(timeOut) > 0 {
		timeout = &timeOut[0]
		ctx, kill = context.WithTimeout(ctx, *timeout)
	}
	return &Worker{Ctx: ctx, TimeOut: timeout, Task: task, Cancel: kill}
}

// Kill cancels the worker's context if the worker was created with a
// timeout; otherwise Cancel is nil and Kill is a no-op.
func (w *Worker) Kill() {
	if w.TimeOut != nil {
		w.Cancel()
	}
}
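
// A task that should stop early when its timeout fires can watch the ctx it
// receives. A minimal sketch (the durations are illustrative; when a worker
// runs through the pool, the pool calls Kill for you after the task returns):
//
//	w := NewWorker(context.Background(), func(ctx context.Context) {
//		select {
//		case <-ctx.Done():
//			// the 1s timeout elapsed before the work finished
//		case <-time.After(100 * time.Millisecond):
//			// simulated work that completed within the deadline
//		}
//	}, time.Second)
//	defer w.Kill()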

// WorkerPool executes submitted Workers on a fixed number of goroutines,
// buffering pending workers in a DynamicBuffer of up to maxBufferSize
// entries.
type WorkerPool struct {
	maxWorkers    int
	workers       *DynamicBuffer
	wg            *sync.WaitGroup
	Len           int64
	maxBufferSize int64
	waitSpace     chan struct{}
}

// NewPool creates a WorkerPool with maxWorkers goroutines and a pending
// buffer that can grow up to bufferSize workers.
func NewPool(maxWorkers, bufferSize int) *WorkerPool {
	return &WorkerPool{
		maxWorkers:    maxWorkers,
		wg:            &sync.WaitGroup{},
		workers:       NewDynamicBuffer(2, bufferSize),
		maxBufferSize: int64(bufferSize),
		waitSpace:     make(chan struct{}, bufferSize),
	}
}

// Run starts the pool's worker goroutines.
//
// It launches maxWorkers goroutines, each of which consumes workers from the
// buffer and executes their tasks. The pool's sync.WaitGroup tracks the
// goroutines so Wait can block until they all finish.
func (p *WorkerPool) Run() {
	for i := 0; i < p.maxWorkers; i++ {
		p.wg.Add(1)
		go p.worker()
	}
}

// Submit adds a worker to the pool.
//
// If the pool is full, Submit blocks until a running worker signals that
// space is available, then adds the worker to the buffer and increments the
// pool's length.
//
// Example:
//
//	pool := workerpool.NewPool(2, 20)
//	worker := workerpool.NewWorker(context.Background(), func(ctx context.Context) {})
//	pool.Submit(worker)
//
//	// or inline:
//	pool.Submit(workerpool.NewWorker(context.Background(), func(ctx context.Context) {}))
//
//	// with a timeout:
//	pool.Submit(workerpool.NewWorker(context.Background(),
//		func(ctx context.Context) {}, 1*time.Second))
func (p *WorkerPool) Submit(worker *Worker) {
	if p.IsFull() {
		<-p.waitSpace
	}
	if ok := p.workers.Add(worker); ok {
		atomic.AddInt64(&p.Len, 1)
	}
}

// worker is the loop run by each pool goroutine.
//
// It receives workers from the buffer and runs each worker's task with that
// worker's context, killing the worker once the task returns. The pool's
// length is then decremented, and if the buffer has grown to full capacity
// and exactly one slot just opened up, a signal is sent on waitSpace to
// unblock a pending Submit. The loop exits when the buffer channel is
// closed.
func (p *WorkerPool) worker() {
	defer p.wg.Done()
	for {
		w, ok := <-p.workers.buffer
		if !ok {
			return
		}
		func(w *Worker) {
			defer w.Kill()
			w.Task(w.Ctx)
		}(w)
		atomic.AddInt64(&p.Len, -1)
		if p.isSpaceAvailable() {
			p.waitSpace <- struct{}{}
		}
	}
}

// len returns the current number of pending workers in the pool.
func (p *WorkerPool) len() int64 {
	return atomic.LoadInt64(&p.Len)
}

// IsFull reports whether the number of pending workers has reached the
// pool's maximum buffer size.
func (p *WorkerPool) IsFull() bool {
	return atomic.LoadInt64(&p.Len) == p.maxBufferSize
}

// isSpaceAvailable reports whether the buffer has grown to its maximum
// capacity and exactly one slot is free, i.e. the pool just dropped from
// full to maxBufferSize-1 pending workers.
func (p *WorkerPool) isSpaceAvailable() bool {
	return p.workers.Cap() == p.maxBufferSize && p.len() == p.maxBufferSize-1
}

// Wait closes the buffer, letting the worker goroutines drain any remaining
// workers and exit, then blocks until they have all finished. No worker may
// be submitted after Wait is called.
func (p *WorkerPool) Wait() {
	close(p.workers.buffer)
	p.wg.Wait()
}
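
// Putting it together: a minimal sketch of driving the pool from a caller
// (the pool and worker sizes are illustrative; note that Wait closes the
// buffer, so all Submit calls must happen before it):
//
//	pool := workerpool.NewPool(4, 16)
//	pool.Run()
//	for i := 0; i < 8; i++ {
//		pool.Submit(workerpool.NewWorker(context.Background(), func(ctx context.Context) {
//			// do the unit of work here, honoring ctx for timeouts
//		}))
//	}
//	pool.Wait() // drain remaining workers and block until the goroutines exit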