From 6058fda6804ba1c790b436e68aa71bba648ed83d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 29 Apr 2020 18:26:16 -0400 Subject: [PATCH 1/6] feat: calculate message latency This commit was moved from ipfs/go-bitswap@6763be87bc7f052a315840b5134d6e63c1869d3c --- bitswap/bitswap.go | 18 ++- .../messagequeue/donthavetimeoutmgr.go | 120 ++++++++++++++--- .../messagequeue/donthavetimeoutmgr_test.go | 88 +++++++++++-- bitswap/internal/messagequeue/messagequeue.go | 123 ++++++++++++++++-- .../messagequeue/messagequeue_test.go | 57 +++++++- bitswap/internal/peermanager/peermanager.go | 15 +++ .../internal/peermanager/peermanager_test.go | 2 + 7 files changed, 381 insertions(+), 42 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index db0ca0986..36b95cfd5 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil) + return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil) } // TODO: Some of this stuff really only needs to be done when adding a block // from the user, not when receiving it from the network. // In case you run `git blame` on this comment, I'll save you some time: ask // @whyrusleeping, I don't know the answers you seek. -func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { +func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -348,6 +348,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b allKs = append(allKs, b.Cid()) } + // If the message came from the network + if from != "" { + // Inform the PeerManager so that we can calculate per-peer latency + combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves)) + combined = append(combined, allKs...) + combined = append(combined, haves...) + combined = append(combined, dontHaves...) + bs.pm.ResponseReceived(from, at, combined) + } + // Send all block keys (including duplicates) to any sessions that want them. // (The duplicates are needed by sessions for accounting purposes) bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) @@ -386,6 +396,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b // ReceiveMessage is called by the network interface when a new message is // received. 
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { + now := time.Now() + bs.counterLk.Lock() bs.counters.messagesRecvd++ bs.counterLk.Unlock() @@ -409,7 +421,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg dontHaves := incoming.DontHaves() if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 { // Process blocks - err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves) + err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves) if err != nil { log.Warnf("ReceiveMessage recvBlockFrom error: %s", err) return diff --git a/bitswap/internal/messagequeue/donthavetimeoutmgr.go b/bitswap/internal/messagequeue/donthavetimeoutmgr.go index e53b232e6..14e70c077 100644 --- a/bitswap/internal/messagequeue/donthavetimeoutmgr.go +++ b/bitswap/internal/messagequeue/donthavetimeoutmgr.go @@ -21,10 +21,20 @@ const ( // peer takes to process a want and initiate sending a response to us maxExpectedWantProcessTime = 2 * time.Second - // latencyMultiplier is multiplied by the average ping time to + // maxTimeout is the maximum allowed timeout, regardless of latency + maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime + + // pingLatencyMultiplier is multiplied by the average ping time to // get an upper bound on how long we expect to wait for a peer's response // to arrive - latencyMultiplier = 3 + pingLatencyMultiplier = 3 + + // messageLatencyAlpha is the alpha supplied to the message latency EWMA + messageLatencyAlpha = 0.5 + + // To give a margin for error, the timeout is calculated as + // messageLatencyMultiplier * message latency + messageLatencyMultiplier = 2 ) // PeerConnection is a connection to a peer that can be pinged, and the @@ -44,16 +54,20 @@ type pendingWant struct { sent time.Time } -// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to -// set a reasonable timeout for simulating a DONT_HAVE message for peers that -// don't support DONT_HAVE or that take to long to respond. +// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long +// to respond to a message. +// The timeout is based on latency - we start with a default latency, while +// we ping the peer to estimate latency. If we receive a response from the +// peer we use the response latency. 
type dontHaveTimeoutMgr struct { ctx context.Context shutdown func() peerConn PeerConnection onDontHaveTimeout func([]cid.Cid) defaultTimeout time.Duration - latencyMultiplier int + maxTimeout time.Duration + pingLatencyMultiplier int + messageLatencyMultiplier int maxExpectedWantProcessTime time.Duration // All variables below here must be protected by the lock @@ -66,6 +80,8 @@ type dontHaveTimeoutMgr struct { wantQueue []*pendingWant // time to wait for a response (depends on latency) timeout time.Duration + // ewma of message latency (time from message sent to response received) + messageLatency *latencyEwma // timer used to wait until want at front of queue expires checkForTimeoutsTimer *time.Timer } @@ -73,13 +89,18 @@ type dontHaveTimeoutMgr struct { // newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr // onDontHaveTimeout is called when pending keys expire (not cancelled before timeout) func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr { - return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, - latencyMultiplier, maxExpectedWantProcessTime) + return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout, + pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime) } // newDontHaveTimeoutMgrWithParams is used by the tests -func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), - defaultTimeout time.Duration, latencyMultiplier int, +func newDontHaveTimeoutMgrWithParams( + pc PeerConnection, + onDontHaveTimeout func([]cid.Cid), + defaultTimeout time.Duration, + maxTimeout time.Duration, + pingLatencyMultiplier int, + messageLatencyMultiplier int, maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr { ctx, shutdown := context.WithCancel(context.Background()) @@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([ peerConn: pc, activeWants: make(map[cid.Cid]*pendingWant), timeout: defaultTimeout, + messageLatency: &latencyEwma{alpha: messageLatencyAlpha}, defaultTimeout: defaultTimeout, - latencyMultiplier: latencyMultiplier, + maxTimeout: maxTimeout, + pingLatencyMultiplier: pingLatencyMultiplier, + messageLatencyMultiplier: messageLatencyMultiplier, maxExpectedWantProcessTime: maxExpectedWantProcessTime, onDontHaveTimeout: onDontHaveTimeout, } @@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() { // calculate a reasonable timeout latency := dhtm.peerConn.Latency() if latency.Nanoseconds() > 0 { - dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency) return } // Otherwise measure latency by pinging the peer - go dhtm.measureLatency() + go dhtm.measurePingLatency() +} + +// UpdateMessageLatency is called when we receive a response from the peer. +// It is the time between sending a request and receiving the corresponding +// response. 
+func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Update the message latency and the timeout + dhtm.messageLatency.update(elapsed) + oldTimeout := dhtm.timeout + dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency() + + // If the timeout has decreased + if dhtm.timeout < oldTimeout { + // Check if after changing the timeout there are any pending wants that + // are now over the timeout + dhtm.checkForTimeouts() + } } -// measureLatency measures the latency to the peer by pinging it -func (dhtm *dontHaveTimeoutMgr) measureLatency() { +// measurePingLatency measures the latency to the peer by pinging it +func (dhtm *dontHaveTimeoutMgr) measurePingLatency() { // Wait up to defaultTimeout for a response to the ping ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout) defer cancel() @@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() { dhtm.lk.Lock() defer dhtm.lk.Unlock() + // A message has arrived so we already set the timeout based on message latency + if dhtm.messageLatency.samples > 0 { + return + } + // Calculate a reasonable timeout based on latency - dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency) // Check if after changing the timeout there are any pending wants that are // now over the timeout @@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) { dhtm.onDontHaveTimeout(pending) } -// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency -func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration { +// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency +func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration { // The maximum expected time for a response is // the expected time to process the want + (latency * multiplier) // The multiplier is to provide some padding for variable latency. 
-	return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency
+	timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
+	if timeout > dhtm.maxTimeout {
+		timeout = dhtm.maxTimeout
+	}
+	return timeout
+}
+
+// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
+func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
+	timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
+	if timeout > dhtm.maxTimeout {
+		timeout = dhtm.maxTimeout
+	}
+	return timeout
+}
+
+// latencyEwma is an EWMA of message latency
+type latencyEwma struct {
+	alpha   float64
+	samples uint64
+	latency time.Duration
+}
+
+// update the EWMA with the given sample
+func (le *latencyEwma) update(elapsed time.Duration) {
+	le.samples++
+
+	// Initially set alpha to be 1.0 / <the number of samples>
+	alpha := 1.0 / float64(le.samples)
+	if alpha < le.alpha {
+		// Once we have enough samples, clamp alpha
+		alpha = le.alpha
+	}
+	le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
 }
diff --git a/bitswap/internal/messagequeue/donthavetimeoutmgr_test.go b/bitswap/internal/messagequeue/donthavetimeoutmgr_test.go
index 03ceb4816..6f315fea9 100644
--- a/bitswap/internal/messagequeue/donthavetimeoutmgr_test.go
+++ b/bitswap/internal/messagequeue/donthavetimeoutmgr_test.go
@@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
 	tr := timeoutRecorder{}
 
 	dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
-		dontHaveTimeout, latMultiplier, expProcessTime)
+		dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
 	dhtm.Start()
 	defer dhtm.Shutdown()
 
@@ -102,7 +102,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
 
 	// At this stage first set of keys should have timed out
 	if tr.timedOutCount() != len(firstks) {
-		t.Fatal("expected timeout")
+		t.Fatal("expected timeout", tr.timedOutCount(), len(firstks))
 	}
 
 	// Clear the recorded timed out keys
@@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
 	tr := timeoutRecorder{}
 
 	dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
-		dontHaveTimeout, latMultiplier, expProcessTime)
+		dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
 	dhtm.Start()
 	defer dhtm.Shutdown()
 
@@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
 	tr := timeoutRecorder{}
 
 	dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
-		dontHaveTimeout, latMultiplier, expProcessTime)
+		dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
 	dhtm.Start()
 	defer dhtm.Shutdown()
 
@@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
 	tr := timeoutRecorder{}
 
 	dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
-		dontHaveTimeout, latMultiplier, expProcessTime)
+		dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
 	dhtm.Start()
 	defer dhtm.Shutdown()
 
@@ -222,6 +222,78 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
 	}
 }
 
+func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
+	ks := testutil.GenerateCids(2)
+	latency := time.Millisecond * 40
+	latMultiplier := 1
+	expProcessTime := time.Duration(0)
+	msgLatencyMultiplier := 1
+	pc := &mockPeerConn{latency: latency}
+	tr := timeoutRecorder{}
+
+	dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
+		dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime)
+	dhtm.Start()
+ defer dhtm.Shutdown() + + // Add keys + dhtm.AddPending(ks) + + // expectedTimeout + // = expProcessTime + latency*time.Duration(latMultiplier) + // = 0 + 40ms * 1 + // = 40ms + + // Wait for less than the expected timeout + time.Sleep(25 * time.Millisecond) + + // Receive two message latency updates + dhtm.UpdateMessageLatency(time.Millisecond * 20) + dhtm.UpdateMessageLatency(time.Millisecond * 10) + + // alpha is 0.5 so timeout should be + // = (20ms * alpha) + (10ms * (1 - alpha)) + // = (20ms * 0.5) + (10ms * 0.5) + // = 15ms + // We've already slept for 25ms so with the new 15ms timeout + // the keys should have timed out + + // Give the queue some time to process the updates + time.Sleep(5 * time.Millisecond) + + if tr.timedOutCount() != len(ks) { + t.Fatal("expected keys to timeout") + } +} + +func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { + ks := testutil.GenerateCids(2) + pc := &mockPeerConn{latency: time.Second} // ignored + tr := timeoutRecorder{} + msgLatencyMultiplier := 1 + testMaxTimeout := time.Millisecond * 10 + + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, + dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime) + dhtm.Start() + defer dhtm.Shutdown() + + // Add keys + dhtm.AddPending(ks) + + // Receive a message latency update that would make the timeout greater + // than the maximum timeout + dhtm.UpdateMessageLatency(testMaxTimeout * 4) + + // Sleep until just after the maximum timeout + time.Sleep(testMaxTimeout + 5*time.Millisecond) + + // Keys should have timed out + if tr.timedOutCount() != len(ks) { + t.Fatal("expected keys to timeout") + } +} + func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { ks := testutil.GenerateCids(2) latency := time.Millisecond * 1 @@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, latMultiplier, expProcessTime) + defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { pc := &mockPeerConn{latency: latency} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, latMultiplier, expProcessTime) + defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { pc := &mockPeerConn{latency: latency} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 755df08a7..9db2a8628 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -64,6 +64,7 @@ type MessageQueue struct { sendErrorBackoff time.Duration outgoingWork chan time.Time + responses chan *Response // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -88,12 +89,15 @@ type recallWantlist struct { pending *bswl.Wantlist // The list of wants that have been sent sent *bswl.Wantlist + // The time at which each want was 
sent + sentAt map[cid.Cid]time.Time } func newRecallWantList() recallWantlist { return recallWantlist{ pending: bswl.New(), sent: bswl.New(), + sentAt: make(map[cid.Cid]time.Time), } } @@ -104,14 +108,18 @@ func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlis // Remove wants from both the pending list and the list of sent wants func (r *recallWantlist) Remove(c cid.Cid) { - r.sent.Remove(c) r.pending.Remove(c) + r.sent.Remove(c) + delete(r.sentAt, c) } // Remove wants by type from both the pending list and the list of sent wants func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { - r.sent.RemoveType(c, wtype) r.pending.RemoveType(c, wtype) + r.sent.RemoveType(c, wtype) + if _, ok := r.sent.Contains(c); !ok { + delete(r.sentAt, c) + } } // MarkSent moves the want from the pending to the sent list @@ -126,6 +134,16 @@ func (r *recallWantlist) MarkSent(e wantlist.Entry) bool { return true } +// SentAt records the time at which a want was sent +func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { + // The want may have been cancelled in the interim + if _, ok := r.sent.Contains(c); ok { + if _, ok := r.sentAt[c]; !ok { + r.sentAt[c] = at + } + } +} + type peerConn struct { p peer.ID network MessageNetwork @@ -160,6 +178,15 @@ type DontHaveTimeoutManager interface { AddPending([]cid.Cid) // CancelPending removes the wants CancelPending([]cid.Cid) + UpdateMessageLatency(time.Duration) +} + +// Response from the peer +type Response struct { + // The time at which the response was received + at time.Time + // The blocks / HAVEs / DONT_HAVEs in the response + ks []cid.Cid } // New creates a new MessageQueue. @@ -177,7 +204,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { ctx, cancel := context.WithCancel(ctx) - mq := &MessageQueue{ + return &MessageQueue{ ctx: ctx, shutdown: cancel, p: p, @@ -188,6 +215,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, peerWants: newRecallWantList(), cancels: cid.NewSet(), outgoingWork: make(chan time.Time, 1), + responses: make(chan *Response, 8), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, @@ -195,8 +223,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, // after using it, instead of creating a new one every time. msg: bsmsg.New(false), } - - return mq } // Add want-haves that are part of a broadcast to all connected peers @@ -291,6 +317,22 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { } } +// ResponseReceived is called when a message is received from the network. +// ks is the set of blocks, HAVEs and DONT_HAVEs in the message +// Note that this is just used to calculate latency. +func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) { + if len(ks) == 0 { + return + } + + // These messages are just used to approximate latency, so if we get so + // many responses that they get backed up, just ignore the overflow. 
+ select { + case mq.responses <- &Response{at, ks}: + default: + } +} + // SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) { mq.rebroadcastIntervalLk.Lock() @@ -340,6 +382,7 @@ func (mq *MessageQueue) runQueue() { select { case <-mq.rebroadcastTimer.C: mq.rebroadcastWantlist() + case when := <-mq.outgoingWork: // If we have work scheduled, cancel the timer. If we // don't, record when the work was scheduled. @@ -362,11 +405,17 @@ func (mq *MessageQueue) runQueue() { // Otherwise, extend the timer. scheduleWork.Reset(sendMessageDebounce) } + case <-scheduleWork.C: // We have work scheduled and haven't seen any updates // in sendMessageDebounce. Send immediately. workScheduled = time.Time{} mq.sendIfReady() + + case res := <-mq.responses: + // We received a response from the peer, calculate latency + mq.handleResponse(res) + case <-mq.ctx.Done(): return } @@ -431,7 +480,7 @@ func (mq *MessageQueue) sendMessage() { mq.dhTimeoutMgr.Start() // Convert want lists to a Bitswap Message - message := mq.extractOutgoingMessage(mq.sender.SupportsHave()) + message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave()) // After processing the message, clear out its fields to save memory defer mq.msg.Reset(false) @@ -451,6 +500,9 @@ func (mq *MessageQueue) sendMessage() { return } + // Record sent time so as to calculate message latency + onSent() + // Set a timer to wait for responses mq.simulateDontHaveWithTimeout(wantlist) @@ -489,6 +541,34 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { mq.dhTimeoutMgr.AddPending(wants) } +// handleResponse is called when a response is received from the peer +func (mq *MessageQueue) handleResponse(res *Response) { + now := time.Now() + earliest := time.Time{} + + mq.wllock.Lock() + + // Check if the keys in the response correspond to any request that was + // sent to the peer. + // Find the earliest request so as to calculate the longest latency as + // we want to be conservative when setting the timeout. + for _, c := range res.ks { + if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { + earliest = at + } + if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { + earliest = at + } + } + + mq.wllock.Unlock() + + if !earliest.IsZero() { + // Inform the timeout manager of the calculated latency + mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest)) + } +} + func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { // Save some CPU cycles and allocations if log level is higher than debug if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil { @@ -547,7 +627,7 @@ func (mq *MessageQueue) pendingWorkCount() int { } // Convert the lists of wants into a Bitswap message -func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage { +func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { // Get broadcast and regular wantlist entries. mq.wllock.Lock() peerEntries := mq.peerWants.pending.Entries() @@ -641,16 +721,18 @@ FINISH: // Finally, re-take the lock, mark sent and remove any entries from our // message that we've decided to cancel at the last minute. mq.wllock.Lock() - for _, e := range peerEntries[:sentPeerEntries] { + for i, e := range peerEntries[:sentPeerEntries] { if !mq.peerWants.MarkSent(e) { // It changed. 
mq.msg.Remove(e.Cid) + peerEntries[i].Cid = cid.Undef } } - for _, e := range bcstEntries[:sentBcstEntries] { + for i, e := range bcstEntries[:sentBcstEntries] { if !mq.bcstWants.MarkSent(e) { mq.msg.Remove(e.Cid) + bcstEntries[i].Cid = cid.Undef } } @@ -663,7 +745,28 @@ FINISH: } mq.wllock.Unlock() - return mq.msg + // When the message has been sent, record the time at which each want was + // sent so we can calculate message latency + onSent := func() { + now := time.Now() + + mq.wllock.Lock() + defer mq.wllock.Unlock() + + for _, e := range peerEntries[:sentPeerEntries] { + if e.Cid.Defined() { // Check if want was cancelled in the interim + mq.peerWants.SentAt(e.Cid, now) + } + } + + for _, e := range bcstEntries[:sentBcstEntries] { + if e.Cid.Defined() { // Check if want was cancelled in the interim + mq.bcstWants.SentAt(e.Cid, now) + } + } + } + + return mq.msg, onSent } func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) { diff --git a/bitswap/internal/messagequeue/messagequeue_test.go b/bitswap/internal/messagequeue/messagequeue_test.go index 344da41a5..32a7242c2 100644 --- a/bitswap/internal/messagequeue/messagequeue_test.go +++ b/bitswap/internal/messagequeue/messagequeue_test.go @@ -44,8 +44,9 @@ func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result { } type fakeDontHaveTimeoutMgr struct { - lk sync.Mutex - ks []cid.Cid + lk sync.Mutex + ks []cid.Cid + latencyUpds []time.Duration } func (fp *fakeDontHaveTimeoutMgr) Start() {} @@ -73,6 +74,18 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { } fp.ks = s.Keys() } +func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { + fp.lk.Lock() + defer fp.lk.Unlock() + + fp.latencyUpds = append(fp.latencyUpds, elapsed) +} +func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration { + fp.lk.Lock() + defer fp.lk.Unlock() + + return fp.latencyUpds +} func (fp *fakeDontHaveTimeoutMgr) pendingCount() int { fp.lk.Lock() defer fp.lk.Unlock() @@ -587,6 +600,46 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { } } +func TestResponseReceived(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(10) + + // Add some wants and wait 10ms + messageQueue.AddWants(cids[:5], nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Add some wants and wait another 10ms + messageQueue.AddWants(cids[5:8], nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Receive a response for some of the wants from both groups + messageQueue.ResponseReceived(time.Now(), []cid.Cid{cids[0], cids[6], cids[9]}) + + // Wait a short time for processing + time.Sleep(10 * time.Millisecond) + + // Check that message queue informs DHTM of received responses + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + // Elapsed time should be between when the first want was sent and the + // response received (about 20ms) + if upds[0] < 15*time.Millisecond || upds[0] > 25*time.Millisecond { + t.Fatal("expected latency to be time since oldest message sent") + } +} + func 
filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid diff --git a/bitswap/internal/peermanager/peermanager.go b/bitswap/internal/peermanager/peermanager.go index 522823263..aa40727b2 100644 --- a/bitswap/internal/peermanager/peermanager.go +++ b/bitswap/internal/peermanager/peermanager.go @@ -3,6 +3,7 @@ package peermanager import ( "context" "sync" + "time" logging "github.com/ipfs/go-log" "github.com/ipfs/go-metrics-interface" @@ -18,6 +19,7 @@ type PeerQueue interface { AddBroadcastWantHaves([]cid.Cid) AddWants([]cid.Cid, []cid.Cid) AddCancels([]cid.Cid) + ResponseReceived(at time.Time, ks []cid.Cid) Startup() Shutdown() } @@ -116,6 +118,19 @@ func (pm *PeerManager) Disconnected(p peer.ID) { pm.pwm.removePeer(p) } +// ResponseReceived is called when a message is received from the network. +// ks is the set of blocks, HAVEs and DONT_HAVEs in the message +// Note that this is just used to calculate latency. +func (pm *PeerManager) ResponseReceived(p peer.ID, at time.Time, ks []cid.Cid) { + pm.pqLk.Lock() + pq, ok := pm.peerQueues[p] + pm.pqLk.Unlock() + + if ok { + pq.ResponseReceived(at, ks) + } +} + // BroadcastWantHaves broadcasts want-haves to all peers (used by the session // to discover seeds). // For each peer it filters out want-haves that have previously been sent to diff --git a/bitswap/internal/peermanager/peermanager_test.go b/bitswap/internal/peermanager/peermanager_test.go index 469aa4d19..d5d348fe6 100644 --- a/bitswap/internal/peermanager/peermanager_test.go +++ b/bitswap/internal/peermanager/peermanager_test.go @@ -35,6 +35,8 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) { func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) { fp.msgs <- msg{fp.p, nil, nil, cs} } +func (fp *mockPeerQueue) ResponseReceived(at time.Time, ks []cid.Cid) { +} type peerWants struct { wantHaves []cid.Cid From 4dac20264e536387f698a5f1e289cd9267f7bc48 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 30 Apr 2020 11:39:05 -0400 Subject: [PATCH 2/6] fix: simplify latency timing This commit was moved from ipfs/go-bitswap@5c215f4179b976a42adc3838172fe8651929bc10 --- bitswap/bitswap.go | 10 +++---- bitswap/internal/messagequeue/messagequeue.go | 26 ++++++++----------- .../messagequeue/messagequeue_test.go | 2 +- bitswap/internal/peermanager/peermanager.go | 7 +++-- .../internal/peermanager/peermanager_test.go | 2 +- 5 files changed, 20 insertions(+), 27 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 36b95cfd5..bfcd125f9 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil) + return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil) } // TODO: Some of this stuff really only needs to be done when adding a block // from the user, not when receiving it from the network. // In case you run `git blame` on this comment, I'll save you some time: ask // @whyrusleeping, I don't know the answers you seek. 
-func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { +func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -355,7 +355,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from pee combined = append(combined, allKs...) combined = append(combined, haves...) combined = append(combined, dontHaves...) - bs.pm.ResponseReceived(from, at, combined) + bs.pm.ResponseReceived(from, combined) } // Send all block keys (including duplicates) to any sessions that want them. @@ -396,8 +396,6 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from pee // ReceiveMessage is called by the network interface when a new message is // received. func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { - now := time.Now() - bs.counterLk.Lock() bs.counters.messagesRecvd++ bs.counterLk.Unlock() @@ -421,7 +419,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg dontHaves := incoming.DontHaves() if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 { // Process blocks - err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves) + err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves) if err != nil { log.Warnf("ReceiveMessage recvBlockFrom error: %s", err) return diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 9db2a8628..07c18a77e 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -63,8 +63,11 @@ type MessageQueue struct { maxMessageSize int sendErrorBackoff time.Duration + // Signals that there are outgoing wants / cancels ready to be processed outgoingWork chan time.Time - responses chan *Response + + // Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer + responses chan []cid.Cid // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -181,14 +184,6 @@ type DontHaveTimeoutManager interface { UpdateMessageLatency(time.Duration) } -// Response from the peer -type Response struct { - // The time at which the response was received - at time.Time - // The blocks / HAVEs / DONT_HAVEs in the response - ks []cid.Cid -} - // New creates a new MessageQueue. func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { onTimeout := func(ks []cid.Cid) { @@ -215,7 +210,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, peerWants: newRecallWantList(), cancels: cid.NewSet(), outgoingWork: make(chan time.Time, 1), - responses: make(chan *Response, 8), + responses: make(chan []cid.Cid, 8), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, @@ -320,7 +315,7 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { // ResponseReceived is called when a message is received from the network. // ks is the set of blocks, HAVEs and DONT_HAVEs in the message // Note that this is just used to calculate latency. 
-func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) { +func (mq *MessageQueue) ResponseReceived(ks []cid.Cid) { if len(ks) == 0 { return } @@ -328,7 +323,7 @@ func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) { // These messages are just used to approximate latency, so if we get so // many responses that they get backed up, just ignore the overflow. select { - case mq.responses <- &Response{at, ks}: + case mq.responses <- ks: default: } } @@ -541,8 +536,9 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { mq.dhTimeoutMgr.AddPending(wants) } -// handleResponse is called when a response is received from the peer -func (mq *MessageQueue) handleResponse(res *Response) { +// handleResponse is called when a response is received from the peer, +// with the CIDs of received blocks / HAVEs / DONT_HAVEs +func (mq *MessageQueue) handleResponse(ks []cid.Cid) { now := time.Now() earliest := time.Time{} @@ -552,7 +548,7 @@ func (mq *MessageQueue) handleResponse(res *Response) { // sent to the peer. // Find the earliest request so as to calculate the longest latency as // we want to be conservative when setting the timeout. - for _, c := range res.ks { + for _, c := range ks { if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { earliest = at } diff --git a/bitswap/internal/messagequeue/messagequeue_test.go b/bitswap/internal/messagequeue/messagequeue_test.go index 32a7242c2..1ef0d2a5f 100644 --- a/bitswap/internal/messagequeue/messagequeue_test.go +++ b/bitswap/internal/messagequeue/messagequeue_test.go @@ -623,7 +623,7 @@ func TestResponseReceived(t *testing.T) { collectMessages(ctx, t, messagesSent, 10*time.Millisecond) // Receive a response for some of the wants from both groups - messageQueue.ResponseReceived(time.Now(), []cid.Cid{cids[0], cids[6], cids[9]}) + messageQueue.ResponseReceived([]cid.Cid{cids[0], cids[6], cids[9]}) // Wait a short time for processing time.Sleep(10 * time.Millisecond) diff --git a/bitswap/internal/peermanager/peermanager.go b/bitswap/internal/peermanager/peermanager.go index aa40727b2..04b015bfd 100644 --- a/bitswap/internal/peermanager/peermanager.go +++ b/bitswap/internal/peermanager/peermanager.go @@ -3,7 +3,6 @@ package peermanager import ( "context" "sync" - "time" logging "github.com/ipfs/go-log" "github.com/ipfs/go-metrics-interface" @@ -19,7 +18,7 @@ type PeerQueue interface { AddBroadcastWantHaves([]cid.Cid) AddWants([]cid.Cid, []cid.Cid) AddCancels([]cid.Cid) - ResponseReceived(at time.Time, ks []cid.Cid) + ResponseReceived(ks []cid.Cid) Startup() Shutdown() } @@ -121,13 +120,13 @@ func (pm *PeerManager) Disconnected(p peer.ID) { // ResponseReceived is called when a message is received from the network. // ks is the set of blocks, HAVEs and DONT_HAVEs in the message // Note that this is just used to calculate latency. 
-func (pm *PeerManager) ResponseReceived(p peer.ID, at time.Time, ks []cid.Cid) { +func (pm *PeerManager) ResponseReceived(p peer.ID, ks []cid.Cid) { pm.pqLk.Lock() pq, ok := pm.peerQueues[p] pm.pqLk.Unlock() if ok { - pq.ResponseReceived(at, ks) + pq.ResponseReceived(ks) } } diff --git a/bitswap/internal/peermanager/peermanager_test.go b/bitswap/internal/peermanager/peermanager_test.go index d5d348fe6..560868466 100644 --- a/bitswap/internal/peermanager/peermanager_test.go +++ b/bitswap/internal/peermanager/peermanager_test.go @@ -35,7 +35,7 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) { func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) { fp.msgs <- msg{fp.p, nil, nil, cs} } -func (fp *mockPeerQueue) ResponseReceived(at time.Time, ks []cid.Cid) { +func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) { } type peerWants struct { From 997186839706407625f06f3d35737eb0edd0f17f Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 30 Apr 2020 13:33:12 -0400 Subject: [PATCH 3/6] fix: only record latency for first response per want This commit was moved from ipfs/go-bitswap@af8cba85b3cd30d0b7f63bc575d4e14a9331178b --- bitswap/internal/messagequeue/messagequeue.go | 24 ++++++++-- .../messagequeue/messagequeue_test.go | 44 +++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 07c18a77e..fd55fbee3 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -147,6 +147,13 @@ func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { } } +// ClearSentAt clears out the record of the time a want was sent. +// We clear the sent at time when we receive a response for a key so that +// subsequent responses for the key don't appear to be even further delayed. +func (r *recallWantlist) ClearSentAt(c cid.Cid) { + delete(r.sentAt, c) +} + type peerConn struct { p peer.ID network MessageNetwork @@ -549,11 +556,20 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Find the earliest request so as to calculate the longest latency as // we want to be conservative when setting the timeout. 
for _, c := range ks { - if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { - earliest = at + if at, ok := mq.bcstWants.sentAt[c]; ok { + if earliest.IsZero() || at.Before(earliest) { + earliest = at + } + mq.bcstWants.ClearSentAt(c) } - if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { - earliest = at + if at, ok := mq.peerWants.sentAt[c]; ok { + if earliest.IsZero() || at.Before(earliest) { + earliest = at + } + // Clear out the sent time for the CID because we only want to + // record the latency between the request and the first response + // for that CID (not subsequent responses) + mq.peerWants.ClearSentAt(c) } } diff --git a/bitswap/internal/messagequeue/messagequeue_test.go b/bitswap/internal/messagequeue/messagequeue_test.go index 1ef0d2a5f..f0f32e0a7 100644 --- a/bitswap/internal/messagequeue/messagequeue_test.go +++ b/bitswap/internal/messagequeue/messagequeue_test.go @@ -640,6 +640,50 @@ func TestResponseReceived(t *testing.T) { } } +func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(2) + + // Add some wants and wait 10ms + messageQueue.AddWants(cids, nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Receive a response for the wants + messageQueue.ResponseReceived(cids) + + // Wait another 10ms + time.Sleep(10 * time.Millisecond) + + // Message queue should inform DHTM of first response + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + + // Receive a second response for the same wants + messageQueue.ResponseReceived(cids) + + // Wait for the response to be processed by the message queue + time.Sleep(10 * time.Millisecond) + + // Message queue should not inform DHTM of second response because the + // CIDs are a subset of the first response + upds = dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } +} + func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid From 8a3442f08d0541a43a1b09d2f985d0a73dd813b7 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 30 Apr 2020 14:11:30 -0400 Subject: [PATCH 4/6] fix: discard outliers in latency calculation This commit was moved from ipfs/go-bitswap@a7c7865ad0bde1fd35394705612dfa12d9d62d21 --- bitswap/internal/messagequeue/messagequeue.go | 53 ++++++++++++++----- .../messagequeue/messagequeue_test.go | 52 ++++++++++++++++-- 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index fd55fbee3..a3e21790d 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -41,6 +41,9 @@ const ( // when we debounce for more than sendMessageMaxDelay, we'll send the // message immediately. 
sendMessageMaxDelay = 20 * time.Millisecond + // The maximum amount of time in which to accept a response as being valid + // for latency calculation (as opposed to discarding it as an outlier) + maxValidLatency = 30 * time.Second ) // MessageNetwork is any network that can connect peers and generate a message @@ -55,14 +58,24 @@ type MessageNetwork interface { // MessageQueue implements queue of want messages to send to peers. type MessageQueue struct { - ctx context.Context - shutdown func() - p peer.ID - network MessageNetwork - dhTimeoutMgr DontHaveTimeoutManager - maxMessageSize int + ctx context.Context + shutdown func() + p peer.ID + network MessageNetwork + dhTimeoutMgr DontHaveTimeoutManager + + // The maximum size of a message in bytes. Any overflow is put into the + // next message + maxMessageSize int + + // The amount of time to wait when there's an error sending to a peer + // before retrying sendErrorBackoff time.Duration + // The maximum amount of time in which to accept a response as being valid + // for latency calculation + maxValidLatency time.Duration + // Signals that there are outgoing wants / cancels ready to be processed outgoingWork chan time.Time @@ -198,12 +211,18 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo onDontHaveTimeout(p, ks) } dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout) - return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr) + return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr) } // This constructor is used by the tests -func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, - maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { +func newMessageQueue( + ctx context.Context, + p peer.ID, + network MessageNetwork, + maxMsgSize int, + sendErrorBackoff time.Duration, + maxValidLatency time.Duration, + dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { ctx, cancel := context.WithCancel(ctx) return &MessageQueue{ @@ -220,6 +239,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, responses: make(chan []cid.Cid, 8), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, + maxValidLatency: maxValidLatency, priority: maxPriority, // For performance reasons we just clear out the fields of the message // after using it, instead of creating a new one every time. @@ -553,17 +573,24 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Check if the keys in the response correspond to any request that was // sent to the peer. - // Find the earliest request so as to calculate the longest latency as - // we want to be conservative when setting the timeout. 
+ // + // - Find the earliest request so as to calculate the longest latency as + // we want to be conservative when setting the timeout + // - Ignore latencies that are very long, as these are likely to be outliers + // caused when + // - we send a want to peer A + // - peer A does not have the block + // - peer A later receives the block from peer B + // - peer A sends us HAVE / block for _, c := range ks { if at, ok := mq.bcstWants.sentAt[c]; ok { - if earliest.IsZero() || at.Before(earliest) { + if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } mq.bcstWants.ClearSentAt(c) } if at, ok := mq.peerWants.sentAt[c]; ok { - if earliest.IsZero() || at.Before(earliest) { + if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } // Clear out the sent time for the CID because we only want to diff --git a/bitswap/internal/messagequeue/messagequeue_test.go b/bitswap/internal/messagequeue/messagequeue_test.go index f0f32e0a7..4af3000ad 100644 --- a/bitswap/internal/messagequeue/messagequeue_test.go +++ b/bitswap/internal/messagequeue/messagequeue_test.go @@ -498,7 +498,7 @@ func TestSendingLargeMessages(t *testing.T) { wantBlocks := testutil.GenerateCids(10) entrySize := 44 maxMsgSize := entrySize * 3 // 3 wants - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() messageQueue.AddWants(wantBlocks, []cid.Cid{}) @@ -578,7 +578,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() wbs := testutil.GenerateCids(10) @@ -609,7 +609,7 @@ func TestResponseReceived(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() cids := testutil.GenerateCids(10) @@ -649,7 +649,7 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() cids := testutil.GenerateCids(2) @@ -684,6 +684,48 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { } } +func TestResponseReceivedDiscardsOutliers(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + maxValLatency := 30 * time.Millisecond + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(4) + + // Add some wants and 
wait 20ms + messageQueue.AddWants(cids[:2], nil) + collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + + // Add some more wants and wait long enough that the first wants will be + // outside the maximum valid latency, but the second wants will be inside + messageQueue.AddWants(cids[2:], nil) + collectMessages(ctx, t, messagesSent, maxValLatency-10*time.Millisecond) + + // Receive a response for the wants + messageQueue.ResponseReceived(cids) + + // Wait for the response to be processed by the message queue + time.Sleep(10 * time.Millisecond) + + // Check that the latency calculation excludes the first wants + // (because they're older than max valid latency) + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + // Elapsed time should not include outliers + if upds[0] > maxValLatency { + t.Fatal("expected latency calculation to discard outliers") + } +} + func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid @@ -712,7 +754,7 @@ func BenchmarkMessageQueue(b *testing.B) { dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() go func() { From d56f8df1dd473921ebfd624bd44a74a8b453630c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 1 May 2020 11:04:05 -0400 Subject: [PATCH 5/6] docs: MessageQueue docs This commit was moved from ipfs/go-bitswap@f005819cabe8b88188366962a25925024d872b51 --- bitswap/internal/messagequeue/messagequeue.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index a3e21790d..24e80974b 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -161,8 +161,8 @@ func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { } // ClearSentAt clears out the record of the time a want was sent. -// We clear the sent at time when we receive a response for a key so that -// subsequent responses for the key don't appear to be even further delayed. +// We clear the sent at time when we receive a response for a key as we +// only need the first response for latency measurement. 
func (r *recallWantlist) ClearSentAt(c cid.Cid) { delete(r.sentAt, c) } @@ -201,6 +201,7 @@ type DontHaveTimeoutManager interface { AddPending([]cid.Cid) // CancelPending removes the wants CancelPending([]cid.Cid) + // UpdateMessageLatency informs the manager of a new latency measurement UpdateMessageLatency(time.Duration) } From 522cdcc2041e9c98478f27d3fbd302eb3d1222ec Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 1 May 2020 12:05:37 -0400 Subject: [PATCH 6/6] test: fix flaky test TestSessionBetweenPeers This commit was moved from ipfs/go-bitswap@373033e7540d67c455587e61826d5a1c524f291a --- bitswap/bitswap_with_sessions_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bitswap/bitswap_with_sessions_test.go b/bitswap/bitswap_with_sessions_test.go index 9551938c9..f710879a1 100644 --- a/bitswap/bitswap_with_sessions_test.go +++ b/bitswap/bitswap_with_sessions_test.go @@ -9,10 +9,12 @@ import ( bitswap "github.com/ipfs/go-bitswap" bssession "github.com/ipfs/go-bitswap/internal/session" testinstance "github.com/ipfs/go-bitswap/testinstance" + tn "github.com/ipfs/go-bitswap/testnet" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" + mockrouting "github.com/ipfs/go-ipfs-routing/mock" tu "github.com/libp2p/go-libp2p-testing/etc" ) @@ -71,7 +73,7 @@ func TestSessionBetweenPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vnet := getVirtualNetwork() + vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -112,6 +114,10 @@ func TestSessionBetweenPeers(t *testing.T) { t.Fatal(err) } } + + // Uninvolved nodes should receive + // - initial broadcast want-have of root block + // - CANCEL (when Peer A receives the root block from Peer B) for _, is := range inst[2:] { stat, err := is.Exchange.Stat() if err != nil {
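
For readers who want the net effect of the series without tracing the diffs, below is a minimal standalone sketch (not part of the patches; the package, function and variable names here are illustrative only) of the two pieces the series adds: the EWMA of request-to-response latency kept by latencyEwma, and the capped timeout derived from it in calculateTimeoutFromMessageLatency. The alpha of 0.5 and the multiplier of 2 come from the patched donthavetimeoutmgr.go; the 7-second cap is only an example value standing in for maxTimeout.

package main

import (
	"fmt"
	"time"
)

// ewma mirrors latencyEwma from the patch: the first few samples are weighted
// by 1/n so the estimate converges quickly, after which alpha is clamped
// (0.5 in the patch) so recent responses still move the estimate.
type ewma struct {
	alpha   float64
	samples uint64
	latency time.Duration
}

func (e *ewma) update(elapsed time.Duration) {
	e.samples++
	alpha := 1.0 / float64(e.samples)
	if alpha < e.alpha {
		alpha = e.alpha
	}
	e.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(e.latency))
}

// timeoutFromLatency mirrors calculateTimeoutFromMessageLatency: multiply the
// measured latency to leave a margin for error, but never exceed the cap.
func timeoutFromLatency(latency time.Duration, multiplier int, maxTimeout time.Duration) time.Duration {
	timeout := latency * time.Duration(multiplier)
	if timeout > maxTimeout {
		timeout = maxTimeout
	}
	return timeout
}

func main() {
	e := &ewma{alpha: 0.5}
	// Three responses measured at 40ms, 20ms and 10ms leave the EWMA at 20ms
	// (40 -> 30 -> 20), so the simulated DONT_HAVE timeout becomes 40ms.
	for _, sample := range []time.Duration{40 * time.Millisecond, 20 * time.Millisecond, 10 * time.Millisecond} {
		e.update(sample)
	}
	fmt.Println(timeoutFromLatency(e.latency, 2, 7*time.Second))
}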