diff --git a/table.go b/table.go index 93d526a..0405d28 100644 --- a/table.go +++ b/table.go @@ -85,11 +85,37 @@ func (rt *RoutingTable) Close() error { return nil } -// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. +// NPeersForCPL returns the number of peers we have for a given Cpl +func (rt *RoutingTable) NPeersForCpl(cpl uint) int { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + // it's in the last bucket + if int(cpl) >= len(rt.buckets)-1 { + count := 0 + b := rt.buckets[len(rt.buckets)-1] + for _, p := range b.peers() { + if CommonPrefixLen(rt.local, p.dhtId) == int(cpl) { + count++ + } + } + return count + } else { + return rt.buckets[cpl].len() + } +} + +// TryAddPeer tries to add a peer to the Routing table. +// If the peer ALREADY exists in the Routing Table and has been queried before, this call is a no-op. +// If the peer ALREADY exists in the Routing Table but hasn't been queried before, we set it's LastUsefulAt value to +// the current time. This needs to done because we don't mark peers as "Useful"(by setting the LastUsefulAt value) +// when we first connect to them. +// // If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having // no LastSuccessfulOutboundQuery. // +// // If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer // whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer. // If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity". @@ -117,6 +143,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // peer already exists in the Routing Table. if peer := bucket.getPeer(p); peer != nil { + // if we're querying the peer first time after adding it, let's give it a + // usefulness bump. This will ONLY happen once. + if peer.LastUsefulAt.IsZero() && queryPeer { + peer.LastUsefulAt = lastUsefulAt + } return false, nil } @@ -228,9 +259,25 @@ func (rt *RoutingTable) removePeer(p peer.ID) { bucketID := rt.bucketIdForPeer(p) bucket := rt.buckets[bucketID] if bucket.remove(p) { + for { + lastBucketIndex := len(rt.buckets) - 1 + + // remove the last bucket if it's empty and it isn't the only bucket we have + if len(rt.buckets) > 1 && rt.buckets[lastBucketIndex].len() == 0 { + rt.buckets[lastBucketIndex] = nil + rt.buckets = rt.buckets[:lastBucketIndex] + } else if len(rt.buckets) >= 2 && rt.buckets[lastBucketIndex-1].len() == 0 { + // if the second last bucket just became empty, remove and replace it with the last bucket. + rt.buckets[lastBucketIndex-1] = rt.buckets[lastBucketIndex] + rt.buckets[lastBucketIndex] = nil + rt.buckets = rt.buckets[:lastBucketIndex] + } else { + break + } + } + // peer removed callback rt.PeerRemoved(p) - return } } diff --git a/table_test.go b/table_test.go index 3e6f8b4..ab5352a 100644 --- a/table_test.go +++ b/table_test.go @@ -80,6 +80,137 @@ func TestBucket(t *testing.T) { } } +func TestNPeersForCpl(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + require.Equal(t, 0, rt.NPeersForCpl(0)) + require.Equal(t, 0, rt.NPeersForCpl(1)) + + // one peer with cpl 1 + p, _ := rt.GenRandPeerID(1) + rt.TryAddPeer(p, true) + require.Equal(t, 0, rt.NPeersForCpl(0)) + require.Equal(t, 1, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + // one peer with cpl 0 + p, _ = rt.GenRandPeerID(0) + rt.TryAddPeer(p, true) + require.Equal(t, 1, rt.NPeersForCpl(0)) + require.Equal(t, 1, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + // split the bucket with a peer with cpl 1 + p, _ = rt.GenRandPeerID(1) + rt.TryAddPeer(p, true) + require.Equal(t, 1, rt.NPeersForCpl(0)) + require.Equal(t, 2, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + p, _ = rt.GenRandPeerID(0) + rt.TryAddPeer(p, true) + require.Equal(t, 2, rt.NPeersForCpl(0)) +} + +func TestEmptyBucketCollapse(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + + m := pstore.NewMetrics() + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + // generate peers with cpl 0,1,2 & 3 + p1, _ := rt.GenRandPeerID(0) + p2, _ := rt.GenRandPeerID(1) + p3, _ := rt.GenRandPeerID(2) + p4, _ := rt.GenRandPeerID(3) + + // remove peer on an empty bucket should not panic. + rt.RemovePeer(p1) + + // add peer with cpl 0 and remove it..bucket should still exist as it's the ONLY bucket we have + b, err := rt.TryAddPeer(p1, true) + require.True(t, b) + require.NoError(t, err) + rt.RemovePeer(p1) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Empty(t, rt.ListPeers()) + + // add peer with cpl 0 and cpl 1 and verify we have two buckets. + b, err = rt.TryAddPeer(p1, true) + require.True(t, b) + b, err = rt.TryAddPeer(p2, true) + require.True(t, b) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 2) + rt.tabLock.Unlock() + + // removing a peer from the last bucket collapses it. + rt.RemovePeer(p2) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Len(t, rt.ListPeers(), 1) + require.Contains(t, rt.ListPeers(), p1) + + // add p2 again + b, err = rt.TryAddPeer(p2, true) + require.True(t, b) + require.NoError(t, err) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 2) + rt.tabLock.Unlock() + + // now remove a peer from the second-last i.e. first bucket and ensure it collapses + rt.RemovePeer(p1) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Len(t, rt.ListPeers(), 1) + require.Contains(t, rt.ListPeers(), p2) + + // let's have a total of 4 buckets now + rt.TryAddPeer(p1, true) + rt.TryAddPeer(p2, true) + rt.TryAddPeer(p3, true) + rt.TryAddPeer(p4, true) + + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + + // removing from 2,3 and then 4 leaves us with ONLY one bucket + rt.RemovePeer(p2) + rt.RemovePeer(p3) + rt.RemovePeer(p4) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + + // an empty bucket in the middle DOES NOT collapse buckets + rt.TryAddPeer(p1, true) + rt.TryAddPeer(p2, true) + rt.TryAddPeer(p3, true) + rt.TryAddPeer(p4, true) + + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + + rt.RemovePeer(p2) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + require.NotContains(t, rt.ListPeers(), p2) +} + func TestRemovePeer(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t)