-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathnode.go
442 lines (370 loc) · 12.4 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
package reign
// In order to make this work:
// * There must be an "address" type that can serialize out as an ID
// and nothing but an ID, then deserialize as either a local or remote
// inside the concrete type. Then the GobEncoder and GobDecoder types can
// correctly manage the mailboxes.
// * send messages across
// * route the sent messages to the target mailboxes
// * implement the policy below
// * Implement the remote backpressure.
// FIXME: To be added to the documentation
// Unlike what you may initially suspect, when sending a message to a remote
// node, you will block until that message has been queued up and is ready
// to send. (That is, you will not block while the message is being *sent*,
// so if you have a large message that may take a moment to send you can
// move on.) This is for the purpose of providing backpressure; if the
// connection to the remote node becomes overloaded, it is desirable for
// things using that connection to be slowed up a bit. This may not be
// the optimal policy, but it's a start. In the case where messages are
// being sent substantially slower than the network can handle, the
// desirable common case, this will amount to no pauses.
//
// we can get this pattern by having the senders send to the node, and
// having them also select on the node's status chan; if that gets closed
// due to failure, we give up. We can also select on a timeout chan.
//
// FIXME: This implies that the backpressure of a node ought to be propagated
// back to the cluster itself. "Sufficiently large" backlogs should be propagated
// to any senders, so they can be slowed down locally.
import (
"crypto/tls"
"encoding/gob"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/thejerf/reign/internal"
)
const (
	// clusterVersion is the wire-protocol version exchanged during the
	// cluster handshake; mismatches are logged but tolerated (see
	// clusterHandshake).
	clusterVersion = 1
)

