-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtask_controller.go
136 lines (109 loc) · 2.33 KB
/
task_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package weightask
import (
"context"
"errors"
"sync"
)
// ErrNoResult is returned by ProcessTasks when no report with a non-nil
// result was retained and the context carries no error.
var ErrNoResult = errors.New("no result")
// Task is a unit of work that a TaskController can run.
type Task interface {
// Weight returns the task's weight. ProcessTasks uses it to rank reports:
// among successful tasks, a higher weight replaces a lower one.
Weight() int
// PerformTask executes the task and returns its result, or an error if it
// failed. The controller invokes it on a separate goroutine.
PerformTask(ctx context.Context) (any, error)
}
// WeightList tracks the weights of the tasks a TaskController is processing.
// The notes below describe how ProcessTasks calls each method; the ordering
// semantics themselves are up to the implementation.
type WeightList interface {
// Sort is called once per ProcessTasks run, after every task weight has
// been added.
Sort()
// Add registers the weight of one task.
Add(val int)
// Remove is called with the weight of a report that failed or was passed
// over in favor of another successful report.
Remove(val int)
// GetTopWeight returns the weight that ProcessTasks treats as decisive: a
// successful report carrying this weight is returned immediately.
// NOTE(review): presumably the largest weight still in the list — confirm
// against the implementation.
GetTopWeight() int
}
// TaskReport records the outcome of running a single Task.
type TaskReport struct {
// weight is the value of the task's Weight method, read after PerformTask
// returned.
weight int
// result is the value returned by PerformTask.
result any
// err is the error returned by PerformTask; ProcessTasks treats a non-nil
// err as a failed task.
err error
}
// TaskController runs a set of tasks concurrently and selects one result
// according to task weight. Create one with NewTaskController.
type TaskController struct {
// tasks holds the tasks registered through AddTask, in insertion order.
tasks []Task
// reportCh carries task reports; NewTaskController allocates it unbuffered.
reportCh chan *TaskReport
// effectiveReport is the successful report ProcessTasks currently holds as
// its fallback answer, or nil if none has been kept.
effectiveReport *TaskReport
// weightList tracks task weights; it is set via WithWeightList, otherwise
// NewTaskController installs a *WeightSlice.
weightList WeightList
}
// AddTask registers task to be run by the next call to ProcessTasks.
//
// A nil task is ignored: storing it would only defer the failure to
// ProcessTasks, which calls Weight on every registered task and would panic
// on a nil interface value.
func (t *TaskController) AddTask(task Task) {
	if task == nil {
		return
	}
	t.tasks = append(t.tasks, task)
}
// ProcessTasks runs every task registered via AddTask concurrently and picks
// one result by weight.
//
// Each task's weight is first added to the weight list. Reports are then
// consumed as tasks finish:
//   - a failed task has its weight removed from the list;
//   - a successful task whose weight equals the list's current top weight wins
//     immediately, and its result is returned without waiting for the rest;
//   - otherwise the heaviest successful report seen so far is kept as the
//     fallback, and the weight of the report that lost is removed.
//
// Once every task has reported (or stopped waiting because ctx is done), the
// fallback is returned if it carries a non-nil result. Failing that, the
// context's error is returned if there is one, and ErrNoResult otherwise.
//
// Tasks are not cancelled when a winner is found; they keep running with the
// caller's ctx and their reports are discarded.
//
// NOTE(review): weights are added to the weight list on every call and the
// WeightList interface offers no reset, so whether a controller gives
// meaningful rankings when reused depends on the implementation — confirm.
func (t *TaskController) ProcessTasks(ctx context.Context) (any, error) {
	// Use a channel private to this call, with room for one report per task.
	// Workers can then always deliver their report and exit, even after an
	// early return has stopped the receive loop (an unbuffered channel would
	// leave them blocked forever). A per-call channel also means closing it
	// cannot break a later call.
	reportCh := make(chan *TaskReport, len(t.tasks))

	// Forget the fallback from any previous run so a stale result is never
	// returned for this one.
	t.effectiveReport = nil

	var wg sync.WaitGroup
	for _, task := range t.tasks {
		t.weightList.Add(task.Weight())
		wg.Add(1)
		go func(tsk Task) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				// Stop waiting; the task itself observes ctx on its own.
			case report := <-t.do(ctx, tsk):
				reportCh <- report
			}
		}(task)
	}

	// Close the channel once every worker is finished so the receive loop
	// below terminates.
	go func() {
		wg.Wait()
		close(reportCh)
	}()

	t.weightList.Sort()

	for rst := range reportCh {
		switch {
		case rst == nil:
			// Nothing to evaluate.
		case rst.err != nil:
			t.weightList.Remove(rst.weight)
		case rst.weight == t.weightList.GetTopWeight():
			return rst.result, nil
		case t.effectiveReport == nil:
			t.effectiveReport = rst
		case rst.weight > t.effectiveReport.weight:
			t.weightList.Remove(t.effectiveReport.weight)
			t.effectiveReport = rst
		default:
			t.weightList.Remove(rst.weight)
		}
	}

	if t.effectiveReport != nil && t.effectiveReport.result != nil {
		return t.effectiveReport.result, nil
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return nil, ErrNoResult
}
// do starts tsk on its own goroutine and returns a channel that will receive
// exactly one report describing the outcome. The channel has capacity one, so
// the goroutine can deliver the report and exit even if nobody receives it.
func (t *TaskController) do(ctx context.Context, tsk Task) <-chan *TaskReport {
	out := make(chan *TaskReport, 1)
	go func() {
		var report TaskReport
		report.result, report.err = tsk.PerformTask(ctx)
		// Weight is read only after the task has finished running.
		report.weight = tsk.Weight()
		out <- &report
	}()
	return out
}
// Option customizes a TaskController; NewTaskController applies options in
// the order they are given.
type Option func(*TaskController)
// WithWeightList returns an Option that makes the controller track task
// weights with pl.
func WithWeightList(pl WeightList) Option {
	var opt Option = func(tc *TaskController) {
		tc.weightList = pl
	}
	return opt
}
// NewTaskController builds a TaskController with an unbuffered report channel,
// applies opts in order, and installs a new WeightSlice when no option
// supplied a weight list.
func NewTaskController(opts ...Option) *TaskController {
	tc := new(TaskController)
	tc.reportCh = make(chan *TaskReport)

	for i := range opts {
		opts[i](tc)
	}

	if tc.weightList != nil {
		return tc
	}
	tc.weightList = new(WeightSlice)
	return tc
}