-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpubcontrol.go
127 lines (116 loc) · 3.77 KB
/
pubcontrol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// pubcontrol.go
// ~~~~~~~~~
// This module implements the PubControl struct and features.
// :authors: Konstantin Bokarius.
// :copyright: (c) 2015 by Fanout, Inc.
// :license: MIT, see LICENSE for more details.
package pubcontrol
import (
"fmt"
"runtime"
"strings"
"sync"
)
// PubControl manages a set of publishing endpoints, each represented by a
// PubControlClient, and publishes to all of them through a single Publish
// call. Clients are registered either from a configuration (a slice of maps,
// see NewPubControl and ApplyConfig) or by adding PubControlClient instances
// directly with AddClient.
type PubControl struct {
// clients holds the registered endpoints that Publish fans out to.
clients []*PubControlClient
// clientsRWLock guards clients: the methods that modify the slice take the
// write lock, Publish takes the read lock.
clientsRWLock sync.RWMutex
}
// NewPubControl creates a PubControl with an empty client list. When config
// contains at least one entry it is applied via ApplyConfig; pass nil (or an
// empty slice) to start without clients and configure the instance later
// with ApplyConfig or AddClient.
func NewPubControl(config []map[string]interface{}) *PubControl {
	pc := &PubControl{clients: []*PubControlClient{}}
	// len of a nil slice is 0, so this guard also covers a nil config.
	if len(config) == 0 {
		return pc
	}
	pc.ApplyConfig(config)
	return pc
}
// RemoveAllClients drops every configured PubControlClient by replacing the
// client list with a fresh, empty slice while holding the write lock.
func (pc *PubControl) RemoveAllClients() {
	empty := []*PubControlClient{}
	pc.clientsRWLock.Lock()
	pc.clients = empty
	pc.clientsRWLock.Unlock()
}
// AddClient registers pcc as an additional publishing endpoint by appending
// it to the client list while holding the write lock.
func (pc *PubControl) AddClient(pcc *PubControlClient) {
	pc.clientsRWLock.Lock()
	pc.clients = append(pc.clients, pcc)
	pc.clientsRWLock.Unlock()
}
// ApplyConfig creates one PubControlClient per usable entry in config and
// appends it to the existing client list (clients already present are kept).
//
// Recognized keys of each entry:
//   - "uri": endpoint URI, must be a string. Entries whose "uri" is missing
//     or is not a string are skipped.
//   - "iss": when present, the client is set up with JWT auth using the
//     claim {"iss": <value>} and "key" as the signing key.
//   - "key": a string or []byte. Together with "iss" it is the JWT key;
//     without "iss" it is used as a bearer token.
//
// A "key" that is absent or of any other type leaves the client without
// authentication. The write lock is held for the duration of the call.
func (pc *PubControl) ApplyConfig(config []map[string]interface{}) {
	pc.clientsRWLock.Lock()
	defer pc.clientsRWLock.Unlock()

	for _, entry := range config {
		// Checked assertion: a missing key yields a nil interface and a
		// non-string value fails the assertion, so both cases are skipped
		// instead of panicking.
		uri, ok := entry["uri"].(string)
		if !ok {
			continue
		}
		pcc := NewPubControlClient(uri)

		if iss, hasIss := entry["iss"]; hasIss {
			claim := map[string]interface{}{"iss": iss}
			switch key := entry["key"].(type) {
			case string:
				pcc.SetAuthJwt(claim, []byte(key))
			case []byte:
				pcc.SetAuthJwt(claim, key)
			}
		} else {
			// A missing "key" is a nil interface and matches no case.
			switch key := entry["key"].(type) {
			case string:
				pcc.SetAuthBearer(key)
			case []byte:
				pcc.SetAuthBearer(string(key))
			}
		}

		pc.clients = append(pc.clients, pcc)
	}
}
// Publish sends item to channel on every configured client. Clients are
// published to in parallel, one goroutine per client, and Publish waits for
// all of them to finish before returning.
//
// Failures are aggregated into a single error: an error returned by a
// client, a panic raised while publishing (reported with a stack trace), and
// a nil entry in the client list each count as one failed client. Publish
// returns nil when every client succeeded or when no clients are configured.
//
// The read lock is held for the whole call, so AddClient, ApplyConfig and
// RemoveAllClients block until publishing has completed.
func (pc *PubControl) Publish(channel string, item *Item) error {
	pc.clientsRWLock.RLock()
	defer pc.clientsRWLock.RUnlock()

	total := len(pc.clients)
	// One buffer slot per client: every client contributes at most one
	// message, so sends never block.
	errCh := make(chan string, total)
	var wg sync.WaitGroup

	for i, pcc := range pc.clients {
		if pcc == nil {
			// Reading client.uri in the recover handler below would panic
			// again for a nil client and take the process down, so report
			// it here without starting a goroutine.
			errCh <- fmt.Sprintf("client #%d: nil PubControlClient", i)
			continue
		}
		wg.Add(1)
		// Pass the client as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(client *PubControlClient) {
			// Deferred calls run last-in first-out: the panic is recorded
			// before the WaitGroup is released.
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					stack := make([]byte, 1024*8)
					stack = stack[:runtime.Stack(stack, false)]
					errCh <- fmt.Sprintf("%s: PANIC: %v\n%s", client.uri, r, stack)
				}
			}()
			if err := client.Publish(channel, item); err != nil {
				errCh <- fmt.Sprintf("%s: %s", client.uri, strings.TrimSpace(err.Error()))
			}
		}(pcc)
	}

	wg.Wait()
	// All senders are done; closing lets the range below terminate.
	close(errCh)

	errs := make([]string, 0, len(errCh))
	for msg := range errCh {
		errs = append(errs, msg)
	}
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("%d/%d client(s) failed to publish to channel: %s Errors: [%s]",
		len(errs), total, channel, strings.Join(errs, "],["))
}