From ac6d9f25cfc54a8a157e03d31c50b4ad872bacbb Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 9 Apr 2020 23:06:21 +0530 Subject: [PATCH 1/3] add more state to peer --- bucket.go | 11 ++++++++--- go.mod | 2 +- table.go | 46 ++++++++++++++++++++++++++++++++-------------- table_test.go | 32 ++++++++++++++++++-------------- 4 files changed, 59 insertions(+), 32 deletions(-) diff --git a/bucket.go b/bucket.go index aa3f046..2b74b73 100644 --- a/bucket.go +++ b/bucket.go @@ -12,9 +12,14 @@ import ( // PeerInfo holds all related information for a peer in the K-Bucket. type PeerInfo struct { Id peer.ID - // LastSuccessfulOutboundQuery is the time instant when we last made a successful - // outbound query to this peer - LastSuccessfulOutboundQuery time.Time + + // LastUsefulAt is the time instant at which the peer was last "useful" to us. + // Please see the DHT docs for the definition of usefulness. + LastUsefulAt time.Time + + // LastSuccessfulOutboundQueryAt is the time instant at which we last got a + // successful query response from the peer. + LastSuccessfulOutboundQueryAt time.Time // Id of the peer in the DHT XOR keyspace dhtId ID diff --git a/go.mod b/go.mod index 3185c00..fd828ff 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/libp2p/go-libp2p-kbucket require ( github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-log v1.0.3 - github.com/jbenet/goprocess v0.1.3 + github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-libp2p-core v0.5.1 github.com/libp2p/go-libp2p-peerstore v0.2.2 github.com/minio/sha256-simd v0.1.1 diff --git a/table.go b/table.go index c69e8a6..78bfdbc 100644 --- a/table.go +++ b/table.go @@ -49,15 +49,14 @@ type RoutingTable struct { PeerRemoved func(peer.ID) PeerAdded func(peer.ID) - // maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery" + // maxLastUsefulAt is the max threshold/upper limit for the value of "LastUsefulAt" // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket // is full - maxLastSuccessfulOutboundThreshold float64 + maxLastUsefulAt float64 } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -// Passing a nil PeerValidationFunc disables periodic table cleanup. -func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastSuccessfulOutboundThreshold float64) (*RoutingTable, error) { +func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastUsefulAt float64) (*RoutingTable, error) { rt := &RoutingTable{ buckets: []*bucket{newBucket()}, bucketsize: bucketsize, @@ -71,7 +70,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst PeerRemoved: func(peer.ID) {}, PeerAdded: func(peer.ID) {}, - maxLastSuccessfulOutboundThreshold: maxLastSuccessfulOutboundThreshold, + maxLastUsefulAt: maxLastUsefulAt, } rt.ctx, rt.ctxCancel = context.WithCancel(context.Background()) @@ -111,9 +110,9 @@ func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) { func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { bucketID := rt.bucketIdForPeer(p) bucket := rt.buckets[bucketID] - var lastSuccessfulOutboundQuery time.Time + var lastUsefulAt time.Time if queryPeer { - lastSuccessfulOutboundQuery = time.Now() + lastUsefulAt = time.Now() } // peer already exists in the Routing Table. @@ -129,7 +128,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // We have enough space in the bucket (whether spawned or grouped). if bucket.len() < rt.bucketsize { - bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(), + dhtId: ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -143,7 +143,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // push the peer only if the bucket isn't overflowing after slitting if bucket.len() < rt.bucketsize { - bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(), + dhtId: ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -153,10 +154,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it. allPeers := bucket.peers() for _, pc := range allPeers { - if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { + if float64(time.Since(pc.LastUsefulAt)) > rt.maxLastUsefulAt { // let's evict it and add the new peer if bucket.remove(pc.Id) { - bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(), + dhtId: ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -180,9 +182,9 @@ func (rt *RoutingTable) GetPeerInfos() []PeerInfo { return pis } -// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer +// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQueryAt time of the peer. // Returns true if the update was successful, false otherwise. -func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool { +func (rt *RoutingTable) UpdateLastSuccessfulOutboundQueryAt(p peer.ID, t time.Time) bool { rt.tabLock.Lock() defer rt.tabLock.Unlock() @@ -190,7 +192,23 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time bucket := rt.buckets[bucketID] if pc := bucket.getPeer(p); pc != nil { - pc.LastSuccessfulOutboundQuery = t + pc.LastSuccessfulOutboundQueryAt = t + return true + } + return false +} + +// UpdateLastUsefulAt updates the LastUsefulAt time of the peer. +// Returns true if the update was successful, false otherwise. +func (rt *RoutingTable) UpdateLastUsefulAt(p peer.ID, t time.Time) bool { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + + bucketID := rt.bucketIdForPeer(p) + bucket := rt.buckets[bucketID] + + if pc := bucket.getPeer(p); pc != nil { + pc.LastUsefulAt = t return true } return false diff --git a/table_test.go b/table_test.go index eac385b..464b707 100644 --- a/table_test.go +++ b/table_test.go @@ -27,13 +27,14 @@ func TestPrint(t *testing.T) { func TestBucket(t *testing.T) { t.Parallel() testTime1 := time.Now() + testTime2 := time.Now().AddDate(1, 0, 0) b := newBucket() peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])}) + b.pushFront(&PeerInfo{peers[i], testTime1, testTime2, ConvertPeerID(peers[i])}) } local := test.RandPeerIDFatal(t) @@ -47,14 +48,17 @@ func TestBucket(t *testing.T) { require.NotNil(t, p) require.Equal(t, peers[i], p.Id) require.Equal(t, ConvertPeerID(peers[i]), p.dhtId) - require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery) + require.EqualValues(t, testTime1, p.LastUsefulAt) + require.EqualValues(t, testTime2, p.LastSuccessfulOutboundQueryAt) - // mark as missing t2 := time.Now().Add(1 * time.Hour) - p.LastSuccessfulOutboundQuery = t2 + t3 := t2.Add(1 * time.Hour) + p.LastSuccessfulOutboundQueryAt = t2 + p.LastUsefulAt = t3 p = b.getPeer(peers[i]) require.NotNil(t, p) - require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery) + require.EqualValues(t, t2, p.LastSuccessfulOutboundQueryAt) + require.EqualValues(t, t3, p.LastUsefulAt) spl := b.split(0, ConvertPeerID(local)) llist := b.list @@ -201,7 +205,7 @@ func TestTableFind(t *testing.T) { } } -func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { +func TestUpdateLastSuccessfulOutboundQueryAt(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold) @@ -214,11 +218,11 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { // increment and assert t2 := time.Now().Add(1 * time.Hour) - rt.UpdateLastSuccessfulOutboundQuery(p, t2) + rt.UpdateLastSuccessfulOutboundQueryAt(p, t2) rt.tabLock.Lock() pi := rt.buckets[0].getPeer(p) require.NotNil(t, pi) - require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery) + require.EqualValues(t, t2, pi.LastSuccessfulOutboundQueryAt) rt.tabLock.Unlock() } @@ -257,9 +261,9 @@ func TestTryAddPeer(t *testing.T) { require.True(t, b) require.Equal(t, p4, rt.Find(p4)) - // adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold + // adding a peer with cpl 0 works if an existing peer has LastUsefulAt above the max threshold // because that existing peer will get replaced - require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -2))) + require.True(t, rt.UpdateLastUsefulAt(p2, time.Now().AddDate(0, 0, -2))) b, err = rt.TryAddPeer(p3, true) require.NoError(t, err) require.True(t, b) @@ -271,7 +275,7 @@ func TestTryAddPeer(t *testing.T) { // however adding peer fails if below threshold p5, err := rt.GenRandPeerID(0) require.NoError(t, err) - require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p1, time.Now())) + require.True(t, rt.UpdateLastUsefulAt(p1, time.Now())) b, err = rt.TryAddPeer(p5, true) require.Error(t, err) require.False(t, b) @@ -285,7 +289,7 @@ func TestTryAddPeer(t *testing.T) { rt.tabLock.Lock() pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6) require.NotNil(t, p6) - require.True(t, pi.LastSuccessfulOutboundQuery.IsZero()) + require.True(t, pi.LastUsefulAt.IsZero()) rt.tabLock.Unlock() } @@ -425,9 +429,9 @@ func TestGetPeerInfos(t *testing.T) { } require.Equal(t, p1, ms[p1].Id) - require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero()) + require.True(t, ms[p1].LastUsefulAt.IsZero()) require.Equal(t, p2, ms[p2].Id) - require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero()) + require.False(t, ms[p2].LastUsefulAt.IsZero()) } func BenchmarkAddPeer(b *testing.B) { From a4329c395c4df7816a1c58c69d5703616d10fff4 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 9 Apr 2020 23:26:33 +0530 Subject: [PATCH 2/3] better naming --- table.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/table.go b/table.go index 78bfdbc..96750a2 100644 --- a/table.go +++ b/table.go @@ -49,14 +49,14 @@ type RoutingTable struct { PeerRemoved func(peer.ID) PeerAdded func(peer.ID) - // maxLastUsefulAt is the max threshold/upper limit for the value of "LastUsefulAt" - // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket + // usefulnessGracePeriod is the maximum grace period we will give to a + // peer in the bucket to be useful to us, failing which, we will evict it to make place for a new peer if the bucket // is full - maxLastUsefulAt float64 + usefulnessGracePeriod float64 } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastUsefulAt float64) (*RoutingTable, error) { +func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, usefulnessGracePeriod float64) (*RoutingTable, error) { rt := &RoutingTable{ buckets: []*bucket{newBucket()}, bucketsize: bucketsize, @@ -70,7 +70,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst PeerRemoved: func(peer.ID) {}, PeerAdded: func(peer.ID) {}, - maxLastUsefulAt: maxLastUsefulAt, + usefulnessGracePeriod: usefulnessGracePeriod, } rt.ctx, rt.ctxCancel = context.WithCancel(context.Background()) @@ -154,7 +154,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it. allPeers := bucket.peers() for _, pc := range allPeers { - if float64(time.Since(pc.LastUsefulAt)) > rt.maxLastUsefulAt { + if float64(time.Since(pc.LastUsefulAt)) > rt.usefulnessGracePeriod { // let's evict it and add the new peer if bucket.remove(pc.Id) { bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(), From 8ee3433d2070af75327f699e837534ab21781b44 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 9 Apr 2020 23:27:16 +0530 Subject: [PATCH 3/3] better naming --- table_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/table_test.go b/table_test.go index 464b707..e40020d 100644 --- a/table_test.go +++ b/table_test.go @@ -226,6 +226,27 @@ func TestUpdateLastSuccessfulOutboundQueryAt(t *testing.T) { rt.tabLock.Unlock() } +func TestUpdateLastUsefulAt(t *testing.T) { + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + p := test.RandPeerIDFatal(t) + b, err := rt.TryAddPeer(p, true) + require.True(t, b) + require.NoError(t, err) + + // increment and assert + t2 := time.Now().Add(1 * time.Hour) + rt.UpdateLastUsefulAt(p, t2) + rt.tabLock.Lock() + pi := rt.buckets[0].getPeer(p) + require.NotNil(t, pi) + require.EqualValues(t, t2, pi.LastUsefulAt) + rt.tabLock.Unlock() +} + func TestTryAddPeer(t *testing.T) { minThreshold := float64(24 * 1 * time.Hour) t.Parallel()