This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

calculate message latency #386

Merged: 6 commits, May 2, 2020
Changes from 1 commit
18 changes: 15 additions & 3 deletions bitswap.go
@@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
@@ -348,6 +348,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}

// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, at, combined)
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
@@ -386,6 +396,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
now := time.Now()

bs.counterLk.Lock()
bs.counters.messagesRecvd++
bs.counterLk.Unlock()
@@ -409,7 +421,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves)
if err != nil {
log.Warnf("ReceiveMessage recvBlockFrom error: %s", err)
return
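The net effect of the bitswap.go changes: a single arrival timestamp is taken at the top of ReceiveMessage, before any locking or block processing, and threaded through receiveBlocksFrom to PeerManager.ResponseReceived along with every CID the message resolved (blocks, HAVEs and DONT_HAVEs). Below is a minimal standalone sketch of what a consumer of that (peer, time, keys) triple can do with it; the bookkeeping is illustrative, not the real PeerManager:

    package main

    import (
        "fmt"
        "time"
    )

    // latencyTracker matches response keys against the time each want was
    // sent and derives a round-trip latency sample per key.
    type latencyTracker struct {
        sentAt map[string]time.Time // keyed by CID string (illustrative)
    }

    // ResponseReceived mirrors the shape of the call added in this diff: it
    // takes the message arrival time as an argument instead of calling
    // time.Now() itself, so internal queueing delays don't inflate samples.
    func (lt *latencyTracker) ResponseReceived(at time.Time, keys []string) {
        for _, k := range keys {
            if sent, ok := lt.sentAt[k]; ok {
                fmt.Printf("%s: latency sample %v\n", k, at.Sub(sent))
                delete(lt.sentAt, k)
            }
        }
    }

    func main() {
        lt := &latencyTracker{sentAt: map[string]time.Time{
            "bafy...example": time.Now().Add(-40 * time.Millisecond),
        }}
        lt.ResponseReceived(time.Now(), []string{"bafy...example"})
    }

Taking time.Now() first thing in ReceiveMessage keeps the measurement as close as possible to actual wire arrival.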
120 changes: 101 additions & 19 deletions internal/messagequeue/donthavetimeoutmgr.go
@@ -21,10 +21,20 @@ const (
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second

// latencyMultiplier is multiplied by the average ping time to
// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

// pingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
latencyMultiplier = 3
pingLatencyMultiplier = 3

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5

// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
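As a worked example of the two formulas these constants feed (the 40ms latency figure is arbitrary, not taken from the code):

    ping path:    timeout = maxExpectedWantProcessTime + pingLatencyMultiplier*pingLatency
                          = 2s + 3*40ms = 2.12s
    message path: timeout = messageLatencyMultiplier*messageLatencyEWMA
                          = 2*40ms = 80ms
    either result is clamped to maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

Once a measured message latency is available it typically yields a much tighter timeout than the ping-based estimate, which has to absorb the full 2s of expected want-processing time.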

// PeerConnection is a connection to a peer that can be pinged, and the
@@ -44,16 +54,20 @@ type pendingWant struct {
sent time.Time
}

// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to
// set a reasonable timeout for simulating a DONT_HAVE message for peers that
// don't support DONT_HAVE or that take too long to respond.
// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency: we start with a default timeout while
// pinging the peer to estimate latency. Once we receive a response from the
// peer, we use the measured message latency instead.
type dontHaveTimeoutMgr struct {
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
latencyMultiplier int
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration

// All variables below here must be protected by the lock
@@ -66,20 +80,27 @@ type dontHaveTimeoutMgr struct {
wantQueue []*pendingWant
// time to wait for a response (depends on latency)
timeout time.Duration
// ewma of message latency (time from message sent to response received)
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *time.Timer
}
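Taken together, these fields encode a three-stage choice of timeout source. A small illustrative helper (not part of the diff) that would sit alongside this type, using the methods defined later in this file:

    // pickTimeout sketches the precedence the manager applies: measured
    // message latency wins over ping latency, which wins over the default.
    // (Callers in the real code hold dhtm.lk when reading these fields.)
    func pickTimeout(dhtm *dontHaveTimeoutMgr, pingLatency time.Duration, pinged bool) time.Duration {
        if dhtm.messageLatency.samples > 0 {
            return dhtm.calculateTimeoutFromMessageLatency()
        }
        if pinged {
            return dhtm.calculateTimeoutFromPingLatency(pingLatency)
        }
        return dhtm.defaultTimeout
    }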

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout,
latencyMultiplier, maxExpectedWantProcessTime)
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration, latencyMultiplier int,
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {

ctx, shutdown := context.WithCancel(context.Background())
@@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
latencyMultiplier: latencyMultiplier,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
}
@@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() {
// calculate a reasonable timeout
latency := dhtm.peerConn.Latency()
if latency.Nanoseconds() > 0 {
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
return
}

// Otherwise measure latency by pinging the peer
go dhtm.measureLatency()
go dhtm.measurePingLatency()
}

// UpdateMessageLatency is called when we receive a response from the peer.
// elapsed is the time between sending the request and receiving the
// corresponding response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()

// Update the message latency and the timeout
dhtm.messageLatency.update(elapsed)
oldTimeout := dhtm.timeout
dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()

// If the timeout has decreased
if dhtm.timeout < oldTimeout {
// Check if after changing the timeout there are any pending wants that
// are now over the timeout
dhtm.checkForTimeouts()
}
}
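A hypothetical call site (the real caller is the message queue's response handling, which is not shown in this excerpt), where elapsed spans a full request/response round trip as seen by the sender:

    sentAt := time.Now()
    // ... send the wantlist, wait for the matching response to arrive ...
    dhtm.UpdateMessageLatency(time.Since(sentAt))

The asymmetry above is deliberate: only a shrinking timeout triggers an immediate checkForTimeouts, because wants that fit inside the old, looser timeout may already be overdue under the new one; a growing timeout cannot make anything newly overdue.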

// measureLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measureLatency() {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
defer cancel()
@@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()

// A message response has arrived, so the timeout is already based on message latency
if dhtm.messageLatency.samples > 0 {
return
}

// Calculate a reasonable timeout based on latency
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)

// Check if after changing the timeout there are any pending wants that are
// now over the timeout
@@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
dhtm.onDontHaveTimeout(pending)
}

// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration {
// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}

// latencyEwma is an EWMA of message latency
type latencyEwma struct {
alpha float64
samples uint64
latency time.Duration
}

// update the EWMA with the given sample
func (le *latencyEwma) update(elapsed time.Duration) {
le.samples++

// Initially set alpha to be 1.0 / <the number of samples>
alpha := 1.0 / float64(le.samples)
if alpha < le.alpha {
// Once we have enough samples, clamp alpha
alpha = le.alpha
}
le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}
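Worked through with messageLatencyAlpha = 0.5 and the two samples the test file below feeds in (20ms, then 10ms):

    n=1: effective alpha = max(1/1, 0.5) = 1.0 -> latency = 1.0*20ms              = 20ms
    n=2: effective alpha = max(1/2, 0.5) = 0.5 -> latency = 0.5*10ms + 0.5*20ms   = 15ms

Starting alpha at 1/n weights the earliest samples like a plain running average, so the estimate is not dragged toward the zero value latency starts at; once 1/n falls below messageLatencyAlpha, alpha is clamped and the update becomes a standard EWMA.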
88 changes: 80 additions & 8 deletions internal/messagequeue/donthavetimeoutmgr_test.go
@@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -102,7 +102,7 @@

// At this stage first set of keys should have timed out
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
t.Fatal("expected timeout", tr.timedOutCount(), len(firstks))
}

// Clear the recorded timed out keys
@@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -222,6 +222,78 @@
}
}

func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 40
latMultiplier := 1
expProcessTime := time.Duration(0)
msgLatencyMultiplier := 1
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)

// expectedTimeout
// = expProcessTime + latency*time.Duration(latMultiplier)
// = 0 + 40ms * 1
// = 40ms

// Wait for less than the expected timeout
time.Sleep(25 * time.Millisecond)

// Receive two message latency updates
dhtm.UpdateMessageLatency(time.Millisecond * 20)
dhtm.UpdateMessageLatency(time.Millisecond * 10)

// alpha is 0.5 so timeout should be
// = (10ms * alpha) + (20ms * (1 - alpha))
// = (10ms * 0.5) + (20ms * 0.5)
// = 15ms
// We've already slept for 25ms so with the new 15ms timeout
// the keys should have timed out

// Give the queue some time to process the updates
time.Sleep(5 * time.Millisecond)

if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}

func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
ks := testutil.GenerateCids(2)
pc := &mockPeerConn{latency: time.Second} // ignored
tr := timeoutRecorder{}
msgLatencyMultiplier := 1
testMaxTimeout := time.Millisecond * 10

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)

// Receive a message latency update that would make the timeout greater
// than the maximum timeout
dhtm.UpdateMessageLatency(testMaxTimeout * 4)

// Sleep until just after the maximum timeout
time.Sleep(testMaxTimeout + 5*time.Millisecond)

// Keys should have timed out
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}

func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 1
@@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

@@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
