Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#386 from ipfs/feat/msg-latency
Browse files Browse the repository at this point in the history
calculate message latency

This commit was moved from ipfs/go-bitswap@165b154
  • Loading branch information
Stebalien committed May 2, 2020
2 parents 6c9536b + 522cdcc commit 486c683
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 52 deletions.
10 changes: 10 additions & 0 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}

// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
Expand Down
8 changes: 7 additions & 1 deletion bitswap/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/internal/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
tu "github.com/libp2p/go-libp2p-testing/etc"
)

Expand Down Expand Up @@ -71,7 +73,7 @@ func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vnet := getVirtualNetwork()
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -112,6 +114,10 @@ func TestSessionBetweenPeers(t *testing.T) {
t.Fatal(err)
}
}

// Uninvolved nodes should receive
// - initial broadcast want-have of root block
// - CANCEL (when Peer A receives the root block from Peer B)
for _, is := range inst[2:] {
stat, err := is.Exchange.Stat()
if err != nil {
Expand Down
120 changes: 101 additions & 19 deletions bitswap/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,20 @@ const (
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second

// latencyMultiplier is multiplied by the average ping time to
// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

// pingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
latencyMultiplier = 3
pingLatencyMultiplier = 3

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5

// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)

// PeerConnection is a connection to a peer that can be pinged, and the
Expand All @@ -44,16 +54,20 @@ type pendingWant struct {
sent time.Time
}

// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to
// set a reasonable timeout for simulating a DONT_HAVE message for peers that
// don't support DONT_HAVE or that take to long to respond.
// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency - we start with a default latency, while
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
latencyMultiplier int
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration

// All variables below here must be protected by the lock
Expand All @@ -66,20 +80,27 @@ type dontHaveTimeoutMgr struct {
wantQueue []*pendingWant
// time to wait for a response (depends on latency)
timeout time.Duration
// ewma of message latency (time from message sent to response received)
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *time.Timer
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout,
latencyMultiplier, maxExpectedWantProcessTime)
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration, latencyMultiplier int,
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {

ctx, shutdown := context.WithCancel(context.Background())
Expand All @@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
latencyMultiplier: latencyMultiplier,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
}
Expand Down Expand Up @@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() {
// calculate a reasonable timeout
latency := dhtm.peerConn.Latency()
if latency.Nanoseconds() > 0 {
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
return
}

// Otherwise measure latency by pinging the peer
go dhtm.measureLatency()
go dhtm.measurePingLatency()
}

// UpdateMessageLatency is called when we receive a response from the peer.
// It is the time between sending a request and receiving the corresponding
// response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()

// Update the message latency and the timeout
dhtm.messageLatency.update(elapsed)
oldTimeout := dhtm.timeout
dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()

// If the timeout has decreased
if dhtm.timeout < oldTimeout {
// Check if after changing the timeout there are any pending wants that
// are now over the timeout
dhtm.checkForTimeouts()
}
}

// measureLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measureLatency() {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
defer cancel()
Expand All @@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()

// A message has arrived so we already set the timeout based on message latency
if dhtm.messageLatency.samples > 0 {
return
}

// Calculate a reasonable timeout based on latency
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)

// Check if after changing the timeout there are any pending wants that are
// now over the timeout
Expand Down Expand Up @@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
dhtm.onDontHaveTimeout(pending)
}

// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration {
// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}

// latencyEwma is an EWMA of message latency
type latencyEwma struct {
alpha float64
samples uint64
latency time.Duration
}

// update the EWMA with the given sample
func (le *latencyEwma) update(elapsed time.Duration) {
le.samples++

// Initially set alpha to be 1.0 / <the number of samples>
alpha := 1.0 / float64(le.samples)
if alpha < le.alpha {
// Once we have enough samples, clamp alpha
alpha = le.alpha
}
le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}
88 changes: 80 additions & 8 deletions bitswap/internal/messagequeue/donthavetimeoutmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand All @@ -102,7 +102,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {

// At this stage first set of keys should have timed out
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
t.Fatal("expected timeout", tr.timedOutCount(), len(firstks))
}

// Clear the recorded timed out keys
Expand All @@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand Down Expand Up @@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand All @@ -222,6 +222,78 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
}
}

func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 40
latMultiplier := 1
expProcessTime := time.Duration(0)
msgLatencyMultiplier := 1
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)

// expectedTimeout
// = expProcessTime + latency*time.Duration(latMultiplier)
// = 0 + 40ms * 1
// = 40ms

// Wait for less than the expected timeout
time.Sleep(25 * time.Millisecond)

// Receive two message latency updates
dhtm.UpdateMessageLatency(time.Millisecond * 20)
dhtm.UpdateMessageLatency(time.Millisecond * 10)

// alpha is 0.5 so timeout should be
// = (20ms * alpha) + (10ms * (1 - alpha))
// = (20ms * 0.5) + (10ms * 0.5)
// = 15ms
// We've already slept for 25ms so with the new 15ms timeout
// the keys should have timed out

// Give the queue some time to process the updates
time.Sleep(5 * time.Millisecond)

if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}

func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
ks := testutil.GenerateCids(2)
pc := &mockPeerConn{latency: time.Second} // ignored
tr := timeoutRecorder{}
msgLatencyMultiplier := 1
testMaxTimeout := time.Millisecond * 10

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

// Add keys
dhtm.AddPending(ks)

// Receive a message latency update that would make the timeout greater
// than the maximum timeout
dhtm.UpdateMessageLatency(testMaxTimeout * 4)

// Sleep until just after the maximum timeout
time.Sleep(testMaxTimeout + 5*time.Millisecond)

// Keys should have timed out
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}

func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 1
Expand All @@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand Down Expand Up @@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand Down Expand Up @@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
pc := &mockPeerConn{latency: latency}

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()

Expand Down
Loading

0 comments on commit 486c683

Please sign in to comment.