-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathorbit_test.go
183 lines (147 loc) · 4.61 KB
/
orbit_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package orbit_test
import (
"sync"
"testing"
"time"
"github.com/bmizerany/assert"
"github.com/roganartu/orbitus/handlers"
target "github.com/roganartu/orbitus"
)
// Shared fixtures for the tests in this file.
var (
	// buffer_size is the size handed to target.New by every test.
	buffer_size uint64 = 1 << 8 // 256

	// test is the payload used when filling and checking messages.
	test = "Test string"

	// runCheckLock guards the *Ran flags below, which are written from the
	// handler functions and read from the tests.
	runCheckLock = new(sync.Mutex)

	// Set to true by the matching handler wrapper the first time it runs.
	receiverRan, journalerRan, replicatorRan, unmarshallerRan, executorRan bool

	// TODO pull these out into a mock subpackage

	// receiver records that it ran, stores val as the marshalled content of
	// message id, and moves the receiver index to id+1.
	receiver = func(p target.Processor, id uint64, val interface{}) {
		runCheckLock.Lock()
		defer runCheckLock.Unlock()
		receiverRan = true

		m := p.GetMessage(id)
		m.SetID(id)
		m.SetMarshalled(val)
		p.SetMessage(id, m)

		p.SetIndex(handlers.RECEIVER, id+1)
	}

	// journaler records that it ran and delegates to handlers.Journaler.
	journaler = func(p target.Processor, ids []uint64) {
		runCheckLock.Lock()
		defer runCheckLock.Unlock()
		journalerRan = true

		handlers.Journaler(p, ids)
	}

	// replicator records that it ran and delegates to handlers.Replicator.
	replicator = func(p target.Processor, ids []uint64) {
		runCheckLock.Lock()
		defer runCheckLock.Unlock()
		replicatorRan = true

		handlers.Replicator(p, ids)
	}

	// unmarshaller records that it ran and delegates to handlers.Unmarshaller.
	unmarshaller = func(p target.Processor, ids []uint64) {
		runCheckLock.Lock()
		defer runCheckLock.Unlock()
		unmarshallerRan = true

		handlers.Unmarshaller(p, ids)
	}

	// executor records that it ran and delegates to handlers.Executor.
	executor = func(p target.Processor, ids []uint64) {
		runCheckLock.Lock()
		defer runCheckLock.Unlock()
		executorRan = true

		handlers.Executor(p, ids)
	}
)
// TestDefaultReceiver pushes one payload through a started loop and checks
// that the receiver stored it as the marshalled content of message 4.
func TestDefaultReceiver(t *testing.T) {
	loop := target.New(buffer_size, receiver, journaler, replicator, unmarshaller, executor)

	err := loop.Start()
	assert.Equal(t, nil, err)

	loop.Input <- []byte(test)

	// Give the loop a moment to process the message before stopping it.
	time.Sleep(10 * time.Millisecond)
	loop.Stop()

	// Check out of bounds index wrapping
	// NOTE(review): 4 is smaller than buffer_size, so this lookup does not look
	// out of bounds; presumably 4 is where the first message lands (TestNew
	// expects index 4 for handler 0) — confirm and reword.
	msg := loop.GetMessage(4)

	// Expected value goes first, matching the other assertions in this file.
	assert.Equal(t, []byte(test), msg.GetMarshalled())
}
// TestGetBufferSize verifies that a new loop reports the buffer size it was
// constructed with.
func TestGetBufferSize(t *testing.T) {
	l := target.New(buffer_size, nil, nil, nil, nil, nil)

	got := l.GetBufferSize()
	assert.Equal(t, buffer_size, got)
}
// TestGetIndex verifies that index 0 of a loop built with two nil handlers
// starts at 1.
func TestGetIndex(t *testing.T) {
	const want uint64 = 1

	l := target.New(buffer_size, nil, nil)

	got := l.GetIndex(0)
	assert.Equal(t, want, got)
}
// TestNew checks a freshly constructed loop: the starting index of each of
// its five handlers, and that every buffer slot can be written and read back.
func TestNew(t *testing.T) {
	loop := target.New(buffer_size, nil, nil, nil, nil, nil)

	// Ensure the indexes start staggered: handler j is expected at 4-j.
	var i uint64 = 4
	for j := 0; j < 5; j++ {
		assert.Equal(t, i-uint64(j), loop.GetIndex(j))
	}

	// Ensure buffer has been fully allocated by giving every slot a distinct
	// payload. string(rune(i)) yields the one-rune string for code point i;
	// converting the integer directly with string(i) is rejected by go vet.
	for i = 0; i < buffer_size; i++ {
		msg := loop.GetMessage(i)
		msg.SetMarshalled([]byte(test + string(rune(i))))
	}

	// Read every slot back and compare against the payload written above.
	for i = 0; i < buffer_size; i++ {
		msg := loop.GetMessage(i)
		assert.Equal(t, []byte(test+string(rune(i))), msg.GetMarshalled())
	}
}
// TestLoopStart starts a loop, feeds it a single message and verifies that
// every handler wrapper was invoked.
func TestLoopStart(t *testing.T) {
	// Clear the run flags so earlier tests cannot satisfy the checks below.
	runCheckLock.Lock()
	receiverRan = false
	journalerRan = false
	replicatorRan = false
	unmarshallerRan = false
	executorRan = false
	runCheckLock.Unlock()

	l := target.New(buffer_size, receiver, journaler, replicator, unmarshaller, executor)
	startErr := l.Start()
	assert.Equal(t, nil, startErr)

	// Manually add a new message to the receiver buffer to be processed
	// "{\"test\":\"This is a test message\"}" Base64 encoded
	payload := []byte("eyJ0ZXN0IjoiVGhpcyBpcyBhIHRlc3QgbWVzc2FnZSJ9")
	l.Input <- payload

	// Give the loop a moment to process the message before stopping it.
	time.Sleep(10 * time.Millisecond)
	l.Stop()

	// The flags are written by the handlers under the same lock.
	runCheckLock.Lock()
	defer runCheckLock.Unlock()

	assert.Equal(t, true, receiverRan)
	assert.Equal(t, true, journalerRan)
	assert.Equal(t, true, replicatorRan)
	assert.Equal(t, true, unmarshallerRan)
	assert.Equal(t, true, executorRan)
}
// TestReceiverLoopReset verifies that Reset on a stopped loop re-seeds all
// five handler indexes from the given value, and that Reset on a running loop
// is refused with an error.
func TestReceiverLoopReset(t *testing.T) {
	loop := target.New(buffer_size, receiver, journaler, replicator, unmarshaller, executor)

	// Values below, just under, at, and just over the buffer size.
	testvals := []uint64{1, buffer_size - 1, buffer_size, buffer_size + 1}
	for _, i := range testvals {
		err := loop.Reset(i)
		assert.Equal(t, nil, err)

		// Ensure every index has been re-seeded from the given value:
		// handler j is expected at i-j (unsigned arithmetic, so this wraps
		// when j > i).
		for j := 0; j < 5; j++ {
			assert.Equal(t, i-uint64(j), loop.GetIndex(j))
		}
	}

	// Ensure loop does not reset if running
	err := loop.Start()
	assert.Equal(t, nil, err)
	// Stop the loop when the test ends so it is not left running alongside
	// the tests that follow.
	defer loop.Stop()

	time.Sleep(1 * time.Millisecond)

	err = loop.Reset(buffer_size)
	assert.NotEqual(t, nil, err)
	assert.Equal(t, "cannot reset a running Loop", err.Error())
}
// BenchmarkIntegrated measures loop.GetMessage on a running loop that has
// already been fed one message.
func BenchmarkIntegrated(b *testing.B) {
	loop := target.New(buffer_size, receiver, journaler, replicator, unmarshaller, executor)

	// Reset the run flags under the same lock the handlers use when writing them.
	runCheckLock.Lock()
	receiverRan, journalerRan, replicatorRan, unmarshallerRan, executorRan = false, false, false, false, false
	runCheckLock.Unlock()

	err := loop.Start()
	if err != nil {
		b.Fatalf("Failed to start loop: %s", err)
	}
	defer loop.Stop()

	// Manually add a new message to the receiver buffer to be processed
	// "{\"test\":\"This is a test message\"}" Base64 encoded
	loop.Input <- []byte("eyJ0ZXN0IjoiVGhpcyBpcyBhIHRlc3QgbWVzc2FnZSJ9")

	time.Sleep(1 * time.Millisecond)

	// Exclude loop start-up, the send and the sleep above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		loop.GetMessage(uint64(i))
	}
}