-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmain.go
99 lines (88 loc) · 2.25 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package orderedconcurrently
import (
"container/heap"
"context"
"sync"
)
// Options configures a call to Process.
type Options struct {
	// PoolSize is the number of worker goroutines Process starts; it is also
	// used as the buffer size of Process's internal channels. Values below 1
	// are treated as 1.
	PoolSize int
	// OutChannelBuffer is the buffer capacity of the channel returned by
	// Process.
	OutChannelBuffer int
}
// OrderedOutput is the element type of the channel returned by Process.
type OrderedOutput struct {
	// Value is whatever the corresponding WorkFunction's Run method returned.
	Value interface{}
	// Remaining reports how many finished results Process is currently
	// holding in its reordering heap, i.e. completed but not yet sent on the
	// output channel.
	Remaining func() int
}
// WorkFunction is a unit of work that Process can execute.
type WorkFunction interface {
	// Run performs the work and returns its result. Process calls it from a
	// worker goroutine with the context that was given to Process, and
	// delivers the returned value as OrderedOutput.Value.
	Run(ctx context.Context) interface{}
}
// Process runs every WorkFunction received on inputChan on a pool of worker
// goroutines and delivers the results on the returned channel in the same
// order in which the work items were received.
//
// ctx is passed unchanged to each WorkFunction.Run call; Process itself does
// not watch ctx for cancellation.
//
// options may be nil, in which case one worker and an unbuffered output
// channel are used. A PoolSize below 1 is treated as 1 and a negative
// OutChannelBuffer as 0. The caller's Options value is only read, never
// modified.
//
// The returned channel is closed once inputChan has been closed and every
// result has been sent. The consumer must keep receiving until then;
// otherwise the internal goroutines remain blocked on their sends.
func Process(ctx context.Context, inputChan <-chan WorkFunction, options *Options) <-chan OrderedOutput {
	// Snapshot the settings into locals so a nil options pointer is tolerated
	// and the caller's struct is never written from a background goroutine.
	poolSize, outBuffer := 1, 0
	if options != nil {
		if options.PoolSize > poolSize {
			poolSize = options.PoolSize
		}
		if options.OutChannelBuffer > outBuffer {
			outBuffer = options.OutChannelBuffer
		}
	}

	outputChan := make(chan OrderedOutput, outBuffer)
	processChan := make(chan *processInput, poolSize)
	aggregatorChan := make(chan *processInput, poolSize)

	// outputHeap is touched by the aggregator goroutine and, through the
	// Remaining callback, by whichever goroutine consumes the output, so all
	// access goes through heapMu. The mutex is never held across a channel
	// send: a consumer calling Remaining must not be able to block the
	// aggregator while the aggregator is waiting on that same consumer.
	var heapMu sync.Mutex
	outputHeap := &processInputHeap{}

	remaining := func() int {
		heapMu.Lock()
		defer heapMu.Unlock()
		return outputHeap.Len()
	}
	push := func(item *processInput) {
		heapMu.Lock()
		defer heapMu.Unlock()
		heap.Push(outputHeap, item)
	}
	// popIfNext removes and returns the lowest-order result only when it is
	// the one expected next in the output sequence.
	popIfNext := func(next uint64) (interface{}, bool) {
		heapMu.Lock()
		defer heapMu.Unlock()
		if top, ok := outputHeap.Peek(); !ok || top.order != next {
			return nil, false
		}
		return heap.Pop(outputHeap).(*processInput).value, true
	}
	// popAny removes and returns the lowest-order result, if there is one.
	popAny := func() (interface{}, bool) {
		heapMu.Lock()
		defer heapMu.Unlock()
		if outputHeap.Len() == 0 {
			return nil, false
		}
		return heap.Pop(outputHeap).(*processInput).value, true
	}

	// Aggregator: collects finished work in whatever order the workers
	// complete it and emits it strictly by sequence number.
	go func() {
		defer close(outputChan)

		var current uint64
		for item := range aggregatorChan {
			push(item)
			// Emit as many consecutive results as are now available.
			for {
				value, ok := popIfNext(current)
				if !ok {
					break
				}
				outputChan <- OrderedOutput{Value: value, Remaining: remaining}
				current++
			}
		}
		// Defensive drain: emit anything still held once the workers are done.
		for {
			value, ok := popAny()
			if !ok {
				break
			}
			outputChan <- OrderedOutput{Value: value, Remaining: remaining}
		}
	}()

	// Worker pool: each worker runs work items until processChan is closed.
	var poolWg sync.WaitGroup
	poolWg.Add(poolSize)
	for i := 0; i < poolSize; i++ {
		go func() {
			defer poolWg.Done()
			for input := range processChan {
				input.value = input.workFn.Run(ctx)
				// Drop the reference to the work function; only the result
				// is needed from here on.
				input.workFn = nil
				aggregatorChan <- input
			}
		}()
	}

	// Close the aggregator's input only after every worker has exited, so no
	// worker can send on a closed channel.
	go func() {
		poolWg.Wait()
		close(aggregatorChan)
	}()

	// Feeder: tags each incoming work item with its arrival sequence number
	// and hands it to the pool.
	go func() {
		defer close(processChan)

		var order uint64
		for input := range inputChan {
			processChan <- &processInput{workFn: input, order: order}
			order++
		}
	}()

	return outputChan
}