-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaudit.go
143 lines (117 loc) · 2.74 KB
/
audit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"encoding/json"
"log"
"strings"
"sync"
"time"
"github.com/bitly/nsq/nsqd"
"github.com/hashicorp/serf/command/agent"
"github.com/shipwire/ansqd/internal/polity"
)
// ExpirationTime is the offset added to time.Now() to form the time value
// that Audit, Req and Touch pass to Host.AddMessage.
var ExpirationTime = 2 * time.Minute
// audit wraps a pointer to a single nsqd message.
// NOTE(review): this type is not referenced anywhere in the visible code;
// confirm it is still needed.
type audit struct {
m *nsqd.Message
}
// delegate is a stateless type whose methods (OnFinish, OnQueue, OnRequeue,
// OnTouch) receive message lifecycle callbacks and turn them into audit
// records or auditor updates.
type delegate struct{}
// OnFinish is called when FIN is received for the message. It publishes a new
// message on the "audit.finish" topic whose body is the finished message's ID,
// then logs that ID in hex.
//
// NOTE(review): any value returned by PutMessage is discarded here; confirm
// whether a failed publish should be reported.
func (d *delegate) OnFinish(m *nsqd.Message) {
	finishTopic := n.GetTopic("audit.finish")
	record := &nsqd.Message{
		ID:   n.NewID(),
		Body: m.ID[:], // the audit record carries only the finished message's ID
	}
	finishTopic.PutMessage(record)

	log.Printf("AUDIT: OnFinish %x", m.ID)
}
// OnQueue is called before a message is sent to the queue. Messages headed
// for a topic whose name starts with "audit." are ignored. For every other
// topic, the message and its topic name are wrapped in an auditMessage, the
// encoded form is published on "audit.send", and the message ID is logged in
// hex.
func (d *delegate) OnQueue(m *nsqd.Message, topic string) {
	if strings.HasPrefix(topic, "audit.") {
		// Presumably this keeps the audit topics from auditing themselves.
		return
	}

	sendTopic := n.GetTopic("audit.send")
	record := auditMessage{Message: *m, Topic: topic}
	sendTopic.PutMessage(&nsqd.Message{
		ID:   n.NewID(),
		Body: record.Bytes(),
	})

	log.Printf("AUDIT: OnQueue %x", m.ID)
}
// OnRequeue is called when REQ is received for the message. It hands the
// message to the package-level auditor a via Req. The delay argument is not
// used.
func (d *delegate) OnRequeue(m *nsqd.Message, delay time.Duration) {
a.Req(m)
}
// OnTouch is called when TOUCH is received for the message. It hands the
// message to the package-level auditor a via Touch.
func (d *delegate) OnTouch(m *nsqd.Message) {
a.Touch(m)
}
// auditor tracks messages per originating host. Its methods use value
// receivers; the map and the mutex pointer are shared between copies.
type auditor struct {
// p runs the elections used by Host.InitiateRecovery.
p *polity.Polity
// ag is a serf agent. NOTE(review): not used in the visible code.
ag *agent.Agent
// hosts maps a hostname to its Host record; entries are created by GetHost.
hosts map[string]*Host
// hostsLock is held by GetHost while it reads or writes hosts.
hostsLock *sync.Mutex
}
// Audit records m against the host named in its body, passing
// time.Now().Add(ExpirationTime) as the accompanying time value.
//
// ExtractHost returns nil when the body is not a JSON object with a string
// "hostname" field. Such messages are logged and skipped instead of calling
// a method on a nil *Host.
func (a auditor) Audit(m *nsqd.Message) {
	host := a.ExtractHost(m)
	if host == nil {
		log.Printf("AUDIT: Audit: no host for message %x", m.ID)
		return
	}
	host.AddMessage(*m, time.Now().Add(ExpirationTime))
}
// Fin removes m from the host named in its body.
//
// ExtractHost returns nil when the body is not a JSON object with a string
// "hostname" field. Such messages are logged and skipped instead of calling
// a method on a nil *Host.
func (a auditor) Fin(m *nsqd.Message) {
	host := a.ExtractHost(m)
	if host == nil {
		log.Printf("AUDIT: Fin: no host for message %x", m.ID)
		return
	}
	host.RemoveMessage(*m)
}
// Req re-records a requeued message m against the host named in its body,
// passing time.Now().Add(ExpirationTime) as the accompanying time value.
//
// ExtractHost returns nil when the body is not a JSON object with a string
// "hostname" field. Such messages are logged and skipped instead of calling
// a method on a nil *Host.
func (a auditor) Req(m *nsqd.Message) {
	host := a.ExtractHost(m)
	if host == nil {
		log.Printf("AUDIT: Req: no host for message %x", m.ID)
		return
	}
	host.AddMessage(*m, time.Now().Add(ExpirationTime))
}
// Touch re-records a touched message m against the host named in its body,
// passing time.Now().Add(ExpirationTime) as the accompanying time value.
//
// ExtractHost returns nil when the body is not a JSON object with a string
// "hostname" field. Such messages are logged and skipped instead of calling
// a method on a nil *Host.
func (a auditor) Touch(m *nsqd.Message) {
	host := a.ExtractHost(m)
	if host == nil {
		log.Printf("AUDIT: Touch: no host for message %x", m.ID)
		return
	}
	host.AddMessage(*m, time.Now().Add(ExpirationTime))
}
// InitiateRecovery republishes every message tracked for this host, provided
// the election for the role "recover:<host>" completes without error.
//
// The inRecovery flag is read and set under recoveryLock, so only the first
// caller proceeds and later callers return immediately. The lock is released
// before that early return; previously it was left held, which blocked every
// subsequent caller forever.
//
// NOTE(review): inRecovery is never cleared in the visible code, so recovery
// runs at most once per Host. Confirm that is intended.
func (h *Host) InitiateRecovery() {
	h.recoveryLock.Lock()
	alreadyRunning := h.inRecovery
	h.inRecovery = true
	h.recoveryLock.Unlock()
	if alreadyRunning {
		return
	}

	role := "recover:" + h.host
	err := <-a.p.RunElection(role)
	// The recall is deferred after the election result arrives, so it runs on
	// both the error path and the normal path.
	defer a.p.RunRecallElection(role)
	if err != nil {
		return
	}

	// NOTE(review): h.messages is ranged over without any lock visible here;
	// confirm nothing mutates it concurrently.
	for mid, bucket := range h.messages {
		m := bucket.GetMessage(mid)
		am := extractAudit(m)
		n.GetTopic(am.Topic).PutMessage(&am.Message)
	}
}
// ExtractHost decodes m.Body as a JSON object and returns the Host registered
// under its "hostname" value, creating the Host through GetHost if needed.
//
// It returns nil when the body is not valid JSON for an object, when the
// "hostname" key is absent, or when its value is not a string.
func (a auditor) ExtractHost(m *nsqd.Message) *Host {
	var fields map[string]interface{}
	if err := json.Unmarshal(m.Body, &fields); err != nil {
		return nil
	}

	// A missing key yields a nil interface, which fails the assertion just
	// like a non-string value does.
	hostname, isString := fields["hostname"].(string)
	if !isString {
		return nil
	}
	return a.GetHost(hostname)
}
// GetHost returns the Host stored under hostname, creating and storing a new
// one with NewHost on first use. The lookup and the insert both happen while
// hostsLock is held.
func (a auditor) GetHost(hostname string) *Host {
	a.hostsLock.Lock()
	defer a.hostsLock.Unlock()

	if existing, found := a.hosts[hostname]; found {
		return existing
	}

	created := NewHost(hostname)
	a.hosts[hostname] = created
	return created
}
// auditMessage pairs an nsqd message with the name of a topic. OnQueue builds
// one from the message being queued and its destination topic;
// InitiateRecovery publishes the embedded Message to Topic.
type auditMessage struct {
nsqd.Message
Topic string
}
// Bytes returns the byte form of the audit message that OnQueue uses as the
// body of the message it publishes on "audit.send".
//
// NOTE(review): this currently always returns nil, so those messages carry a
// nil body. It looks like an unimplemented stub; confirm the intended
// encoding.
func (a auditMessage) Bytes() []byte {
return nil
}
// extractAudit converts a stored nsqd message into an auditMessage for
// InitiateRecovery, which publishes the result's Message to its Topic.
//
// NOTE(review): this currently ignores m and always returns the zero
// auditMessage, so the caller sees an empty Topic and a zero Message. It
// looks like an unimplemented stub; confirm the intended decoding.
func extractAudit(m nsqd.Message) auditMessage {
return auditMessage{}
}