From bfd6fe8e9f1d9e1ace617b1a390000614cf4f45e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 15 Apr 2020 17:26:24 -0400 Subject: [PATCH 01/15] refactor: move connection management into networking layer --- internal/decision/engine.go | 21 +-- internal/decision/ledger.go | 4 - internal/messagequeue/messagequeue.go | 117 +++++---------- internal/peermanager/peermanager.go | 61 +++----- network/interface.go | 8 +- network/ipfs_impl.go | 197 +++++++++++++++++++++++--- testnet/virtual.go | 2 +- 7 files changed, 246 insertions(+), 164 deletions(-) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 4a49c243..620bb868 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -745,32 +745,19 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) { func (e *Engine) PeerConnected(p peer.ID) { e.lock.Lock() defer e.lock.Unlock() - l, ok := e.ledgerMap[p] + + _, ok := e.ledgerMap[p] if !ok { - l = newLedger(p) - e.ledgerMap[p] = l + e.ledgerMap[p] = newLedger(p) } - - l.lk.Lock() - defer l.lk.Unlock() - l.ref++ } // PeerDisconnected is called when a peer disconnects. func (e *Engine) PeerDisconnected(p peer.ID) { e.lock.Lock() defer e.lock.Unlock() - l, ok := e.ledgerMap[p] - if !ok { - return - } - l.lk.Lock() - defer l.lk.Unlock() - l.ref-- - if l.ref <= 0 { - delete(e.ledgerMap, p) - } + delete(e.ledgerMap, p) } // If the want is a want-have, and it's below a certain size, send the full diff --git a/internal/decision/ledger.go b/internal/decision/ledger.go index 8f103bd4..87fedc45 100644 --- a/internal/decision/ledger.go +++ b/internal/decision/ledger.go @@ -43,10 +43,6 @@ type ledger struct { // wantList is a (bounded, small) set of keys that Partner desires. wantList *wl.Wantlist - // ref is the reference count for this ledger, its used to ensure we - // don't drop the reference to this ledger in multi-connection scenarios - ref int - lk sync.RWMutex } diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index d42db10d..b08834f3 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -25,7 +25,8 @@ const ( defaultRebroadcastInterval = 30 * time.Second // maxRetries is the number of times to attempt to send a message before // giving up - maxRetries = 10 + maxRetries = 3 + sendTimeout = 30 * time.Second // maxMessageSize is the maximum message size in bytes maxMessageSize = 1024 * 1024 * 2 // sendErrorBackoff is the time to wait before retrying to connect after @@ -46,7 +47,7 @@ const ( // sender. type MessageNetwork interface { ConnectTo(context.Context, peer.ID) error - NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) + NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) Latency(peer.ID) time.Duration Ping(context.Context, peer.ID) ping.Result Self() peer.ID @@ -409,12 +410,11 @@ func (mq *MessageQueue) sendIfReady() { } func (mq *MessageQueue) sendMessage() { - err := mq.initializeSender() + sender, err := mq.initializeSender() if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - // TODO: should we stop using this connection and clear the want list - // to avoid using up memory? 
+ // If we fail to initialize the sender, the networking layer will + // emit a Disconnect event and the MessageQueue will get cleaned up + log.Infof("Could not open message sender to peer %s: %s", mq.p, err) return } @@ -435,23 +435,24 @@ func (mq *MessageQueue) sendMessage() { wantlist := message.Wantlist() mq.logOutgoingMessage(wantlist) - // Try to send this message repeatedly - for i := 0; i < maxRetries; i++ { - if mq.attemptSendAndRecovery(message) { - // We were able to send successfully. - onSent() + if err := sender.SendMsg(mq.ctx, message); err != nil { + // If the message couldn't be sent, the networking layer will + // emit a Disconnect event and the MessageQueue will get cleaned up + log.Infof("Could not send message to peer %s: %s", mq.p, err) + return + } - mq.simulateDontHaveWithTimeout(wantlist) + // We were able to send successfully. + onSent() - // If the message was too big and only a subset of wants could be - // sent, schedule sending the rest of the wants in the next - // iteration of the event loop. - if mq.hasPendingWork() { - mq.signalWorkReady() - } + // Set a timer to wait for responses + mq.simulateDontHaveWithTimeout(wantlist) - return - } + // If the message was too big and only a subset of wants could be + // sent, schedule sending the rest of the wants in the next + // iteration of the event loop. + if mq.hasPendingWork() { + mq.signalWorkReady() } } @@ -620,69 +621,19 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap return mq.msg, onMessageSent } -func (mq *MessageQueue) initializeSender() error { - if mq.sender != nil { - return nil - } - nsender, err := openSender(mq.ctx, mq.network, mq.p) - if err != nil { - return err - } - mq.sender = nsender - return nil -} - -func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool { - err := mq.sender.SendMsg(mq.ctx, message) - if err == nil { - return true - } - - log.Infof("bitswap send error: %s", err) - _ = mq.sender.Reset() - mq.sender = nil - - select { - case <-mq.done: - return true - case <-mq.ctx.Done(): - return true - case <-time.After(mq.sendErrorBackoff): - // wait 100ms in case disconnect notifications are still propagating - log.Warn("SendMsg errored but neither 'done' nor context.Done() were set") - } - - err = mq.initializeSender() - if err != nil { - log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) - return true - } - - // TODO: Is this the same instance for the remote peer? 
- // If its not, we should resend our entire wantlist to them - /* - if mq.sender.InstanceID() != mq.lastSeenInstanceID { - wlm = mq.getFullWantlistMessage() +func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) { + if mq.sender == nil { + opts := &bsnet.MessageSenderOpts{ + MaxRetries: maxRetries, + SendTimeout: sendTimeout, + SendErrorBackoff: sendErrorBackoff, + } + nsender, err := mq.network.NewMessageSender(mq.ctx, mq.p, opts) + if err != nil { + return nil, err } - */ - return false -} - -func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) { - // allow ten minutes for connections this includes looking them up in the - // dht dialing them, and handshaking - conctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() - - err := network.ConnectTo(conctx, p) - if err != nil { - return nil, err - } - nsender, err := network.NewMessageSender(ctx, p) - if err != nil { - return nil, err + mq.sender = nsender } - - return nsender, nil + return mq.sender, nil } diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go index c2159b19..0cf8b2e3 100644 --- a/internal/peermanager/peermanager.go +++ b/internal/peermanager/peermanager.go @@ -30,17 +30,12 @@ type Session interface { // PeerQueueFactory provides a function that will create a PeerQueue. type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue -type peerQueueInstance struct { - refcnt int - pq PeerQueue -} - // PeerManager manages a pool of peers and sends messages to peers in the pool. type PeerManager struct { // sync access to peerQueues and peerWantManager pqLk sync.RWMutex // peerQueues -- interact through internal utility functions get/set/remove/iterate - peerQueues map[peer.ID]*peerQueueInstance + peerQueues map[peer.ID]PeerQueue pwm *peerWantManager createPeerQueue PeerQueueFactory @@ -57,7 +52,7 @@ type PeerManager struct { func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager { wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() return &PeerManager{ - peerQueues: make(map[peer.ID]*peerQueueInstance), + peerQueues: make(map[peer.ID]PeerQueue), pwm: newPeerWantManager(wantGauge), createPeerQueue: createPeerQueue, ctx: ctx, @@ -92,19 +87,15 @@ func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) { defer pm.pqLk.Unlock() pq := pm.getOrCreate(p) - pq.refcnt++ - - // If this is the first connection to the peer - if pq.refcnt == 1 { - // Inform the peer want manager that there's a new peer - pm.pwm.addPeer(p) - // Record that the want-haves are being sent to the peer - _, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves) - // Broadcast any live want-haves to the newly connected peers - pq.pq.AddBroadcastWantHaves(wantHaves) - // Inform the sessions that the peer has connected - pm.signalAvailability(p, true) - } + + // Inform the peer want manager that there's a new peer + pm.pwm.addPeer(p) + // Record that the want-haves are being sent to the peer + _, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves) + // Broadcast any live want-haves to the newly connected peers + pq.AddBroadcastWantHaves(wantHaves) + // Inform the sessions that the peer has connected + pm.signalAvailability(p, true) } // Disconnected is called to remove a peer from the pool. 
@@ -118,17 +109,12 @@ func (pm *PeerManager) Disconnected(p peer.ID) { return } - pq.refcnt-- - if pq.refcnt > 0 { - return - } - // Inform the sessions that the peer has disconnected pm.signalAvailability(p, false) // Clean up the peer delete(pm.peerQueues, p) - pq.pq.Shutdown() + pq.Shutdown() pm.pwm.removePeer(p) } @@ -141,8 +127,8 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C defer pm.pqLk.Unlock() for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) { - if pqi, ok := pm.peerQueues[p]; ok { - pqi.pq.AddBroadcastWantHaves(ks) + if pq, ok := pm.peerQueues[p]; ok { + pq.AddBroadcastWantHaves(ks) } } } @@ -153,9 +139,9 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci pm.pqLk.Lock() defer pm.pqLk.Unlock() - if pqi, ok := pm.peerQueues[p]; ok { + if pq, ok := pm.peerQueues[p]; ok { wblks, whvs := pm.pwm.prepareSendWants(p, wantBlocks, wantHaves) - pqi.pq.AddWants(wblks, whvs) + pq.AddWants(wblks, whvs) } } @@ -167,8 +153,8 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) { // Send a CANCEL to each peer that has been sent a want-block or want-have for p, ks := range pm.pwm.prepareSendCancels(cancelKs) { - if pqi, ok := pm.peerQueues[p]; ok { - pqi.pq.AddCancels(ks) + if pq, ok := pm.peerQueues[p]; ok { + pq.AddCancels(ks) } } } @@ -197,15 +183,14 @@ func (pm *PeerManager) CurrentWantHaves() []cid.Cid { return pm.pwm.getWantHaves() } -func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance { - pqi, ok := pm.peerQueues[p] +func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue { + pq, ok := pm.peerQueues[p] if !ok { - pq := pm.createPeerQueue(pm.ctx, p) + pq = pm.createPeerQueue(pm.ctx, p) pq.Startup() - pqi = &peerQueueInstance{0, pq} - pm.peerQueues[p] = pqi + pm.peerQueues[p] = pq } - return pqi + return pq } // RegisterSession tells the PeerManager that the given session is interested diff --git a/network/interface.go b/network/interface.go index 6b2878e3..a350d525 100644 --- a/network/interface.go +++ b/network/interface.go @@ -42,7 +42,7 @@ type BitSwapNetwork interface { ConnectTo(context.Context, peer.ID) error DisconnectFrom(context.Context, peer.ID) error - NewMessageSender(context.Context, peer.ID) (MessageSender, error) + NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) ConnectionManager() connmgr.ConnManager @@ -63,6 +63,12 @@ type MessageSender interface { SupportsHave() bool } +type MessageSenderOpts struct { + MaxRetries int + SendTimeout time.Duration + SendErrorBackoff time.Duration +} + // Receiver is an interface that can receive messages from the BitSwapNetwork. type Receiver interface { ReceiveMessage( diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index b5661408..d626ad03 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "sync/atomic" "time" @@ -43,6 +44,8 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) B supportedProtocols: s.SupportedProtocols, } + bitswapNetwork.connectEvtMgr = newConnectEventManager(&bitswapNetwork) + return &bitswapNetwork } @@ -71,8 +74,9 @@ type impl struct { // alignment. 
stats Stats - host host.Host - routing routing.ContentRouting + host host.Host + routing routing.ContentRouting + connectEvtMgr *connectEventManager protocolBitswapNoVers protocol.ID protocolBitswapOneZero protocol.ID @@ -86,24 +90,93 @@ type impl struct { } type streamMessageSender struct { - s network.Stream - bsnet *impl + to peer.ID + stream network.Stream + bsnet *impl + opts *MessageSenderOpts } -func (s *streamMessageSender) Close() error { - return helpers.FullClose(s.s) +func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Stream, err error) { + defer func() { + if err != nil { + s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) + } + }() + + if s.stream != nil { + return s.stream, nil + } + + if err = s.bsnet.ConnectTo(ctx, s.to); err != nil { + return nil, err + } + + stream, err = s.bsnet.newStreamToPeer(ctx, s.to) + if err != nil { + s.stream = stream + return s.stream, nil + } + return nil, err } func (s *streamMessageSender) Reset() error { - return s.s.Reset() + err := s.stream.Reset() + s.stream = nil + return err } -func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { - return s.bsnet.msgToStream(ctx, s.s, msg) +func (s *streamMessageSender) Close() error { + return helpers.FullClose(s.stream) } func (s *streamMessageSender) SupportsHave() bool { - return s.bsnet.SupportsHave(s.s.Protocol()) + return s.bsnet.SupportsHave(s.stream.Protocol()) +} + +func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + // Try to send the message repeatedly + var err error + for i := 0; i < s.opts.MaxRetries; i++ { + if err = s.attemptSend(ctx, msg); err == nil { + // Sent successfully + return nil + } + + // Failed to send so reset stream and try again + _ = s.Reset() + + if i == s.opts.MaxRetries { + s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) + return err + } + + select { + case <-ctx.Done(): + return nil + case <-time.After(s.opts.SendErrorBackoff): + // wait a short time in case disconnect notifications are still propagating + log.Infof("send message to %s failed but context was not Done: %s", s.to, err) + } + } + return err +} + +func (s *streamMessageSender) attemptSend(ctx context.Context, msg bsmsg.BitSwapMessage) error { + sndctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) + defer cancel() + + stream, err := s.Connect(sndctx) + if err != nil { + log.Infof("failed to open stream to %s: %s", s.to, err) + return err + } + + if err = s.bsnet.msgToStream(sndctx, stream, msg); err != nil { + log.Infof("failed to send message to %s: %s", s.to, err) + return err + } + + return nil } func (bsnet *impl) Self() peer.ID { @@ -164,17 +237,21 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. return nil } -func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) { - s, err := bsnet.newStreamToPeer(ctx, p) - if err != nil { - return nil, err +func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { + sender := &streamMessageSender{ + to: p, + bsnet: bsnet, + opts: opts, } - return &streamMessageSender{s: s, bsnet: bsnet}, nil -} + conctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout) + defer cancel() -func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) { - return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...) 
+ _, err := sender.Connect(conctx) + if err != nil { + return nil, err + } + return sender, nil } func (bsnet *impl) SendMessage( @@ -197,7 +274,10 @@ func (bsnet *impl) SendMessage( //nolint go helpers.AwaitEOF(s) return s.Close() +} +func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) { + return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...) } func (bsnet *impl) SetDelegate(r Receiver) { @@ -268,6 +348,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { p := s.Conn().RemotePeer() ctx := context.Background() log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) + bsnet.connectEvtMgr.OnMessage(s.Conn().RemotePeer()) bsnet.receiver.ReceiveMessage(ctx, p, received) atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1) } @@ -284,6 +365,82 @@ func (bsnet *impl) Stats() Stats { } } +type connectEventManager struct { + bsnet *impl + lk sync.Mutex + conns map[peer.ID]*connState +} + +type connState struct { + refs int + responsive bool +} + +func newConnectEventManager(bsnet *impl) *connectEventManager { + return &connectEventManager{ + bsnet: bsnet, + conns: make(map[peer.ID]*connState), + } +} + +func (c *connectEventManager) Connected(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + state = &connState{responsive: true} + } + state.refs++ + + if state.refs == 1 && state.responsive { + c.bsnet.receiver.PeerConnected(p) + } +} + +func (c *connectEventManager) Disconnected(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + // Should never happen + return + } + state.refs-- + c.conns[p] = state + + if state.refs == 0 && state.responsive { + c.bsnet.receiver.PeerDisconnected(p) + } +} + +func (c *connectEventManager) MarkUnresponsive(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + return + } + state.responsive = false + c.conns[p] = state + + c.bsnet.receiver.PeerDisconnected(p) +} + +func (c *connectEventManager) OnMessage(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if ok && !state.responsive { + state.responsive = true + c.conns[p] = state + c.bsnet.receiver.PeerConnected(p) + } +} + type netNotifiee impl func (nn *netNotifiee) impl() *impl { @@ -291,10 +448,10 @@ func (nn *netNotifiee) impl() *impl { } func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { - nn.impl().receiver.PeerConnected(v.RemotePeer()) + nn.impl().connectEvtMgr.Connected(v.RemotePeer()) } func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { - nn.impl().receiver.PeerDisconnected(v.RemotePeer()) + nn.impl().connectEvtMgr.Disconnected(v.RemotePeer()) } func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {} func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} diff --git a/testnet/virtual.go b/testnet/virtual.go index 1e472110..c44b430d 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -284,7 +284,7 @@ func (mp *messagePasser) SupportsHave() bool { return false } -func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { return &messagePasser{ net: nc, target: p, From b097d7027049ac57d2a503fc3047edea0d4128d9 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 15 Apr 2020 17:36:42 -0400 Subject: [PATCH 02/15] fix: stop sender when 
message queue shut down --- network/ipfs_impl.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index d626ad03..8a02fcea 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -94,6 +94,7 @@ type streamMessageSender struct { stream network.Stream bsnet *impl opts *MessageSenderOpts + done chan struct{} } func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Stream, err error) { @@ -126,6 +127,7 @@ func (s *streamMessageSender) Reset() error { } func (s *streamMessageSender) Close() error { + close(s.done) return helpers.FullClose(s.stream) } @@ -142,6 +144,15 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess return nil } + // If the sender has been closed or the context cancelled, just bail out + select { + case <-ctx.Done(): + return nil + case <-s.done: + return nil + default: + } + // Failed to send so reset stream and try again _ = s.Reset() @@ -153,6 +164,8 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess select { case <-ctx.Done(): return nil + case <-s.done: + return nil case <-time.After(s.opts.SendErrorBackoff): // wait a short time in case disconnect notifications are still propagating log.Infof("send message to %s failed but context was not Done: %s", s.to, err) @@ -242,6 +255,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag to: p, bsnet: bsnet, opts: opts, + done: make(chan struct{}), } conctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout) From c1922c0d987d6df209d7afd613aa76ece93ebf4d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 15 Apr 2020 18:00:09 -0400 Subject: [PATCH 03/15] fix: tests --- internal/messagequeue/messagequeue_test.go | 153 ++------------------- internal/peermanager/peermanager_test.go | 11 +- network/ipfs_impl.go | 2 +- network/ipfs_impl_test.go | 8 +- 4 files changed, 25 insertions(+), 149 deletions(-) diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index 49c1033d..38ffafa2 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -2,7 +2,6 @@ package messagequeue import ( "context" - "errors" "fmt" "math" "math/rand" @@ -31,7 +30,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { return fmn.connectError } -func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) { +func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { if fmn.messageSenderError == nil { return fmn.messageSender, nil } @@ -83,23 +82,19 @@ func (fp *fakeDontHaveTimeoutMgr) pendingCount() int { type fakeMessageSender struct { lk sync.Mutex - sendError error fullClosed chan<- struct{} reset chan<- struct{} messagesSent chan<- []bsmsg.Entry - sendErrors chan<- error supportsHave bool } -func newFakeMessageSender(sendError error, fullClosed chan<- struct{}, reset chan<- struct{}, - messagesSent chan<- []bsmsg.Entry, sendErrors chan<- error, supportsHave bool) *fakeMessageSender { +func newFakeMessageSender(fullClosed chan<- struct{}, reset chan<- struct{}, + messagesSent chan<- []bsmsg.Entry, supportsHave bool) *fakeMessageSender { return &fakeMessageSender{ - sendError: sendError, fullClosed: fullClosed, reset: reset, messagesSent: messagesSent, - sendErrors: sendErrors, supportsHave: supportsHave, } } @@ -108,19 +103,9 @@ 
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess fms.lk.Lock() defer fms.lk.Unlock() - if fms.sendError != nil { - fms.sendErrors <- fms.sendError - return fms.sendError - } fms.messagesSent <- msg.Wantlist() return nil } -func (fms *fakeMessageSender) clearSendError() { - fms.lk.Lock() - defer fms.lk.Unlock() - - fms.sendError = nil -} func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil } func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil } func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave } @@ -155,10 +140,9 @@ func totalEntriesLength(messages [][]bsmsg.Entry) int { func TestStartupAndShutdown(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -197,10 +181,9 @@ func TestStartupAndShutdown(t *testing.T) { func TestSendingMessagesDeduped(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -220,10 +203,9 @@ func TestSendingMessagesDeduped(t *testing.T) { func TestSendingMessagesPartialDupe(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -243,10 +225,9 @@ func TestSendingMessagesPartialDupe(t *testing.T) { func TestSendingMessagesPriority(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -312,10 +293,9 @@ func TestSendingMessagesPriority(t *testing.T) { func TestCancelOverridesPendingWants(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) 
+ fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -364,10 +344,9 @@ func TestCancelOverridesPendingWants(t *testing.T) { func TestWantOverridesPendingCancels(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -412,10 +391,9 @@ func TestWantOverridesPendingCancels(t *testing.T) { func TestWantlistRebroadcast(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -509,10 +487,9 @@ func TestWantlistRebroadcast(t *testing.T) { func TestSendingLargeMessages(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] @@ -540,10 +517,9 @@ func TestSendingLargeMessages(t *testing.T) { func TestSendToPeerThatDoesntSupportHave(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] @@ -596,10 +572,9 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] @@ -626,105 +601,6 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { } } -func TestResendAfterError(t *testing.T) { - ctx := context.Background() - messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) - 
resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) - fakenet := &fakeMessageNetwork{nil, nil, fakeSender} - dhtm := &fakeDontHaveTimeoutMgr{} - peerID := testutil.GeneratePeers(1)[0] - sendErrBackoff := 5 * time.Millisecond - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm) - wantBlocks := testutil.GenerateCids(10) - wantHaves := testutil.GenerateCids(10) - - messageQueue.Startup() - - var errs []error - go func() { - // After the first error is received, clear sendError so that - // subsequent sends will not error - errs = append(errs, <-sendErrors) - fakeSender.clearSendError() - }() - - // Make the first send error out - fakeSender.sendError = errors.New("send err") - messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) - - if len(errs) != 1 { - t.Fatal("Expected first send to error") - } - - if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { - t.Fatal("Expected subsequent send to succeed") - } -} - -func TestResendAfterMaxRetries(t *testing.T) { - ctx := context.Background() - messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) - resetChan := make(chan struct{}, maxRetries*2) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) - fakenet := &fakeMessageNetwork{nil, nil, fakeSender} - dhtm := &fakeDontHaveTimeoutMgr{} - peerID := testutil.GeneratePeers(1)[0] - sendErrBackoff := 2 * time.Millisecond - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm) - wantBlocks := testutil.GenerateCids(10) - wantHaves := testutil.GenerateCids(10) - wantBlocks2 := testutil.GenerateCids(10) - wantHaves2 := testutil.GenerateCids(10) - - messageQueue.Startup() - - var lk sync.Mutex - var errs []error - go func() { - lk.Lock() - defer lk.Unlock() - for len(errs) < maxRetries { - err := <-sendErrors - errs = append(errs, err) - } - }() - - // Make the first group of send attempts error out - fakeSender.sendError = errors.New("send err") - messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 50*time.Millisecond) - - lk.Lock() - errCount := len(errs) - lk.Unlock() - if errCount != maxRetries { - t.Fatal("Expected maxRetries errors, got", len(errs)) - } - - // No successful send after max retries, so expect no messages sent - if totalEntriesLength(messages) != 0 { - t.Fatal("Expected no messages") - } - - // Clear sendError so that subsequent sends will not error - fakeSender.clearSendError() - - // Add a new batch of wants - messageQueue.AddWants(wantBlocks2, wantHaves2) - messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond) - - // All wants from previous and new send should be sent - if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)+len(wantHaves2)+len(wantBlocks2) { - t.Fatal("Expected subsequent send to send first and second batches of wants") - } -} - func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid @@ -747,10 +623,9 @@ func BenchmarkMessageQueue(b *testing.B) { createQueue := func() *MessageQueue { messagesSent := make(chan []bsmsg.Entry) - sendErrors := make(chan error) resetChan := make(chan struct{}, 1) fullClosedChan := make(chan struct{}, 1) - fakeSender := 
newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true) + fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] diff --git a/internal/peermanager/peermanager_test.go b/internal/peermanager/peermanager_test.go index 0305b9f9..f979b2c8 100644 --- a/internal/peermanager/peermanager_test.go +++ b/internal/peermanager/peermanager_test.go @@ -99,7 +99,7 @@ func TestAddingAndRemovingPeers(t *testing.T) { t.Fatal("Peers connected that shouldn't be connected") } - // removing a peer with only one reference + // disconnect a peer peerManager.Disconnected(peer1) connectedPeers = peerManager.ConnectedPeers() @@ -107,13 +107,12 @@ func TestAddingAndRemovingPeers(t *testing.T) { t.Fatal("Peer should have been disconnected but was not") } - // connecting a peer twice, then disconnecting once, should stay in queue - peerManager.Connected(peer2, nil) - peerManager.Disconnected(peer2) + // reconnect peer + peerManager.Connected(peer1, nil) connectedPeers = peerManager.ConnectedPeers() - if !testutil.ContainsPeer(connectedPeers, peer2) { - t.Fatal("Peer was disconnected but should not have been") + if !testutil.ContainsPeer(connectedPeers, peer1) { + t.Fatal("Peer should have been connected but was not") } } diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 8a02fcea..7ca07dac 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -113,7 +113,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Strea } stream, err = s.bsnet.newStreamToPeer(ctx, s.to) - if err != nil { + if err == nil { s.stream = stream return s.stream, nil } diff --git a/network/ipfs_impl_test.go b/network/ipfs_impl_test.go index 5e0f512b..96e14b99 100644 --- a/network/ipfs_impl_test.go +++ b/network/ipfs_impl_test.go @@ -5,10 +5,10 @@ import ( "testing" "time" - tn "github.com/ipfs/go-bitswap/testnet" bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" bsnet "github.com/ipfs/go-bitswap/network" + tn "github.com/ipfs/go-bitswap/testnet" blocksutil "github.com/ipfs/go-ipfs-blocksutil" mockrouting "github.com/ipfs/go-ipfs-routing/mock" @@ -170,7 +170,7 @@ func TestSupportsHave(t *testing.T) { mr := mockrouting.NewServer() streamNet, err := tn.StreamNet(ctx, mn, mr) if err != nil { - t.Fatal("Unable to setup network") + t.Fatalf("Unable to setup network: %s", err) } type testCase struct { @@ -199,7 +199,9 @@ func TestSupportsHave(t *testing.T) { t.Fatal(err) } - senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID()) + senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + SendTimeout: time.Second, + }) if err != nil { t.Fatal(err) } From ba4b52e7beb452c78df69dbf9c77d0fc0fa7ce5b Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 16 Apr 2020 10:27:27 -0400 Subject: [PATCH 04/15] fix: don't hang on to disconnected peer refs --- network/ipfs_impl.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 7ca07dac..45316010 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -424,8 +424,11 @@ func (c *connectEventManager) Disconnected(p peer.ID) { state.refs-- c.conns[p] = state - if state.refs == 0 && state.responsive { - c.bsnet.receiver.PeerDisconnected(p) + if state.refs == 0 { + if state.responsive { + c.bsnet.receiver.PeerDisconnected(p) + } + delete(c.conns, p) 
} } From 189564eddc7650b7d715bb6a0d4885e5de1908ae Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 16 Apr 2020 10:36:37 -0400 Subject: [PATCH 05/15] fix: shutdown message queue when there's a send error --- internal/messagequeue/messagequeue.go | 4 ++++ network/ipfs_impl.go | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index b08834f3..c45a355c 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -359,6 +359,8 @@ func (mq *MessageQueue) runQueue() { return case <-mq.ctx.Done(): if mq.sender != nil { + // TODO: should I call sender.Close() here also to stop + // and in progress connection? _ = mq.sender.Reset() } return @@ -415,6 +417,7 @@ func (mq *MessageQueue) sendMessage() { // If we fail to initialize the sender, the networking layer will // emit a Disconnect event and the MessageQueue will get cleaned up log.Infof("Could not open message sender to peer %s: %s", mq.p, err) + mq.Shutdown() return } @@ -439,6 +442,7 @@ func (mq *MessageQueue) sendMessage() { // If the message couldn't be sent, the networking layer will // emit a Disconnect event and the MessageQueue will get cleaned up log.Infof("Could not send message to peer %s: %s", mq.p, err) + mq.Shutdown() return } diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 45316010..bea3d6b0 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -112,6 +112,13 @@ func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Strea return nil, err } + // Check if the sender has been closed + select { + case <-s.done: + return nil, nil + default: + } + stream, err = s.bsnet.newStreamToPeer(ctx, s.to) if err == nil { s.stream = stream From 37301bc32bee6fcade2267d7c34d3115158acc9e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 16 Apr 2020 11:13:42 -0400 Subject: [PATCH 06/15] refactor: extract Connection Event Manager to own file and add tests --- network/connecteventmanager.go | 92 ++++++++++++++++++ network/connecteventmanager_test.go | 144 ++++++++++++++++++++++++++++ network/ipfs_impl.go | 82 +--------------- 3 files changed, 237 insertions(+), 81 deletions(-) create mode 100644 network/connecteventmanager.go create mode 100644 network/connecteventmanager_test.go diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go new file mode 100644 index 00000000..100b6f96 --- /dev/null +++ b/network/connecteventmanager.go @@ -0,0 +1,92 @@ +package network + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" +) + +type ConnectionListener interface { + PeerConnected(peer.ID) + PeerDisconnected(peer.ID) +} + +type connectEventManager struct { + connListener ConnectionListener + lk sync.Mutex + conns map[peer.ID]*connState +} + +type connState struct { + refs int + responsive bool +} + +func newConnectEventManager(connListener ConnectionListener) *connectEventManager { + return &connectEventManager{ + connListener: connListener, + conns: make(map[peer.ID]*connState), + } +} + +func (c *connectEventManager) Connected(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + state = &connState{responsive: true} + c.conns[p] = state + } + state.refs++ + + if state.refs == 1 && state.responsive { + c.connListener.PeerConnected(p) + } +} + +func (c *connectEventManager) Disconnected(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + // Should never happen + return + } + 
state.refs-- + c.conns[p] = state + + if state.refs == 0 { + if state.responsive { + c.connListener.PeerDisconnected(p) + } + delete(c.conns, p) + } +} + +func (c *connectEventManager) MarkUnresponsive(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if !ok { + return + } + state.responsive = false + c.conns[p] = state + + c.connListener.PeerDisconnected(p) +} + +func (c *connectEventManager) OnMessage(p peer.ID) { + c.lk.Lock() + defer c.lk.Unlock() + + state, ok := c.conns[p] + if ok && !state.responsive { + state.responsive = true + c.conns[p] = state + c.connListener.PeerConnected(p) + } +} diff --git a/network/connecteventmanager_test.go b/network/connecteventmanager_test.go new file mode 100644 index 00000000..fb81abee --- /dev/null +++ b/network/connecteventmanager_test.go @@ -0,0 +1,144 @@ +package network + +import ( + "testing" + + "github.com/ipfs/go-bitswap/internal/testutil" + "github.com/libp2p/go-libp2p-core/peer" +) + +type mockConnListener struct { + conns map[peer.ID]int +} + +func newMockConnListener() *mockConnListener { + return &mockConnListener{ + conns: make(map[peer.ID]int), + } +} + +func (cl *mockConnListener) PeerConnected(p peer.ID) { + cl.conns[p]++ +} + +func (cl *mockConnListener) PeerDisconnected(p peer.ID) { + cl.conns[p]-- +} + +func TestConnectEventManagerConnectionCount(t *testing.T) { + connListener := newMockConnListener() + peers := testutil.GeneratePeers(2) + cem := newConnectEventManager(connListener) + + // Peer A: 1 Connection + cem.Connected(peers[0]) + if connListener.conns[peers[0]] != 1 { + t.Fatal("Expected Connected event") + } + + // Peer A: 2 Connections + cem.Connected(peers[0]) + if connListener.conns[peers[0]] != 1 { + t.Fatal("Unexpected no Connected event for the same peer") + } + + // Peer A: 2 Connections + // Peer B: 1 Connection + cem.Connected(peers[1]) + if connListener.conns[peers[1]] != 1 { + t.Fatal("Expected Connected event") + } + + // Peer A: 2 Connections + // Peer B: 0 Connections + cem.Disconnected(peers[1]) + if connListener.conns[peers[1]] != 0 { + t.Fatal("Expected Disconnected event") + } + + // Peer A: 1 Connection + // Peer B: 0 Connections + cem.Disconnected(peers[0]) + if connListener.conns[peers[0]] != 1 { + t.Fatal("Expected no Disconnected event for peer with one remaining conn") + } + + // Peer A: 0 Connections + // Peer B: 0 Connections + cem.Disconnected(peers[0]) + if connListener.conns[peers[0]] != 0 { + t.Fatal("Expected Disconnected event") + } +} + +func TestConnectEventManagerMarkUnresponsive(t *testing.T) { + connListener := newMockConnListener() + p := testutil.GeneratePeers(1)[0] + cem := newConnectEventManager(connListener) + + // Peer A: 1 Connection + cem.Connected(p) + if connListener.conns[p] != 1 { + t.Fatal("Expected Connected event") + } + + // Peer A: 1 Connection + cem.MarkUnresponsive(p) + if connListener.conns[p] != 0 { + t.Fatal("Expected Disconnected event") + } + + // Peer A: 2 Connections + cem.Connected(p) + if connListener.conns[p] != 0 { + t.Fatal("Expected no Connected event for unresponsive peer") + } + + // Peer A: 2 Connections + cem.OnMessage(p) + if connListener.conns[p] != 1 { + t.Fatal("Expected Connected event for newly responsive peer") + } + + // Peer A: 2 Connections + cem.OnMessage(p) + if connListener.conns[p] != 1 { + t.Fatal("Expected no further Connected event for subsequent messages") + } + + // Peer A: 1 Connection + cem.Disconnected(p) + if connListener.conns[p] != 1 { + t.Fatal("Expected no Disconnected event for peer with one 
remaining conn") + } + + // Peer A: 0 Connections + cem.Disconnected(p) + if connListener.conns[p] != 0 { + t.Fatal("Expected Disconnected event") + } +} + +func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { + connListener := newMockConnListener() + p := testutil.GeneratePeers(1)[0] + cem := newConnectEventManager(connListener) + + // Peer A: 1 Connection + cem.Connected(p) + if connListener.conns[p] != 1 { + t.Fatal("Expected Connected event") + } + + // Peer A: 1 Connection + cem.MarkUnresponsive(p) + if connListener.conns[p] != 0 { + t.Fatal("Expected Disconnected event") + } + + // Peer A: 0 Connections + cem.Disconnected(p) + if connListener.conns[p] != 0 { + t.Fatal("Expected not to receive a second Disconnected event") + } +} diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index bea3d6b0..acf60525 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync" "sync/atomic" "time" @@ -44,7 +43,6 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) B supportedProtocols: s.SupportedProtocols, } - bitswapNetwork.connectEvtMgr = newConnectEventManager(&bitswapNetwork) return &bitswapNetwork } @@ -303,6 +301,7 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre func (bsnet *impl) SetDelegate(r Receiver) { bsnet.receiver = r + bsnet.connectEvtMgr = newConnectEventManager(r) for _, proto := range bsnet.supportedProtocols { bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream) } @@ -386,85 +385,6 @@ func (bsnet *impl) Stats() Stats { } } -type connectEventManager struct { - bsnet *impl - lk sync.Mutex - conns map[peer.ID]*connState -} - -type connState struct { - refs int - responsive bool -} - -func newConnectEventManager(bsnet *impl) *connectEventManager { - return &connectEventManager{ - bsnet: bsnet, - conns: make(map[peer.ID]*connState), - } -} - -func (c *connectEventManager) Connected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - state, ok := c.conns[p] - if !ok { - state = &connState{responsive: true} - } - state.refs++ - - if state.refs == 1 && state.responsive { - c.bsnet.receiver.PeerConnected(p) - } -} - -func (c *connectEventManager) Disconnected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - state, ok := c.conns[p] - if !ok { - // Should never happen - return - } - state.refs-- - c.conns[p] = state - - if state.refs == 0 { - if state.responsive { - c.bsnet.receiver.PeerDisconnected(p) - } - delete(c.conns, p) - } -} - -func (c *connectEventManager) MarkUnresponsive(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - state, ok := c.conns[p] - if !ok { - return - } - state.responsive = false - c.conns[p] = state - - c.bsnet.receiver.PeerDisconnected(p) -} - -func (c *connectEventManager) OnMessage(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - state, ok := c.conns[p] - if ok && !state.responsive { - state.responsive = true - c.conns[p] = state - c.bsnet.receiver.PeerConnected(p) - } -} - type netNotifiee impl func (nn *netNotifiee) impl() *impl { From b62e7fd0e103db39d54ca3c7a879729eae0a6bf5 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 16 Apr 2020 16:55:37 -0400 Subject: [PATCH 07/15] test: add more testing for ipfs_impl --- network/connecteventmanager.go | 2 +- network/ipfs_impl.go | 83 +++++++---- network/ipfs_impl_test.go | 253 ++++++++++++++++++++++++++++++++- 3 files changed, 306 insertions(+), 32 deletions(-) diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go index 
100b6f96..67082c4d 100644 --- a/network/connecteventmanager.go +++ b/network/connecteventmanager.go @@ -70,7 +70,7 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) { defer c.lk.Unlock() state, ok := c.conns[p] - if !ok { + if !ok || !state.responsive { return } state.responsive = false diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index acf60525..e3f6cc27 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -95,18 +95,13 @@ type streamMessageSender struct { done chan struct{} } -func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Stream, err error) { - defer func() { - if err != nil { - s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) - } - }() - +// Open a stream to the remote peer +func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) { if s.stream != nil { return s.stream, nil } - if err = s.bsnet.ConnectTo(ctx, s.to); err != nil { + if err := s.bsnet.ConnectTo(ctx, s.to); err != nil { return nil, err } @@ -117,38 +112,59 @@ func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Strea default: } - stream, err = s.bsnet.newStreamToPeer(ctx, s.to) - if err == nil { - s.stream = stream - return s.stream, nil + stream, err := s.bsnet.newStreamToPeer(ctx, s.to) + if err != nil { + return nil, err } - return nil, err + + s.stream = stream + return s.stream, nil } +// Reset the stream func (s *streamMessageSender) Reset() error { - err := s.stream.Reset() - s.stream = nil - return err + if s.stream != nil { + err := s.stream.Reset() + s.stream = nil + return err + } + return nil } +// Close the stream func (s *streamMessageSender) Close() error { close(s.done) return helpers.FullClose(s.stream) } +// Indicates whether the peer supports HAVE / DONT_HAVE messages func (s *streamMessageSender) SupportsHave() bool { return s.bsnet.SupportsHave(s.stream.Protocol()) } +// Send a message to the peer, attempting multiple times func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { - // Try to send the message repeatedly + return s.multiAttempt(ctx, func(fnctx context.Context) error { + return s.send(fnctx, msg) + }) +} + +// Perform a function with multiple attempts, and a timeout +func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.Context) error) error { + // Try to call the function repeatedly var err error for i := 0; i < s.opts.MaxRetries; i++ { - if err = s.attemptSend(ctx, msg); err == nil { - // Sent successfully + deadline := time.Now().Add(s.opts.SendTimeout) + sndctx, cancel := context.WithDeadline(ctx, deadline) + + if err = fn(sndctx); err == nil { + cancel() + // Attempt was successful return nil } + cancel() + // Attempt failed. 
// If the sender has been closed or the context cancelled, just bail out select { case <-ctx.Done(): @@ -161,6 +177,7 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess // Failed to send so reset stream and try again _ = s.Reset() + // Failed too many times so mark the peer as unresponsive and return an error if i == s.opts.MaxRetries { s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) return err @@ -179,17 +196,15 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess return err } -func (s *streamMessageSender) attemptSend(ctx context.Context, msg bsmsg.BitSwapMessage) error { - sndctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) - defer cancel() - - stream, err := s.Connect(sndctx) +// Send a message to the peer +func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage) error { + stream, err := s.Connect(ctx) if err != nil { log.Infof("failed to open stream to %s: %s", s.to, err) return err } - if err = s.bsnet.msgToStream(sndctx, stream, msg); err != nil { + if err = s.bsnet.msgToStream(ctx, stream, msg); err != nil { log.Infof("failed to send message to %s: %s", s.to, err) return err } @@ -256,6 +271,16 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. } func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { + if opts.MaxRetries == 0 { + opts.MaxRetries = 3 + } + if opts.SendTimeout == 0 { + opts.SendTimeout = sendMessageTimeout + } + if opts.SendErrorBackoff == 0 { + opts.SendErrorBackoff = 100 * time.Millisecond + } + sender := &streamMessageSender{ to: p, bsnet: bsnet, @@ -263,13 +288,15 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag done: make(chan struct{}), } - conctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout) - defer cancel() + err := sender.multiAttempt(ctx, func(fnctx context.Context) error { + _, err := sender.Connect(fnctx) + return err + }) - _, err := sender.Connect(conctx) if err != nil { return nil, err } + return sender, nil } diff --git a/network/ipfs_impl_test.go b/network/ipfs_impl_test.go index 96e14b99..6311c63d 100644 --- a/network/ipfs_impl_test.go +++ b/network/ipfs_impl_test.go @@ -2,6 +2,8 @@ package network_test import ( "context" + "fmt" + "sync" "testing" "time" @@ -9,9 +11,12 @@ import ( pb "github.com/ipfs/go-bitswap/message/pb" bsnet "github.com/ipfs/go-bitswap/network" tn "github.com/ipfs/go-bitswap/testnet" + ds "github.com/ipfs/go-datastore" blocksutil "github.com/ipfs/go-ipfs-blocksutil" mockrouting "github.com/ipfs/go-ipfs-routing/mock" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" tnet "github.com/libp2p/go-libp2p-testing/net" @@ -60,6 +65,90 @@ func (r *receiver) PeerDisconnected(p peer.ID) { r.connectionEvent <- struct{}{} } +var mockNetErr = fmt.Errorf("network err") + +type ErrStream struct { + network.Stream + lk sync.Mutex + err bool + timingOut bool +} + +type ErrHost struct { + host.Host + lk sync.Mutex + err bool + timingOut bool + streams []*ErrStream +} + +func (es *ErrStream) Write(b []byte) (int, error) { + es.lk.Lock() + defer es.lk.Unlock() + + if es.err { + return 0, mockNetErr + } + if es.timingOut { + return 0, context.DeadlineExceeded + } + return es.Stream.Write(b) +} + +func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error { + eh.lk.Lock() + defer 
eh.lk.Unlock() + + if eh.err { + return mockNetErr + } + if eh.timingOut { + return context.DeadlineExceeded + } + return eh.Host.Connect(ctx, pi) +} + +func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) { + eh.lk.Lock() + defer eh.lk.Unlock() + + if eh.err { + return nil, mockNetErr + } + if eh.timingOut { + return nil, context.DeadlineExceeded + } + stream, err := eh.Host.NewStream(ctx, p, pids...) + estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut} + + eh.streams = append(eh.streams, estrm) + return estrm, err +} + +func (eh *ErrHost) setErrorState(erroring bool) { + eh.lk.Lock() + defer eh.lk.Unlock() + + eh.err = erroring + for _, s := range eh.streams { + s.lk.Lock() + s.err = erroring + s.lk.Unlock() + } +} + +func (eh *ErrHost) setTimeoutState(timingOut bool) { + eh.lk.Lock() + defer eh.lk.Unlock() + + eh.timingOut = timingOut + for _, s := range eh.streams { + s.lk.Lock() + s.timingOut = timingOut + s.lk.Unlock() + } +} + func TestMessageSendAndReceive(t *testing.T) { // create network ctx := context.Background() @@ -164,6 +253,166 @@ func TestMessageSendAndReceive(t *testing.T) { } } +func TestMessageResendAfterError(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + // create network + mn := mocknet.New(ctx) + mr := mockrouting.NewServer() + streamNet, err := tn.StreamNet(ctx, mn, mr) + if err != nil { + t.Fatal("Unable to setup network") + } + p1 := tnet.RandIdentityOrFatal(t) + p2 := tnet.RandIdentityOrFatal(t) + + h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) + if err != nil { + t.Fatal(err) + } + + // Create a special host that we can force to start returning errors + eh := &ErrHost{ + Host: h1, + err: false, + } + routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) + bsnet1 := bsnet.NewFromIpfsHost(eh, routing) + + bsnet2 := streamNet.Adapter(p2) + r1 := newReceiver() + r2 := newReceiver() + bsnet1.SetDelegate(r1) + bsnet2.SetDelegate(r2) + + err = mn.LinkAll() + if err != nil { + t.Fatal(err) + } + err = bsnet1.ConnectTo(ctx, p2.ID()) + if err != nil { + t.Fatal(err) + } + err = bsnet2.ConnectTo(ctx, p1.ID()) + if err != nil { + t.Fatal(err) + } + + blockGenerator := blocksutil.NewBlockGenerator() + block1 := blockGenerator.Next() + msg := bsmsg.New(false) + msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + + testSendErrorBackoff := 100 * time.Millisecond + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + MaxRetries: 3, + SendTimeout: 100 * time.Millisecond, + SendErrorBackoff: testSendErrorBackoff, + }) + if err != nil { + t.Fatal(err) + } + + <-r1.connectionEvent + + // Return an error from the networking layer the next time we try to send + // a message + eh.setErrorState(true) + + go func() { + time.Sleep(testSendErrorBackoff / 2) + // Stop throwing errors so that the following attempt to send succeeds + eh.setErrorState(false) + }() + + // Send message with retries, first one should fail, then subsequent + // message should succeed + err = ms.SendMsg(ctx, msg) + if err != nil { + t.Fatal(err) + } + + select { + case <-ctx.Done(): + t.Fatal("did not receive message sent") + case <-r2.messageReceived: + } +} + +func TestMessageSendTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + // create network + mn := mocknet.New(ctx) + mr := mockrouting.NewServer() + streamNet, err := tn.StreamNet(ctx, mn, 
mr) + if err != nil { + t.Fatal("Unable to setup network") + } + p1 := tnet.RandIdentityOrFatal(t) + p2 := tnet.RandIdentityOrFatal(t) + + h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) + if err != nil { + t.Fatal(err) + } + + // Create a special host that we can force to start timing out + eh := &ErrHost{ + Host: h1, + err: false, + } + routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) + bsnet1 := bsnet.NewFromIpfsHost(eh, routing) + + bsnet2 := streamNet.Adapter(p2) + r1 := newReceiver() + r2 := newReceiver() + bsnet1.SetDelegate(r1) + bsnet2.SetDelegate(r2) + + err = mn.LinkAll() + if err != nil { + t.Fatal(err) + } + err = bsnet1.ConnectTo(ctx, p2.ID()) + if err != nil { + t.Fatal(err) + } + err = bsnet2.ConnectTo(ctx, p1.ID()) + if err != nil { + t.Fatal(err) + } + + blockGenerator := blocksutil.NewBlockGenerator() + block1 := blockGenerator.Next() + msg := bsmsg.New(false) + msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + MaxRetries: 3, + SendTimeout: 100 * time.Millisecond, + SendErrorBackoff: 100 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + <-r1.connectionEvent + + // Return a DeadlineExceeded error from the networking layer the next time we try to + // send a message + eh.setTimeoutState(true) + + // Send message with retries, first one should fail, then subsequent + // message should succeed + err = ms.SendMsg(ctx, msg) + if err == nil { + t.Fatal("Expected error from SednMsg") + } +} + func TestSupportsHave(t *testing.T) { ctx := context.Background() mn := mocknet.New(ctx) @@ -199,9 +448,7 @@ func TestSupportsHave(t *testing.T) { t.Fatal(err) } - senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ - SendTimeout: time.Second, - }) + senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{}) if err != nil { t.Fatal(err) } From 3b40d49d0fdacdfb12fe4e431e3724ad0749b7e9 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 11:49:34 -0400 Subject: [PATCH 08/15] feat: dont retry if connect error is multistream.ErrNotSupported --- network/ipfs_impl.go | 13 +++- network/ipfs_impl_test.go | 141 ++++++++++++++++++++++++++++++-------- 2 files changed, 123 insertions(+), 31 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index e3f6cc27..cc1d0fd1 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -2,6 +2,7 @@ package network import ( "context" + "errors" "fmt" "io" "sync/atomic" @@ -22,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multistream" ) var log = logging.Logger("bitswap_network") @@ -164,7 +166,8 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. } cancel() - // Attempt failed. + // Attempt failed + // If the sender has been closed or the context cancelled, just bail out select { case <-ctx.Done(): @@ -174,11 +177,17 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. 
default: } + // Protocol is not supported, so no need to try multiple times + if errors.Is(err, multistream.ErrNotSupported) { + s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) + return err + } + // Failed to send so reset stream and try again _ = s.Reset() // Failed too many times so mark the peer as unresponsive and return an error - if i == s.opts.MaxRetries { + if i == s.opts.MaxRetries-1 { s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) return err } diff --git a/network/ipfs_impl_test.go b/network/ipfs_impl_test.go index 6311c63d..454bb410 100644 --- a/network/ipfs_impl_test.go +++ b/network/ipfs_impl_test.go @@ -14,6 +14,7 @@ import ( ds "github.com/ipfs/go-datastore" blocksutil "github.com/ipfs/go-ipfs-blocksutil" mockrouting "github.com/ipfs/go-ipfs-routing/mock" + "github.com/multiformats/go-multistream" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -27,7 +28,7 @@ import ( type receiver struct { peers map[peer.ID]struct{} messageReceived chan struct{} - connectionEvent chan struct{} + connectionEvent chan bool lastMessage bsmsg.BitSwapMessage lastSender peer.ID } @@ -36,7 +37,7 @@ func newReceiver() *receiver { return &receiver{ peers: make(map[peer.ID]struct{}), messageReceived: make(chan struct{}), - connectionEvent: make(chan struct{}, 1), + connectionEvent: make(chan bool, 1), } } @@ -57,12 +58,12 @@ func (r *receiver) ReceiveError(err error) { func (r *receiver) PeerConnected(p peer.ID) { r.peers[p] = struct{}{} - r.connectionEvent <- struct{}{} + r.connectionEvent <- true } func (r *receiver) PeerDisconnected(p peer.ID) { delete(r.peers, p) - r.connectionEvent <- struct{}{} + r.connectionEvent <- false } var mockNetErr = fmt.Errorf("network err") @@ -70,14 +71,14 @@ var mockNetErr = fmt.Errorf("network err") type ErrStream struct { network.Stream lk sync.Mutex - err bool + err error timingOut bool } type ErrHost struct { host.Host lk sync.Mutex - err bool + err error timingOut bool streams []*ErrStream } @@ -86,8 +87,8 @@ func (es *ErrStream) Write(b []byte) (int, error) { es.lk.Lock() defer es.lk.Unlock() - if es.err { - return 0, mockNetErr + if es.err != nil { + return 0, es.err } if es.timingOut { return 0, context.DeadlineExceeded @@ -99,8 +100,8 @@ func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error { eh.lk.Lock() defer eh.lk.Unlock() - if eh.err { - return mockNetErr + if eh.err != nil { + return eh.err } if eh.timingOut { return context.DeadlineExceeded @@ -112,7 +113,7 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID eh.lk.Lock() defer eh.lk.Unlock() - if eh.err { + if eh.err != nil { return nil, mockNetErr } if eh.timingOut { @@ -125,14 +126,14 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID return estrm, err } -func (eh *ErrHost) setErrorState(erroring bool) { +func (eh *ErrHost) setError(err error) { eh.lk.Lock() defer eh.lk.Unlock() - eh.err = erroring + eh.err = err for _, s := range eh.streams { s.lk.Lock() - s.err = erroring + s.err = err s.lk.Unlock() } } @@ -273,10 +274,7 @@ func TestMessageResendAfterError(t *testing.T) { } // Create a special host that we can force to start returning errors - eh := &ErrHost{ - Host: h1, - err: false, - } + eh := &ErrHost{Host: h1} routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) bsnet1 := bsnet.NewFromIpfsHost(eh, routing) @@ -294,6 +292,11 @@ func TestMessageResendAfterError(t *testing.T) { if err != nil { t.Fatal(err) } + isConnected := <-r1.connectionEvent + if 
!isConnected { + t.Fatal("Expected connect event") + } + err = bsnet2.ConnectTo(ctx, p1.ID()) if err != nil { t.Fatal(err) @@ -314,16 +317,14 @@ func TestMessageResendAfterError(t *testing.T) { t.Fatal(err) } - <-r1.connectionEvent - // Return an error from the networking layer the next time we try to send // a message - eh.setErrorState(true) + eh.setError(mockNetErr) go func() { time.Sleep(testSendErrorBackoff / 2) // Stop throwing errors so that the following attempt to send succeeds - eh.setErrorState(false) + eh.setError(nil) }() // Send message with retries, first one should fail, then subsequent @@ -360,10 +361,7 @@ func TestMessageSendTimeout(t *testing.T) { } // Create a special host that we can force to start timing out - eh := &ErrHost{ - Host: h1, - err: false, - } + eh := &ErrHost{Host: h1} routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) bsnet1 := bsnet.NewFromIpfsHost(eh, routing) @@ -381,6 +379,11 @@ func TestMessageSendTimeout(t *testing.T) { if err != nil { t.Fatal(err) } + isConnected := <-r1.connectionEvent + if !isConnected { + t.Fatal("Expected connect event") + } + err = bsnet2.ConnectTo(ctx, p1.ID()) if err != nil { t.Fatal(err) @@ -399,18 +402,98 @@ func TestMessageSendTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - <-r1.connectionEvent // Return a DeadlineExceeded error from the networking layer the next time we try to // send a message eh.setTimeoutState(true) - // Send message with retries, first one should fail, then subsequent - // message should succeed + // Send message with retries, all attempts should fail err = ms.SendMsg(ctx, msg) if err == nil { t.Fatal("Expected error from SednMsg") } + + select { + case <-time.After(500 * time.Millisecond): + t.Fatal("Did not receive disconnect event") + case isConnected = <-r1.connectionEvent: + if isConnected { + t.Fatal("Expected disconnect event (got connect event)") + } + } +} + +func TestMessageSendNotSupportedResponse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + // create network + mn := mocknet.New(ctx) + mr := mockrouting.NewServer() + streamNet, err := tn.StreamNet(ctx, mn, mr) + if err != nil { + t.Fatal("Unable to setup network") + } + p1 := tnet.RandIdentityOrFatal(t) + p2 := tnet.RandIdentityOrFatal(t) + + h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) + if err != nil { + t.Fatal(err) + } + + // Create a special host that responds with ErrNotSupported + eh := &ErrHost{Host: h1} + routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) + bsnet1 := bsnet.NewFromIpfsHost(eh, routing) + + bsnet2 := streamNet.Adapter(p2) + r1 := newReceiver() + r2 := newReceiver() + bsnet1.SetDelegate(r1) + bsnet2.SetDelegate(r2) + + err = mn.LinkAll() + if err != nil { + t.Fatal(err) + } + err = bsnet1.ConnectTo(ctx, p2.ID()) + if err != nil { + t.Fatal(err) + } + isConnected := <-r1.connectionEvent + if !isConnected { + t.Fatal("Expected connect event") + } + + err = bsnet2.ConnectTo(ctx, p1.ID()) + if err != nil { + t.Fatal(err) + } + + blockGenerator := blocksutil.NewBlockGenerator() + block1 := blockGenerator.Next() + msg := bsmsg.New(false) + msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + + eh.setError(multistream.ErrNotSupported) + _, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + MaxRetries: 3, + SendTimeout: 100 * time.Millisecond, + SendErrorBackoff: 100 * time.Millisecond, + }) + if err == nil { + t.Fatal("Expected ErrNotSupported") + } + + select { 
+ case <-time.After(500 * time.Millisecond): + t.Fatal("Did not receive disconnect event") + case isConnected = <-r1.connectionEvent: + if isConnected { + t.Fatal("Expected disconnect event (got connect event)") + } + } } func TestSupportsHave(t *testing.T) { From 59e7aa4226fabeb9ad69d3c3be2e71b70d709b97 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 11:56:14 -0400 Subject: [PATCH 09/15] fix: copy opts in ipfs_impl --- network/ipfs_impl.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index cc1d0fd1..94afd61e 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -280,15 +280,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. } func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { - if opts.MaxRetries == 0 { - opts.MaxRetries = 3 - } - if opts.SendTimeout == 0 { - opts.SendTimeout = sendMessageTimeout - } - if opts.SendErrorBackoff == 0 { - opts.SendErrorBackoff = 100 * time.Millisecond - } + opts = setDefaultOpts(opts) sender := &streamMessageSender{ to: p, @@ -309,6 +301,20 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag return sender, nil } +func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts { + copy := *opts + if opts.MaxRetries == 0 { + copy.MaxRetries = 3 + } + if opts.SendTimeout == 0 { + copy.SendTimeout = sendMessageTimeout + } + if opts.SendErrorBackoff == 0 { + copy.SendErrorBackoff = 100 * time.Millisecond + } + return © +} + func (bsnet *impl) SendMessage( ctx context.Context, p peer.ID, From c233956cc9f9f0f7142235a9f15850cca730d043 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 11:58:35 -0400 Subject: [PATCH 10/15] fix: remove extraneous map writes in connectionEventManager --- network/connecteventmanager.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go index 67082c4d..e86d6839 100644 --- a/network/connecteventmanager.go +++ b/network/connecteventmanager.go @@ -55,7 +55,6 @@ func (c *connectEventManager) Disconnected(p peer.ID) { return } state.refs-- - c.conns[p] = state if state.refs == 0 { if state.responsive { @@ -74,7 +73,6 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) { return } state.responsive = false - c.conns[p] = state c.connListener.PeerDisconnected(p) } @@ -86,7 +84,6 @@ func (c *connectEventManager) OnMessage(p peer.ID) { state, ok := c.conns[p] if ok && !state.responsive { state.responsive = true - c.conns[p] = state c.connListener.PeerConnected(p) } } From c26bd59db63f49c3b3d21c4e31bcc861bc0312dc Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 12:06:13 -0400 Subject: [PATCH 11/15] fix: perf improvement for connectEventManager --- network/connecteventmanager.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go index e86d6839..b28e8e5b 100644 --- a/network/connecteventmanager.go +++ b/network/connecteventmanager.go @@ -13,7 +13,7 @@ type ConnectionListener interface { type connectEventManager struct { connListener ConnectionListener - lk sync.Mutex + lk sync.RWMutex conns map[peer.ID]*connState } @@ -78,12 +78,28 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) { } func (c *connectEventManager) OnMessage(p peer.ID) { + // This is a frequent operation so 
to avoid different message arrivals + // getting blocked by a write lock, first take a read lock to check if + // we need to modify state + c.lk.RLock() + state, ok := c.conns[p] + c.lk.RUnlock() + + if !ok || state.responsive { + return + } + + // We need to make a modification so now take a write lock c.lk.Lock() defer c.lk.Unlock() - state, ok := c.conns[p] - if ok && !state.responsive { - state.responsive = true - c.connListener.PeerConnected(p) + // Note: state may have changed in the time between when read lock + // was released and write lock taken, so check again + state, ok = c.conns[p] + if !ok || state.responsive { + return } + + state.responsive = true + c.connListener.PeerConnected(p) } From bdccb20e6aebd2f2343b860b51a1b9f2062e9e8b Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 14:57:30 -0400 Subject: [PATCH 12/15] fix: simplify message queue shutdown --- internal/messagequeue/messagequeue.go | 22 +++++------ internal/messagequeue/messagequeue_test.go | 43 +++++++--------------- network/ipfs_impl.go | 14 ------- 3 files changed, 23 insertions(+), 56 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index c45a355c..2fb19665 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -56,6 +56,7 @@ type MessageNetwork interface { // MessageQueue implements queue of want messages to send to peers. type MessageQueue struct { ctx context.Context + shutdown func() p peer.ID network MessageNetwork dhTimeoutMgr DontHaveTimeoutManager @@ -63,7 +64,6 @@ type MessageQueue struct { sendErrorBackoff time.Duration outgoingWork chan time.Time - done chan struct{} // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -170,8 +170,10 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { + ctx, cancel := context.WithCancel(ctx) mq := &MessageQueue{ ctx: ctx, + shutdown: cancel, p: p, network: network, dhTimeoutMgr: dhTimeoutMgr, @@ -180,7 +182,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, peerWants: newRecallWantList(), cancels: cid.NewSet(), outgoingWork: make(chan time.Time, 1), - done: make(chan struct{}), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, @@ -301,12 +302,17 @@ func (mq *MessageQueue) Startup() { // Shutdown stops the processing of messages for a message queue. func (mq *MessageQueue) Shutdown() { - close(mq.done) + mq.shutdown() } func (mq *MessageQueue) onShutdown() { // Shut down the DONT_HAVE timeout manager mq.dhTimeoutMgr.Shutdown() + + // Reset the streamMessageSender + if mq.sender != nil { + _ = mq.sender.Reset() + } } func (mq *MessageQueue) runQueue() { @@ -352,17 +358,7 @@ func (mq *MessageQueue) runQueue() { // in sendMessageDebounce. Send immediately. workScheduled = time.Time{} mq.sendIfReady() - case <-mq.done: - if mq.sender != nil { - mq.sender.Close() - } - return case <-mq.ctx.Done(): - if mq.sender != nil { - // TODO: should I call sender.Close() here also to stop - // and in progress connection? 
- _ = mq.sender.Reset() - } return } } diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index 38ffafa2..344da41a 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -82,17 +82,15 @@ func (fp *fakeDontHaveTimeoutMgr) pendingCount() int { type fakeMessageSender struct { lk sync.Mutex - fullClosed chan<- struct{} reset chan<- struct{} messagesSent chan<- []bsmsg.Entry supportsHave bool } -func newFakeMessageSender(fullClosed chan<- struct{}, reset chan<- struct{}, +func newFakeMessageSender(reset chan<- struct{}, messagesSent chan<- []bsmsg.Entry, supportsHave bool) *fakeMessageSender { return &fakeMessageSender{ - fullClosed: fullClosed, reset: reset, messagesSent: messagesSent, supportsHave: supportsHave, @@ -106,7 +104,7 @@ func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess fms.messagesSent <- msg.Wantlist() return nil } -func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil } +func (fms *fakeMessageSender) Close() error { return nil } func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil } func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave } @@ -141,8 +139,7 @@ func TestStartupAndShutdown(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -170,11 +167,9 @@ func TestStartupAndShutdown(t *testing.T) { timeoutctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() select { - case <-fullClosedChan: case <-resetChan: - t.Fatal("message sender should have been closed but was reset") case <-timeoutctx.Done(): - t.Fatal("message sender should have been closed but wasn't") + t.Fatal("message sender should have been reset but wasn't") } } @@ -182,8 +177,7 @@ func TestSendingMessagesDeduped(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -204,8 +198,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -226,8 +219,7 @@ func TestSendingMessagesPriority(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, 
true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -294,8 +286,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -345,8 +336,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -392,8 +382,7 @@ func TestWantlistRebroadcast(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) @@ -488,8 +477,7 @@ func TestSendingLargeMessages(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] @@ -518,8 +506,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] @@ -573,8 +560,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] @@ -624,8 +610,7 @@ func BenchmarkMessageQueue(b *testing.B) { createQueue := func() *MessageQueue { messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) - fullClosedChan := make(chan struct{}, 1) - fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true) + fakeSender := 
newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 94afd61e..6fa2f535 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -94,7 +94,6 @@ type streamMessageSender struct { stream network.Stream bsnet *impl opts *MessageSenderOpts - done chan struct{} } // Open a stream to the remote peer @@ -107,13 +106,6 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro return nil, err } - // Check if the sender has been closed - select { - case <-s.done: - return nil, nil - default: - } - stream, err := s.bsnet.newStreamToPeer(ctx, s.to) if err != nil { return nil, err @@ -135,7 +127,6 @@ func (s *streamMessageSender) Reset() error { // Close the stream func (s *streamMessageSender) Close() error { - close(s.done) return helpers.FullClose(s.stream) } @@ -172,8 +163,6 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. select { case <-ctx.Done(): return nil - case <-s.done: - return nil default: } @@ -195,8 +184,6 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. select { case <-ctx.Done(): return nil - case <-s.done: - return nil case <-time.After(s.opts.SendErrorBackoff): // wait a short time in case disconnect notifications are still propagating log.Infof("send message to %s failed but context was not Done: %s", s.to, err) @@ -286,7 +273,6 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag to: p, bsnet: bsnet, opts: opts, - done: make(chan struct{}), } err := sender.multiAttempt(ctx, func(fnctx context.Context) error { From a8ed651525f3feec12f5e69344eddc368eaca762 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 15:38:12 -0400 Subject: [PATCH 13/15] fix: use explicit connected bool for streamMessageSender --- network/ipfs_impl.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 6fa2f535..daad69be 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -90,15 +90,16 @@ type impl struct { } type streamMessageSender struct { - to peer.ID - stream network.Stream - bsnet *impl - opts *MessageSenderOpts + to peer.ID + stream network.Stream + connected bool + bsnet *impl + opts *MessageSenderOpts } // Open a stream to the remote peer func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) { - if s.stream != nil { + if s.connected { return s.stream, nil } @@ -112,6 +113,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro } s.stream = stream + s.connected = true return s.stream, nil } @@ -119,7 +121,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro func (s *streamMessageSender) Reset() error { if s.stream != nil { err := s.stream.Reset() - s.stream = nil + s.connected = false return err } return nil From 8894bb6a26765da19ee61510b415d660e6e59df6 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 16:21:02 -0400 Subject: [PATCH 14/15] fix: ipfs_impl error handling --- network/ipfs_impl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index daad69be..e57d37ce 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -164,7 +164,7 @@ func (s *streamMessageSender) multiAttempt(ctx 
context.Context, fn func(context. // If the sender has been closed or the context cancelled, just bail out select { case <-ctx.Done(): - return nil + return ctx.Err() default: } @@ -185,7 +185,7 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. select { case <-ctx.Done(): - return nil + return ctx.Err() case <-time.After(s.opts.SendErrorBackoff): // wait a short time in case disconnect notifications are still propagating log.Infof("send message to %s failed but context was not Done: %s", s.to, err) From e6bf8af372ac2d6ec48366c277d2957c93a82029 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 17 Apr 2020 15:59:10 -0400 Subject: [PATCH 15/15] fix: mark wants sent when they are added to a message to be sent --- internal/messagequeue/messagequeue.go | 31 +++++++-------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 2fb19665..9fcab6d3 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -422,7 +422,7 @@ func (mq *MessageQueue) sendMessage() { mq.dhTimeoutMgr.Start() // Convert want lists to a Bitswap Message - message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave()) + message := mq.extractOutgoingMessage(mq.sender.SupportsHave()) // After processing the message, clear out its fields to save memory defer mq.msg.Reset(false) @@ -442,9 +442,6 @@ func (mq *MessageQueue) sendMessage() { return } - // We were able to send successfully. - onSent() - // Set a timer to wait for responses mq.simulateDontHaveWithTimeout(wantlist) @@ -541,7 +538,7 @@ func (mq *MessageQueue) pendingWorkCount() int { } // Convert the lists of wants into a Bitswap message -func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { +func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage { mq.wllock.Lock() defer mq.wllock.Unlock() @@ -568,7 +565,6 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } // Add each regular want-have / want-block to the message - peerSent := peerEntries[:0] for _, e := range peerEntries { if msgSize >= mq.maxMessageSize { break @@ -580,12 +576,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have) } else { msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true) - peerSent = append(peerSent, e) + + // Move the key from pending to sent + mq.peerWants.MarkSent(e) } } // Add each broadcast want-have to the message - bcstSent := bcstEntries[:0] for _, e := range bcstEntries { if msgSize >= mq.maxMessageSize { break @@ -601,24 +598,12 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false) - bcstSent = append(bcstSent, e) - } - // Called when the message has been successfully sent. - onMessageSent := func() { - mq.wllock.Lock() - defer mq.wllock.Unlock() - - // Move the keys from pending to sent - for _, e := range bcstSent { - mq.bcstWants.MarkSent(e) - } - for _, e := range peerSent { - mq.peerWants.MarkSent(e) - } + // Move the key from pending to sent + mq.bcstWants.MarkSent(e) } - return mq.msg, onMessageSent + return mq.msg } func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
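
The final patch above changes extractOutgoingMessage so that each want is marked as sent at the moment it is added to the outgoing message, while the wantlist lock is already held, instead of returning an onSent callback that the caller invokes only after SendMsg succeeds. A minimal sketch of that pattern, using simplified stand-in types (wantSet, entry and buildMessage are illustrative placeholders, not the real go-bitswap types):

package main

import "fmt"

// entry stands in for a wantlist entry.
type entry struct{ cid string }

// wantSet stands in for the recall wantlist kept by the message queue:
// keys move from pending to sent as they are added to a message.
type wantSet struct {
	pending map[string]entry
	sent    map[string]entry
}

// markSent records that a want has been placed in an outgoing message.
func (w *wantSet) markSent(e entry) {
	delete(w.pending, e.cid)
	w.sent[e.cid] = e
}

// buildMessage mimics the post-patch extractOutgoingMessage: each entry is
// appended to the message and immediately marked as sent in the same pass,
// rather than deferring the bookkeeping to an onSent callback.
func buildMessage(w *wantSet, maxEntries int) []entry {
	var msg []entry
	for _, e := range w.pending {
		if len(msg) >= maxEntries {
			break
		}
		msg = append(msg, e)
		w.markSent(e)
	}
	return msg
}

func main() {
	w := &wantSet{
		pending: map[string]entry{"a": {"a"}, "b": {"b"}},
		sent:    map[string]entry{},
	}
	msg := buildMessage(w, 10)
	fmt.Println(len(msg), len(w.sent)) // prints "2 2": both wants are already tracked as sent
}

With this arrangement the pending-to-sent bookkeeping happens in the single lock acquisition that builds the message, and it no longer depends on the caller remembering to run a callback after a successful send.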