-
Notifications
You must be signed in to change notification settings - Fork 0
/
van.go
469 lines (362 loc) · 11.8 KB
/
van.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
package van
import (
"context"
"fmt"
"log"
"reflect"
"sync"
)
// maxArgs is the maximum number of arguments (dependencies) a function can have.
// Since we don't want to allocate a dynamic slice for every function call, we use
// a fixed size array. One can always bypass this limitation by using a dependency struct.
const maxArgs = 16
type ProviderFunc interface{} // func(ctx context.Context, deps ...interface{}) (interface{}, error)
type HandlerFunc interface{} // func(ctx context.Context, cmd interface{}, deps ...interface{}) error
type ListenerFunc interface{} // func(ctx context.Context, event interface{}, deps ...interface)
type providerOpts struct {
sync.RWMutex
fn ProviderFunc
instance interface{}
singleton bool
takesContext bool
}
func (p *providerOpts) call(args []reflect.Value) (reflect.Value, error) {
ret := reflect.ValueOf(p.fn).Call(args)
instance, err := ret[0], toError(ret[1])
return instance, err
}
type Van struct {
providers map[reflect.Type]*providerOpts
listeners map[reflect.Type][]HandlerFunc
handlers map[reflect.Type]HandlerFunc
wg sync.WaitGroup
}
func New() *Van {
return &Van{
providers: make(map[reflect.Type]*providerOpts),
listeners: make(map[reflect.Type][]HandlerFunc),
handlers: make(map[reflect.Type]HandlerFunc),
}
}
// Wait blocks until all current events are processed, which may be used for implementing graceful shutdown.
// It is up to the programmer to ensure that no new events/commands are published, otherwise it may run forever.
func (b *Van) Wait() {
b.wg.Wait()
}
// Provide registers new type constructor that will be called every time a handler requests the dependency.
// There's no such thing as "optional" dependency. Therefore, the provider should either return a valid non-nil
// dependency or an error.
// It is expected to be called during the app startup phase as it performs the run time type checking and
// panics if an incorrect function type is provided.
func (b *Van) Provide(provider ProviderFunc) {
if err := b.registerProvider(provider, false); err != nil {
panic(err)
}
}
// ProvideOnce registers a new type constructor that is guaranteed to be called not more than once in
// application's lifetime.
// It is expected to be called during the app startup phase as it performs the run time type checking and
// panics if an incorrect function type is provided.
func (b *Van) ProvideOnce(provider ProviderFunc) {
if err := b.registerProvider(provider, true); err != nil {
panic(err)
}
}
func (b *Van) registerProvider(provider ProviderFunc, signleton bool) error {
providerType := reflect.TypeOf(provider)
if err := validateProviderSignature(providerType); err != nil {
return err
}
retType := providerType.Out(0)
takesContext := false
for i := 0; i < providerType.NumIn(); i++ {
inType := providerType.In(i)
if inType == retType {
return fmt.Errorf("provider function has a dependency of the same type")
}
if err := b.validateDependency(inType); err != nil {
return err
}
if inType == typeContext {
if signleton {
return fmt.Errorf("singleton providers cannot use Context as a dependency")
}
takesContext = true
}
if pp, ok := b.providers[inType]; ok && pp.takesContext {
if signleton {
return fmt.Errorf("singleton providers cannot depend on providers that take Context")
}
takesContext = true
}
}
b.providers[retType] = &providerOpts{
fn: provider,
singleton: signleton,
takesContext: takesContext,
}
return nil
}
// Handle registers a handler for the given command type. There can be only one handler per command.
// It is expected to be called during the app startup phase as it performs the run time type checking and
// panics if an incorrect function type is provided.
func (b *Van) Handle(cmd interface{}, handler HandlerFunc) {
if err := b.registerHandler(cmd, handler); err != nil {
panic(err)
}
}
func (b *Van) registerHandler(cmd interface{}, handler HandlerFunc) error {
cmdType := reflect.TypeOf(cmd)
if cmdType.Kind() != reflect.Struct {
return fmt.Errorf("cmd must be a struct, got %s", cmdType.Name())
}
handlerType := reflect.TypeOf(handler)
if err := validateHandlerSignature(handlerType); err != nil {
return err
}
if cmdType != handlerType.In(1).Elem() {
return fmt.Errorf("command type mismatch")
}
// start from the third argument as the first two are always `ctx` and `cmd`
for i := 2; i < handlerType.NumIn(); i++ {
if err := b.validateDependency(handlerType.In(i)); err != nil {
return err
}
}
b.handlers[cmdType] = handler
return nil
}
// Invoke runs an associated command handler.
func (b *Van) Invoke(ctx context.Context, cmd interface{}) error {
cmdType := reflect.TypeOf(cmd)
if cmdType.Kind() != reflect.Ptr {
return fmt.Errorf("cmd must be a pointer to a struct")
}
cmdType = cmdType.Elem()
if cmdType.Kind() != reflect.Struct {
return fmt.Errorf("cmd must be a pointer to a struct")
}
handler, ok := b.handlers[cmdType]
if !ok {
return fmt.Errorf("no handlers found for type %s", cmdType.String())
}
var args [maxArgs]reflect.Value
handlerType := reflect.TypeOf(handler)
numIn := handlerType.NumIn()
if numIn > len(args) {
return fmt.Errorf("too many dependencies for handler %s", handlerType.String())
}
err := b.resolve(ctx, cmd, handlerType, args[:numIn])
if err != nil {
return err
}
ret := reflect.ValueOf(handler).Call(args[:numIn])
return toError(ret[0])
}
// Subscribe registers a new handler for the given command type. There can be any number of handlers per event.
// It is expected to be called during the app startup phase as it performs the run time type checking and
// panics if an incorrect function type is provided.
func (b *Van) Subscribe(event interface{}, listeners ...ListenerFunc) {
for i := range listeners {
err := b.registerListener(event, listeners[i])
if err != nil {
panic(err)
}
}
}
func (b *Van) registerListener(event interface{}, listener ListenerFunc) error {
eventType := reflect.TypeOf(event)
if eventType.Kind() != reflect.Struct {
return fmt.Errorf("event must be a struct, got %s", eventType.String())
}
listenerType := reflect.TypeOf(listener)
if err := validateListenerSignature(listenerType); err != nil {
return err
}
if eventType != listenerType.In(1) {
return fmt.Errorf("event type mismatch")
}
// start from the third argument as the first two are always `ctx` and `event`
for i := 2; i < listenerType.NumIn(); i++ {
if err := b.validateDependency(listenerType.In(i)); err != nil {
return err
}
}
if _, ok := b.listeners[eventType]; !ok {
b.listeners[eventType] = make([]HandlerFunc, 0)
}
b.listeners[eventType] = append(b.listeners[eventType], listener)
return nil
}
// Publish sends an event to the bus. This is a fire-and-forget non-blocking operation.
// Each listener will be called in a separate goroutine, and they can fail independently.
// The error is never propagated back to the publisher, and should be handled by the listener itself.
func (b *Van) Publish(event interface{}) error {
eventType := reflect.TypeOf(event)
if eventType.Kind() != reflect.Struct {
return fmt.Errorf("event must be a a struct, got %s", eventType.Name())
}
b.wg.Add(1)
go func() {
defer b.wg.Done()
b.processEvent(event)
}()
return nil
}
func (b *Van) processEvent(event interface{}) {
eventType := reflect.TypeOf(event)
listeners, ok := b.listeners[eventType]
if !ok || len(listeners) == 0 {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for i := range listeners {
typ := reflect.TypeOf(listeners[i])
var args [maxArgs]reflect.Value
numIn := typ.NumIn()
if numIn > len(args) {
log.Printf("van: too many dependencies for listener %s", typ.String())
continue
}
if numIn > 0 {
err := b.resolve(ctx, event, typ, args[:numIn])
if err != nil {
log.Printf("van: failed to resolve dependencies for %s: %s", typ.String(), err)
continue
}
}
reflect.ValueOf(listeners[i]).Call(args[:numIn])
}
}
// Exec executes the given function inside the dependency injector.
func (b *Van) Exec(ctx context.Context, fn interface{}) error {
funcType := reflect.TypeOf(fn)
if err := validateExecLambdaSignature(funcType); err != nil {
return err
}
for i := 0; i < funcType.NumIn(); i++ {
if err := b.validateDependency(funcType.In(i)); err != nil {
return err
}
}
var args [maxArgs]reflect.Value
numIn := funcType.NumIn()
if numIn > len(args) {
return fmt.Errorf("too many dependencies for function %s", funcType.String())
}
err := b.resolve(ctx, nil, funcType, args[:numIn])
if err != nil {
return err
}
ret := reflect.ValueOf(fn).Call(args[:numIn])
return toError(ret[0])
}
func (b *Van) resolve(ctx context.Context, cmd interface{}, funcType reflect.Type, args []reflect.Value) error {
for i := 0; i < funcType.NumIn(); i++ {
argType := funcType.In(i)
switch {
case i == 0 && argType == typeContext:
args[i] = reflect.ValueOf(ctx)
case i == 1 && argType == reflect.TypeOf(cmd):
args[i] = reflect.ValueOf(cmd)
case argType == typeVan:
args[i] = reflect.ValueOf(b)
case argType.Kind() == reflect.Interface:
instance, err := b.new(ctx, argType)
if err != nil {
return err
}
args[i] = instance
case argType.Kind() == reflect.Struct:
value, err := b.buildStruct(ctx, argType)
if err != nil {
return err
}
args[i] = value
default:
}
}
return nil
}
func (b *Van) buildStruct(ctx context.Context, structType reflect.Type) (reflect.Value, error) {
fields := reflect.VisibleFields(structType)
value := reflect.New(structType).Elem()
for _, field := range fields {
instance, err := b.new(ctx, field.Type)
if err != nil {
return reflect.ValueOf(nil), err
}
value.FieldByIndex(field.Index).Set(instance)
}
return value, nil
}
func (b *Van) new(ctx context.Context, t reflect.Type) (reflect.Value, error) {
provider := b.providers[t]
if provider.singleton {
provider.RLock()
if provider.instance == nil {
provider.RUnlock()
return b.newSingleton(ctx, t)
}
provider.RUnlock()
return reflect.ValueOf(provider.instance), nil
}
providerType := reflect.TypeOf(provider.fn)
var args [maxArgs]reflect.Value
numIn := providerType.NumIn()
if numIn > len(args) {
return reflect.ValueOf(nil), fmt.Errorf("too many dependencies for provider %s", providerType.String())
}
if numIn > 0 {
err := b.resolve(ctx, nil, providerType, args[:numIn])
if err != nil {
return reflect.ValueOf(nil), err
}
}
inst, err := provider.call(args[:numIn])
if err != nil {
return reflect.ValueOf(nil), fmt.Errorf("failed to resolve dependency %s: %w", t.String(), err)
}
return inst, nil
}
func (b *Van) newSingleton(ctx context.Context, t reflect.Type) (reflect.Value, error) {
provider := b.providers[t]
provider.Lock()
defer provider.Unlock()
if provider.instance != nil {
return reflect.ValueOf(provider.instance), nil
}
providerType := reflect.TypeOf(provider.fn)
var args [maxArgs]reflect.Value
numIn := providerType.NumIn()
if numIn > len(args) {
return reflect.ValueOf(nil), fmt.Errorf("too many dependencies for provider %s", providerType.String())
}
if numIn > 0 {
err := b.resolve(ctx, nil, providerType, args[:numIn])
if err != nil {
return reflect.ValueOf(nil), err
}
}
inst, err := provider.call(args[:numIn])
if err != nil {
return reflect.ValueOf(nil), fmt.Errorf("failed to resolve dependency %s: %w", t.String(), err)
}
provider.instance = inst.Interface()
return inst, nil
}
func (b *Van) validateDependency(t reflect.Type) error {
if t.Kind() == reflect.Struct {
for _, field := range reflect.VisibleFields(t) {
if err := b.validateDependency(field.Type); err != nil {
return err
}
}
return nil
}
if _, ok := b.providers[t]; ok || t == typeVan || t == typeContext {
return nil
}
return fmt.Errorf("no providers registered for type %s", t.String())
}