generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 109
/
Copy pathpeermanager.go
248 lines (200 loc) · 6.39 KB
/
peermanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package peermanager
import (
"context"
"sync"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// log is the package-level logger, created under the name
// "bitswap/client/peermgr".
var log = logging.Logger("bitswap/client/peermgr")
// PeerQueue provides a queue of messages to be sent for a single peer.
//
// In this file the PeerManager itself only calls Startup, Shutdown and
// ResponseReceived. The queue is also handed to the peerWantManager through
// addPeer; presumably that is where the Add* methods are driven from (its
// implementation is not visible here).
type PeerQueue interface {
	// AddBroadcastWantHaves takes the CIDs of broadcast want-haves to queue
	// for this peer.
	AddBroadcastWantHaves([]cid.Cid)
	// AddWants takes two CID lists. NOTE(review): presumably want-blocks
	// first and want-haves second, mirroring PeerManager.SendWants -- confirm
	// against the implementation.
	AddWants([]cid.Cid, []cid.Cid)
	// AddCancels takes the CIDs to cancel for this peer.
	AddCancels([]cid.Cid)
	// ResponseReceived is invoked by PeerManager.ResponseReceived with the
	// keys carried by a message received from this peer.
	ResponseReceived(ks []cid.Cid)
	// Startup is called by the PeerManager right after the queue is created.
	Startup()
	// Shutdown is called by the PeerManager when the peer is removed from the
	// pool in Disconnected.
	Shutdown()
}
// Session is what the PeerManager needs from a session: an identifier to
// index it by, and a callback for peer connectivity changes.
type Session interface {
	// ID returns the identifier under which the session is registered.
	ID() uint64
	// SignalAvailability is called with a peer and true when that peer
	// connects, or false when it disconnects, for peers the session has been
	// registered against via RegisterSession.
	SignalAvailability(peer.ID, bool)
}
// PeerQueueFactory provides a function that will create a PeerQueue.
// The PeerManager calls it with the context it was constructed with and the
// ID of the peer that needs a queue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
// PeerManager manages a pool of peers and sends messages to peers in the pool.
//
// State is guarded by two locks: pqLk covers the peer queues and the peer
// want manager, psLk covers session registration.
type PeerManager struct {
	// sync access to peerQueues and peerWantManager
	pqLk sync.RWMutex
	// peerQueues -- interact through internal utility functions get/set/remove/iterate
	peerQueues map[peer.ID]PeerQueue
	// pwm is the peer want manager built in New; want, cancel and wantlist
	// queries are delegated to it.
	pwm *peerWantManager
	// createPeerQueue builds the queue for a peer that has none yet.
	createPeerQueue PeerQueueFactory
	// ctx is the context given to New; it is passed on to createPeerQueue.
	ctx context.Context
	// psLk guards sessions and peerSessions.
	psLk sync.RWMutex
	// sessions holds registered sessions keyed by their ID.
	sessions map[uint64]Session
	// peerSessions holds, per peer, the set of session IDs registered for it.
	peerSessions map[peer.ID]map[uint64]struct{}
	// self is the peer ID given to New. It is not read in this file;
	// presumably it is the local node's ID -- confirm against callers.
	self peer.ID
}
// New builds a PeerManager from a context, a factory used to create a queue
// for each peer, and the peer ID to record as self.
//
// Two gauges ("wantlist_total" and "want_blocks_total") are created from ctx
// and handed to the peer want manager.
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
	totalWants := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
	totalWantBlocks := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()

	pm := &PeerManager{
		ctx:             ctx,
		self:            self,
		createPeerQueue: createPeerQueue,
	}

	// Maps must be allocated before use; writing to a nil map panics.
	pm.peerQueues = make(map[peer.ID]PeerQueue)
	pm.pwm = newPeerWantManager(totalWants, totalWantBlocks)
	pm.sessions = make(map[uint64]Session)
	pm.peerSessions = make(map[peer.ID]map[uint64]struct{})

	return pm
}
// AvailablePeers returns the peers considered available. No rate limiting is
// applied yet (see the TODO), so the result is exactly what ConnectedPeers
// returns.
func (pm *PeerManager) AvailablePeers() []peer.ID {
	// TODO: Rate-limit peers
	return pm.ConnectedPeers()
}
// ConnectedPeers returns the IDs of all peers that currently have a queue in
// this PeerManager. The result is a freshly allocated slice whose order
// follows map iteration and is therefore unspecified.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()

	// The final length is known up front, so fill by index.
	ids := make([]peer.ID, len(pm.peerQueues))
	next := 0
	for id := range pm.peerQueues {
		ids[next] = id
		next++
	}
	return ids
}
// Connected is called to add a new peer to the pool, and send it an initial
// set of wants.
//
// The peer's queue is created and started if it does not exist yet, the peer
// is registered with the peer want manager, and sessions registered for the
// peer are told it is now connected. All of this happens under the pqLk
// write lock.
func (pm *PeerManager) Connected(p peer.ID) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

	// Hand the (possibly just created) queue to the peer want manager.
	pm.pwm.addPeer(pm.getOrCreate(p), p)

	// Tell interested sessions that the peer is reachable.
	pm.signalAvailability(p, true)
}
// Disconnected is called to remove a peer from the pool. It is a no-op for a
// peer that has no queue.
func (pm *PeerManager) Disconnected(p peer.ID) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

	queue, known := pm.peerQueues[p]
	if !known {
		// Nothing to tear down for a peer that is not in the pool.
		return
	}

	// Sessions hear about the disconnect before any state is torn down.
	pm.signalAvailability(p, false)

	// Drop the peer from the pool, stop its queue, then remove it from the
	// peer want manager.
	delete(pm.peerQueues, p)
	queue.Shutdown()
	pm.pwm.removePeer(p)
}
// ResponseReceived is called when a message arrives from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message.
// It is only used to calculate latency. Messages from peers that have no
// queue are ignored.
func (pm *PeerManager) ResponseReceived(p peer.ID, ks []cid.Cid) {
	// Hold the read lock only for the lookup; the queue is notified after the
	// lock has been released.
	pm.pqLk.RLock()
	queue, known := pm.peerQueues[p]
	pm.pqLk.RUnlock()

	if !known {
		return
	}
	queue.ResponseReceived(ks)
}
// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
// the peer.
//
// The work is delegated to the peer want manager while holding the pqLk
// write lock. The ctx argument is not used by this method.
func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()
	pm.pwm.broadcastWantHaves(wantHaves)
}
// SendWants sends the given want-blocks and want-haves to peer p, leaving it
// to the peer want manager to filter out wants already sent to that peer.
//
// It returns false, and sends nothing, when p has no queue in the pool;
// otherwise it returns true. The ctx argument is not used by this method.
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

	_, connected := pm.peerQueues[p]
	if connected {
		pm.pwm.sendWants(p, wantBlocks, wantHaves)
	}
	return connected
}
// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
//
// The work is delegated to the peer want manager while holding the pqLk
// write lock. The ctx argument is not used by this method.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()
	// Send a CANCEL to each peer that has been sent a want-block or want-have
	pm.pwm.sendCancels(cancelKs)
}
// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
// The peer want manager is queried while holding the pqLk read lock.
func (pm *PeerManager) CurrentWants() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()
	return pm.pwm.getWants()
}
// CurrentWantBlocks returns the list of pending want-blocks.
// The peer want manager is queried while holding the pqLk read lock.
func (pm *PeerManager) CurrentWantBlocks() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()
	return pm.pwm.getWantBlocks()
}
// CurrentWantHaves returns the list of pending want-haves.
// The peer want manager is queried while holding the pqLk read lock.
func (pm *PeerManager) CurrentWantHaves() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()
	return pm.pwm.getWantHaves()
}
// getOrCreate returns the queue for peer p. If none exists it creates one
// with the configured factory and the manager's context, starts it, and
// stores it in the pool.
//
// It takes no lock itself; the only caller in this file (Connected) holds
// the pqLk write lock.
func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
	if existing, found := pm.peerQueues[p]; found {
		return existing
	}

	// First time this peer is seen: start the queue before publishing it.
	created := pm.createPeerQueue(pm.ctx, p)
	created.Startup()
	pm.peerQueues[p] = created
	return created
}
// RegisterSession tells the PeerManager that the given session is interested
// in events about the given peer.
//
// The session is recorded under its ID (an already-registered session with
// the same ID is left in place), and that ID is added to the set of sessions
// interested in p. Registering the same pair again has no further effect.
func (pm *PeerManager) RegisterSession(p peer.ID, s Session) {
	// Resolve the ID once, before taking the lock: it is an interface call
	// into session code, and using a single value guarantees that sessions
	// and peerSessions are keyed consistently.
	id := s.ID()

	pm.psLk.Lock()
	defer pm.psLk.Unlock()

	if _, ok := pm.sessions[id]; !ok {
		pm.sessions[id] = s
	}

	// One lookup for the per-peer set; create it on first registration.
	interested, ok := pm.peerSessions[p]
	if !ok {
		interested = make(map[uint64]struct{})
		pm.peerSessions[p] = interested
	}
	interested[id] = struct{}{}
}
// UnregisterSession tells the PeerManager that the session with the given ID
// is no longer interested in PeerManager events. The session is removed from
// every peer's interest set, and peers left with no interested sessions are
// dropped from the index.
func (pm *PeerManager) UnregisterSession(ses uint64) {
	pm.psLk.Lock()
	defer pm.psLk.Unlock()

	delete(pm.sessions, ses)

	// Deleting entries from a map while ranging over it is permitted in Go.
	for p, interested := range pm.peerSessions {
		delete(interested, ses)
		if len(interested) == 0 {
			delete(pm.peerSessions, p)
		}
	}
}
// signalAvailability is called when the connectivity of peer p changes. Every
// session registered for p is notified with the new state. The psLk read
// lock is held for the duration of the notifications.
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
	pm.psLk.RLock()
	defer pm.psLk.RUnlock()

	// Ranging over a missing (nil) entry runs zero iterations, so a peer with
	// no registered sessions needs no separate check.
	for id := range pm.peerSessions[p] {
		ses, registered := pm.sessions[id]
		if !registered {
			continue
		}
		ses.SignalAvailability(p, isConnected)
	}
}