// ErrFailOnClusterHandshake is the error returned when a node connector's
// failOnClusterHandshake property is true.
var ErrFailOnClusterHandshake = errors.New("Failing on ssl handshake, as instructed")
// nodeConnector bundles together all of the information about how to connect
// to a node. The actual connection is a nodeConnection. This runs as a
// supervised service.
type nodeConnector struct {
	source  *NodeDefinition // this (local) node's definition
	dest    *NodeDefinition // the remote node we connect to
	cluster *Cluster
	ClusterLogger

	// this mailbox should never get out to the "rest" of the system.
	remoteMailboxes *remoteMailboxes

	sync.Mutex

	// deliberately retained across restarts
	*mailboxes

	connectionServer *connectionServer
	connection       *nodeConnection
	// cancel is set by Stop() when no connection exists yet, telling a
	// concurrently-starting Serve() to abandon its connection.
	cancel bool

	// Test hooks: force the corresponding handshake phase to fail.
	failOnSSLHandshake     bool
	failOnClusterHandshake bool
}
// connect establishes the raw TCP connection to the target node. It does
// NOTHING ELSE — no SSL handshake, no cluster handshake, nothing.
//
// FIXME: Test that a node definition can't establish two connections to
// the same node.
func (nc *nodeConnector) connect() (*nodeConnection, error) {
	tcpConn, dialErr := net.DialTCP("tcp", nc.source.localaddr, nc.dest.ipaddr)
	if dialErr != nil {
		return nil, dialErr
	}

	// The TCP connection doubles as the raw reader and writer until the
	// TLS layer is established by sslHandshake.
	conn := &nodeConnection{
		source:                 nc.source,
		dest:                   nc.dest,
		conn:                   tcpConn,
		rawInput:               tcpConn,
		rawOutput:              tcpConn,
		ClusterLogger:          nc.ClusterLogger,
		connectionServer:       nc.connectionServer,
		nodeConnector:          nc,
		failOnSSLHandshake:     nc.failOnSSLHandshake,
		failOnClusterHandshake: nc.failOnClusterHandshake,
	}
	return conn, nil
}
// String names this service for Suture's service-name inquiries.
//
// The node connector's Serve() method holds the lock while deciding
// whether to cancel, so we must acquire the same lock before replying.
func (nc *nodeConnector) String() string {
	nc.Lock()
	defer nc.Unlock()

	src, dst := nc.source.ID, nc.dest.ID
	return fmt.Sprintf("nodeConnector %d -> %d", src, dst)
}
// Serve runs the node connection as a supervised service: it dials the
// remote node, performs the SSL and cluster handshakes, synchronizes the
// registries, and then pumps incoming messages until the connection dies.
//
// Fixes relative to the previous version:
//   - nc.connection was assigned before the connect() error check and
//     without holding the mutex, racing with Stop(); it is now published
//     under the lock, and only for a successfully-established connection.
//   - when Stop() had already run (nc.cancel set), the connection was
//     terminated but Serve then continued into the SSL handshake on the
//     closed socket; it now returns immediately.
func (nc *nodeConnector) Serve() {
	nc.Infof("node connection from %d to %d, starting serve", nc.source.ID, nc.dest.ID)
	connection, err := nc.connect()
	if err != nil {
		nc.Errorf("Could not connect to node %v: %s", nc.dest.ID, err.Error())
		return
	}
	nc.Infof("%d -> %d connected", nc.source.ID, nc.dest.ID)

	// sync with the Stop method, which could conceivably be triggered
	// before we even get here
	nc.Lock()
	if nc.cancel {
		// Stop() already ran; shut the fresh connection down and bail out
		// rather than handshaking over a closed socket.
		connection.terminate()
		nc.Unlock()
		return
	}
	// Publish the connection under the lock so Stop() can terminate it.
	nc.connection = connection
	nc.Unlock()
	defer connection.terminate()

	err = connection.sslHandshake()
	if err != nil {
		nc.Errorf("Could not SSL handshake to node %v: %s", nc.dest.ID, err.Error())
		return
	}
	nc.Infof("%d -> %d ssl handshake successful", nc.source.ID, nc.dest.ID)

	err = connection.clusterHandshake()
	if err != nil {
		nc.Errorf("Could not perform cluster handshake with node %v: %s", nc.dest.ID, err.Error())
		return
	}
	nc.Infof("%d -> %d cluster handshake successful", nc.source.ID, nc.dest.ID)

	// Synchronize registry with the remote node.
	err = connection.registrySync()
	if err != nil {
		nc.Errorf("Could not sync registry with node %v: %s", nc.dest.ID, err.Error())
		return
	}
	nc.Infof("%d -> %d registry sync successful", nc.source.ID, nc.dest.ID)

	// hook up the connection to the permanent message manager
	nc.remoteMailboxes.setConnection(connection)
	defer nc.remoteMailboxes.unsetConnection(connection)

	// and handle all incoming messages
	connection.handleIncomingMessages()
}
// Stop shuts the connector down. If a live connection exists it is
// terminated; otherwise the cancel flag is raised so that a
// concurrently-starting Serve() knows to abandon its connection attempt.
func (nc *nodeConnector) Stop() {
	nc.Lock()
	defer nc.Unlock()

	if nc.connection == nil {
		nc.cancel = true
		return
	}
	nc.connection.terminate()
	nc.connection = nil
}
// this is the literal connection to the node.
type nodeConnection struct {
	output         *gob.Encoder       // gob stream over the TLS connection, outbound
	input          *gob.Decoder       // gob stream over the TLS connection, inbound
	resetPingTimer chan time.Duration // feeds the pinger goroutine; created in handleIncomingMessages

	connectionServer *connectionServer
	source           *NodeDefinition
	dest             *NodeDefinition
	ClusterLogger
	*nodeConnector // embedded; supplies the mutex used by peekIncomingMessage/setPeekFunc

	// Test hooks: force the corresponding handshake phase to fail.
	failOnSSLHandshake     bool
	failOnClusterHandshake bool

	// Used for testing purposes to peek in on incoming messages.
	peekFunc func(internal.ClusterMessage)

	conn net.Conn  // the raw TCP connection
	tls  *tls.Conn // TLS wrapper around conn; nil until sslHandshake succeeds

	// Raw read/write ends of the connection, closed in terminate().
	rawOutput io.WriteCloser
	rawInput  io.ReadCloser
}
// peekIncomingMessage forwards a received cluster message to the
// registered peekFunc, if one has been set; otherwise it does nothing.
func (nc *nodeConnection) peekIncomingMessage(cm internal.ClusterMessage) {
	nc.Lock()
	defer nc.Unlock()

	if pf := nc.peekFunc; pf != nil {
		pf(cm)
	}
}
// setPeekFunc installs pf as this connection's message-peeking hook,
// replacing any previous one. Pass nil to disable peeking.
func (nc *nodeConnection) setPeekFunc(pf func(internal.ClusterMessage)) {
	nc.Lock()
	nc.peekFunc = pf
	nc.Unlock()
}
// resetConnectionDeadline pushes the network connection's read/write
// deadline out to d from now. A failure is logged but otherwise ignored.
func (nc *nodeConnection) resetConnectionDeadline(d time.Duration) {
	if err := nc.conn.SetDeadline(time.Now().Add(d)); err != nil {
		nc.Errorf("Unable to set network connection deadline: %s", err)
	}
}
// terminate closes every layer of the connection: the TLS wrapper first,
// then the underlying TCP connection, then the raw reader/writer (which
// normally alias the TCP connection; see connect()). Any of them may be
// unset depending on how far setup got. Safe to call on a nil receiver
// and safe to call more than once; Close errors are deliberately ignored.
func (nc *nodeConnection) terminate() {
	if nc == nil {
		return
	}

	if nc.tls != nil {
		_ = nc.tls.Close()
	}
	if nc.conn != nil {
		_ = nc.conn.Close()
	}
	if nc.rawOutput != nil {
		_ = nc.rawOutput.Close()
	}
	if nc.rawInput != nil {
		_ = nc.rawInput.Close()
	}
}
// sslHandshake wraps the raw TCP connection in TLS as a client, runs the
// TLS handshake, and on success installs the gob encoder/decoder over the
// TLS stream. Returns ErrFailOnClusterHandshake immediately (after
// terminating the connection) when the failOnSSLHandshake test hook is set.
func (nc *nodeConnection) sslHandshake() error {
	nc.Tracef("Conn to %d in sslHandshake", nc.dest.ID)
	if nc.failOnSSLHandshake {
		nc.terminate()
		return ErrFailOnClusterHandshake
	}

	tlsConfig := nc.connectionServer.Cluster.tlsConfig(nc.dest.ID)
	tlsConn := tls.Client(nc.conn, tlsConfig)

	// I like to run this manually, despite the fact this is run
	// automatically at first communication, so I get any errors it may
	// produce at a controlled time.
	nc.Tracef("Conn to %d handshaking", nc.dest.ID)
	err := tlsConn.Handshake()
	if err != nil {
		return err
	}

	nc.tls = tlsConn
	nc.output = gob.NewEncoder(nc.tls)
	nc.input = gob.NewDecoder(nc.tls)
	return nil
}
// clusterHandshake exchanges ClusterHandshake messages with the remote
// node over the freshly-established gob stream, then sanity-checks the
// remote's claimed cluster version and node identities. Mismatches are
// logged as warnings but do not abort the connection. Returns
// ErrFailOnClusterHandshake immediately when the failOnClusterHandshake
// test hook is set.
func (nc *nodeConnection) clusterHandshake() error {
	if nc.failOnClusterHandshake {
		nc.terminate()
		return ErrFailOnClusterHandshake
	}

	// Send our side of the handshake first, then read theirs.
	handshake := internal.ClusterHandshake{
		ClusterVersion: clusterVersion,
		MyNodeID:       internal.IntNodeID(nc.source.ID),
		YourNodeID:     internal.IntNodeID(nc.dest.ID),
	}
	err := nc.output.Encode(handshake)
	if err != nil {
		return err
	}

	var serverHandshake internal.ClusterHandshake
	err = nc.input.Decode(&serverHandshake)
	if err != nil {
		return err
	}

	// "my"/"your" are from the remote node's point of view.
	myNodeID := NodeID(serverHandshake.MyNodeID)
	yourNodeID := NodeID(serverHandshake.YourNodeID)

	if serverHandshake.ClusterVersion != clusterVersion {
		connections.Warnf("Remote node id %v claimed unknown cluster version %v, proceeding in the hope that this will all just work out somehow...",
			nc.dest.ID, serverHandshake.ClusterVersion)
	}
	if myNodeID != nc.dest.ID {
		connections.Warnf("The node I thought was #%v is claiming to be #%v instead. These two nodes can not communicate properly. Standing by, hoping a new node definition will resolve this shortly....",
			nc.dest.ID, serverHandshake.MyNodeID)
	}
	if yourNodeID != nc.source.ID {
		connections.Warnf("The node #%v thinks I'm node #%v, but I think I'm node #%v. These two nodes can not communicate properly. Standing by, hoping a new node definition will resolve this shortly...",
			nc.dest.ID, serverHandshake.YourNodeID, nc.source.ID)
	}

	// NOTE(review): this discards the decoder created in sslHandshake and
	// starts a fresh one, dropping any gob type state accumulated during
	// the handshake — presumably so the message stream starts clean on
	// both sides; confirm against the accepting side's implementation.
	nc.input = gob.NewDecoder(nc.tls)

	return nil
}
// registrySync sends this node's registry MailboxID to the remote node,
// receives the remote node's in return, and records the remote registry
// mailbox in the local registry's node map.
func (nc *nodeConnection) registrySync() error {
	// Send our registry synchronization data to the remote node.
	rs := internal.RegisterRemoteNode{
		Node:      internal.IntNodeID(nc.source.ID),
		MailboxID: internal.IntMailboxID(nc.connectionServer.registry.Address.GetID()),
	}
	err := nc.output.Encode(rs)
	if err != nil {
		return err
	}

	// Receive the remote node's registry synchronization data.
	var irs internal.RegisterRemoteNode
	err = nc.input.Decode(&irs)
	if err != nil {
		return err
	}

	nc.Tracef("Received mailbox ID %x from node %d", irs.MailboxID, irs.Node)

	// Add remote node's registry mailbox ID to the nodeRegistries map. We need
	// to register this *before* we generate and send our registry claims else
	// some registry changes won't sync.
	nc.connectionServer.registry.addNodeRegistry(
		NodeID(irs.Node),
		Address{
			mailboxID:        MailboxID(irs.MailboxID),
			connectionServer: nc.connectionServer,
		},
	)

	return nil
}
// handleIncomingMessages handle replies from the remote node after this
// node makes a successful connection. It reports connection status, spawns
// the pinger goroutine, sends our registry claims, and then loops decoding
// cluster messages until a non-temporary error (or EOF) ends the loop.
func (nc *nodeConnection) handleIncomingMessages() {
	var (
		cm   internal.ClusterMessage
		err  error
		pong internal.ClusterMessage = internal.Pong{}
	)

	// Report the successful connection, and defer the disconnection status change call.
	nc.connectionServer.changeConnectionStatus(nc.dest.ID, true)
	defer nc.connectionServer.changeConnectionStatus(nc.dest.ID, false)

	// Channel lifecycle: both channels are closed when this function
	// returns, which also signals the pinger goroutine to stop.
	nc.resetPingTimer = make(chan time.Duration)
	done := make(chan struct{})
	defer func() {
		close(nc.resetPingTimer)
		close(done)
	}()
	go pingRemote(nc.output, nc.resetPingTimer, nc.ClusterLogger)

	nc.Tracef("Connection %d -> %d in handleIncomingMessages", nc.source.ID, nc.dest.ID)
	nc.resetConnectionDeadline(DeadlineInterval)

	// Send our registry claims.
	var claims internal.ClusterMessage = nc.connectionServer.registry.generateAllNodeClaims()
	err = nc.output.Encode(&claims)
	if err != nil {
		nc.Errorf("Sending registry claims: %s", err)
	}

	// Main receive loop: runs until err is non-nil after the switch below.
	for err == nil {
		err = nc.input.Decode(&cm)
		switch err {
		case nil:
			// We received a message. No need to PING the remote node.
			nc.resetPingTimer <- DefaultPingInterval

			nc.peekIncomingMessage(cm)

			switch msg := cm.(type) {
			case *internal.AllNodeClaims:
				nc.Tracef("Received claims from node %d", msg.Node)
				// Best-effort delivery to the registry; errors ignored.
				_ = nc.connectionServer.registry.Send(msg)
			case *internal.Ping:
				err = nc.output.Encode(&pong)
				if err != nil {
					nc.Errorf("Attempted to pong remote node: %s", err)
				}
			case *internal.Pong:
				// Keep-alive reply; nothing to do.
			default:
				// Everything else is routed to the remote mailboxes.
				err = nc.nodeConnector.remoteMailboxes.Send(cm)
				if err != nil {
					nc.Errorf("Error handling message %#v:\n%#v", cm, err)
				}
			}
			nc.resetConnectionDeadline(DeadlineInterval)
		case io.EOF:
			// err remains io.EOF, so the loop exits.
			nc.Errorf("Connection to node ID %v has gone down", nc.dest.ID)
		default:
			nErr, ok := err.(net.Error)
			if ok && nErr.Temporary() {
				// The pinger is monitoring temporary errors and will panic if
				// its threshold is exceeded.
				nc.Warn(err)
				// Clear err so the loop keeps receiving.
				err = nil
			} else {
				nc.Errorf("Error decoding message: %s", err)
			}
		}
	}
}
// send encodes value onto this connection's outbound gob stream. When
// there is no current connection (nil receiver), the message is dropped
// and an error is returned.
// FIXME: Compare with Erlang.
// FIXME: Send timeout
func (nc *nodeConnection) send(value *internal.ClusterMessage) error {
	if nc != nil {
		return nc.output.Encode(value)
	}
	// If we are not currently connected, silently eat the message.
	return errors.New("no current connection")
}