-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
Copy pathlookup_peer.go
144 lines (128 loc) · 3.52 KB
/
lookup_peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package nsqd
import (
"encoding/binary"
"fmt"
"io"
"net"
"time"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/lg"
)
// lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
//
// A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect
// gracefully (i.e. it is all handled by the library). Clients can simply use the
// Command interface to perform a round-trip.
type lookupPeer struct {
// logf is the logging function; Connect uses it to report each dial attempt.
logf lg.AppLogFunc
// addr is the address dialed by Connect and returned by String.
addr string
// conn is the connection established by Connect. newLookupPeer does not
// set it, so it is nil until the first successful Connect.
conn net.Conn
// state is the connection state: stateDisconnected on creation and after
// Close, stateConnected once Command has connected.
state int32
// connectCallback is invoked by Command after it connects, when the state
// before connecting was stateDisconnected.
connectCallback func(*lookupPeer)
// maxBodySize is the limit Command passes to readResponseBounded for the
// size of a response body.
maxBodySize int64
// Info holds metadata about the peer; newLookupPeer leaves it at its zero value.
Info peerInfo
}
// peerInfo contains metadata for a lookupPeer instance (and is JSON marshalable)
//
// Each field maps to the JSON key named in its struct tag. What the values
// describe is not established by the code shown here.
// NOTE(review): presumably they mirror what the nsqlookupd peer reports about
// itself (ports, version, advertised address) — confirm against the code that
// fills lookupPeer.Info.
type peerInfo struct {
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
BroadcastAddress string `json:"broadcast_address"`
}
// newLookupPeer builds a lookupPeer for the given nsqlookupd address. No
// connection is made here; the peer starts out in stateDisconnected.
//
// maxBodySize bounds the size of responses read by Command, l is the logging
// function, and connectCallback is stored for Command to invoke when it
// connects from the disconnected state.
func newLookupPeer(addr string, maxBodySize int64, l lg.AppLogFunc, connectCallback func(*lookupPeer)) *lookupPeer {
	peer := new(lookupPeer)
	peer.logf = l
	peer.addr = addr
	peer.state = stateDisconnected
	peer.maxBodySize = maxBodySize
	peer.connectCallback = connectCallback
	return peer
}
// Connect logs the attempt and dials lp.addr over TCP with a one-second
// timeout. On success the new connection is stored in lp.conn; on failure
// lp.conn is left untouched and the dial error is returned.
func (lp *lookupPeer) Connect() error {
	lp.logf(lg.INFO, "LOOKUP connecting to %s", lp.addr)

	c, dialErr := net.DialTimeout("tcp", lp.addr, time.Second)
	if dialErr == nil {
		lp.conn = c
	}
	return dialErr
}
// String implements fmt.Stringer by returning the peer's address (lp.addr).
func (lp *lookupPeer) String() string {
	address := lp.addr
	return address
}
// Read satisfies io.Reader. It sets a read deadline one second from now on
// the underlying connection and then reads into data.
func (lp *lookupPeer) Read(data []byte) (int, error) {
	deadline := time.Now().Add(time.Second)
	// The result of setting the deadline is not checked.
	_ = lp.conn.SetReadDeadline(deadline)

	n, err := lp.conn.Read(data)
	return n, err
}
// Write satisfies io.Writer. It sets a write deadline one second from now on
// the underlying connection and then writes data.
func (lp *lookupPeer) Write(data []byte) (int, error) {
	deadline := time.Now().Add(time.Second)
	// The result of setting the deadline is not checked.
	_ = lp.conn.SetWriteDeadline(deadline)

	n, err := lp.conn.Write(data)
	return n, err
}
// Close satisfies io.Closer. It always marks the peer as stateDisconnected,
// then closes the underlying connection if one exists and returns that
// result. With no connection it returns nil.
func (lp *lookupPeer) Close() error {
	lp.state = stateDisconnected
	if lp.conn == nil {
		return nil
	}
	return lp.conn.Close()
}
// Command performs a round-trip for the specified Command.
//
// It will lazily connect to nsqlookupd and gracefully handle
// reconnecting in the event of a failure: if the peer is not in
// stateConnected it dials, sends the protocol magic, and (when the peer was
// previously stateDisconnected) invokes connectCallback before sending cmd.
//
// A nil cmd only performs the connect step and returns (nil, nil).
//
// It returns the response from nsqlookupd as []byte. On any connect,
// handshake, write or read error the connection is closed (except when the
// dial itself fails, as there is nothing to close) and the error is returned,
// so the next call reconnects.
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
	initialState := lp.state
	if lp.state != stateConnected {
		if err := lp.Connect(); err != nil {
			return nil, err
		}
		lp.state = stateConnected

		// The magic bytes must reach the peer before anything else is sent.
		// If this write fails, drop the connection instead of running the
		// callback and the command on it.
		if _, err := lp.Write(nsq.MagicV1); err != nil {
			lp.Close()
			return nil, err
		}

		if initialState == stateDisconnected {
			lp.connectCallback(lp)
		}
	}

	if cmd == nil {
		return nil, nil
	}

	if _, err := cmd.WriteTo(lp); err != nil {
		// Best-effort close; the write error is the one worth reporting.
		lp.Close()
		return nil, err
	}

	resp, err := readResponseBounded(lp, lp.maxBodySize)
	if err != nil {
		// Best-effort close; the read error is the one worth reporting.
		lp.Close()
		return nil, err
	}
	return resp, nil
}
// readResponseBounded reads one length-prefixed frame from r: a big-endian
// int32 size followed by that many bytes of body, which it returns.
//
// It returns an error, without reading the body, if the size is negative or
// greater than limit. Errors from reading the size or the body are returned
// unchanged (for example io.EOF on an empty reader, io.ErrUnexpectedEOF on a
// truncated frame).
func readResponseBounded(r io.Reader, limit int64) ([]byte, error) {
	var msgSize int32

	// message size
	if err := binary.Read(r, binary.BigEndian, &msgSize); err != nil {
		return nil, err
	}

	// The size comes from the remote end; a negative value would otherwise
	// pass the limit check below and panic in make.
	if msgSize < 0 {
		return nil, fmt.Errorf("response body size (%d) is negative", msgSize)
	}

	if int64(msgSize) > limit {
		return nil, fmt.Errorf("response body size (%d) is greater than limit (%d)",
			msgSize, limit)
	}

	// message binary data
	buf := make([]byte, msgSize)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}

	return buf, nil
}