-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpipeline_test.go
105 lines (83 loc) · 1.93 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package phonelab
import (
"github.com/stretchr/testify/assert"
"sync"
"testing"
)
// A processor that totals up the loglines and emits it.
type totalProcessor struct {
source Processor
}
func (proc *totalProcessor) Process() <-chan interface{} {
outChan := make(chan interface{})
go func() {
inChan := proc.source.Process()
total := 0
for _ = range inChan {
total += 1
}
outChan <- total
close(outChan)
}()
return outChan
}
// A data processor that collects the totals from all the sources.
type totalDataCollector struct {
results []int
sync.Mutex
}
func newTestDataCollector() *totalDataCollector {
return &totalDataCollector{
results: make([]int, 0),
}
}
func (t *totalDataCollector) BuildPipeline(source *PipelineSourceInstance) (*Pipeline, error) {
// Normally, there will be at least one node before the source.
// We'll fake that with a pass through handler.
processor := &totalProcessor{source.Processor}
return &Pipeline{
LastHop: processor,
}, nil
}
func (t *totalDataCollector) OnData(data interface{}, unused PipelineSourceInfo) {
t.Lock()
t.results = append(t.results, data.(int))
t.Unlock()
}
func (t *totalDataCollector) Finish() {}
// Simple Generator
type emitterGenerator struct {
sizes []int
}
func (e *emitterGenerator) Process() <-chan *PipelineSourceInstance {
outChan := make(chan *PipelineSourceInstance)
go func() {
for _, val := range e.sizes {
outChan <- &PipelineSourceInstance{
Processor: &emitter{val},
}
}
close(outChan)
}()
return outChan
}
func TestPipeline(t *testing.T) {
assert := assert.New(t)
dataProc := newTestDataCollector()
runner := NewRunner(
&emitterGenerator{
sizes: []int{10, 20, 50},
},
dataProc,
dataProc,
)
runner.Run()
totals := make(map[int]bool)
for _, total := range dataProc.results {
totals[total] = true
}
assert.Equal(3, len(totals))
assert.True(totals[10])
assert.True(totals[20])
assert.True(totals[50])
}