diff --git a/dual/dual.go b/dual/dual.go index 0b4317999..efb5353e8 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -42,6 +42,11 @@ var ( _ routing.ValueStore = (*DHT)(nil) ) +var ( + maxPrefixCountPerCpl = 2 + maxPrefixCount = 3 +) + // New creates a new DualDHT instance. Options provided are forwarded on to the two concrete // IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. @@ -51,7 +56,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) wanOpts := append(options, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), - dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, 2, 3)), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)), ) wan, err := dht.New(ctx, h, wanOpts...) if err != nil { diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index 971b5489d..b940a0430 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -121,53 +121,39 @@ func (qp *QueryPeerset) GetReferrer(p peer.ID) peer.ID { return qp.all[qp.find(p)].referredBy } -// NumWaiting returns the number of peers in state PeerWaiting. -func (qp *QueryPeerset) NumWaiting() int { - return len(qp.GetWaitingPeers()) -} - -// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order. -func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerWaiting { - result = append(result, p.id) - } +// GetClosestNInStates returns the closest to the key peers, which are in one of the given states. +// It returns n peers or less, if fewer peers meet the condition. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestNInStates(n int, states ...PeerState) (result []peer.ID) { + qp.sort() + m := make(map[PeerState]struct{}, len(states)) + for i := range states { + m[states[i]] = struct{}{} } - return -} -// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable. -// It returns count peers or less, if fewer peers meet the condition. -func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { - qp.sort() for _, p := range qp.all { - if p.state != PeerUnreachable { + if _, ok := m[p.state]; ok { result = append(result, p.id) } } - if len(result) >= count { - return result[:count] + if len(result) >= n { + return result[:n] } return result } -// NumHeard returns the number of peers in state PeerHeard. -func (qp *QueryPeerset) NumHeard() int { - return len(qp.GetHeardPeers()) +// GetClosestInStates returns the peers, which are in one of the given states. +// The returned peers are sorted in ascending order by their distance to the key. +func (qp *QueryPeerset) GetClosestInStates(states ...PeerState) (result []peer.ID) { + return qp.GetClosestNInStates(len(qp.all), states...) } -// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order. -func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) { - for _, p := range qp.all { - if p.state == PeerHeard { - result = append(result, p.id) - } - } - return +// NumHeard returns the number of peers in state PeerHeard. +func (qp *QueryPeerset) NumHeard() int { + return len(qp.GetClosestInStates(PeerHeard)) } -// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key. -func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) { - qp.sort() - return qp.GetHeardPeers() +// NumWaiting returns the number of peers in state PeerWaiting. +func (qp *QueryPeerset) NumWaiting() int { + return len(qp.GetClosestInStates(PeerWaiting)) } diff --git a/qpeerset/qpeerset_test.go b/qpeerset/qpeerset_test.go index 4d0f7db0a..bd40413af 100644 --- a/qpeerset/qpeerset_test.go +++ b/qpeerset/qpeerset_test.go @@ -56,31 +56,31 @@ func TestQPeerSet(t *testing.T) { // add peer4 require.True(t, qp.TryAdd(peer4, oracle)) - cl := qp.GetClosestNotUnreachable(2) + cl := qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(3) + cl = qp.GetClosestNInStates(3, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4, peer2}, cl) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer4}, cl) // mark as unreachable & try to get it qp.SetState(peer4, PeerUnreachable) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer2}, cl) // add peer1 require.True(t, qp.TryAdd(peer1, oracle)) - cl = qp.GetClosestNotUnreachable(1) + cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1}, cl) - cl = qp.GetClosestNotUnreachable(2) + cl = qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried) require.Equal(t, []peer.ID{peer1, peer2}, cl) // mark as waiting and assert qp.SetState(peer2, PeerWaiting) - require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers()) + require.Equal(t, []peer.ID{peer2}, qp.GetClosestInStates(PeerWaiting)) - require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers()) + require.Equal(t, []peer.ID{peer1}, qp.GetClosestInStates(PeerHeard)) require.True(t, qp.TryAdd(peer3, oracle)) - require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard()) + require.Equal(t, []peer.ID{peer3, peer1}, qp.GetClosestInStates(PeerHeard)) require.Equal(t, 2, qp.NumHeard()) } diff --git a/query.go b/query.go index 88a1b63e6..47d87df05 100644 --- a/query.go +++ b/query.go @@ -8,13 +8,12 @@ import ( "sync" "time" - "github.com/google/uuid" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/google/uuid" "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" ) @@ -223,7 +222,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { // extract the top K not unreachable peers var peers []peer.ID peerState := make(map[peer.ID]qpeerset.PeerState) - qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize) + qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range qp { state := q.queryPeers.GetState(p) peerState[p] = state @@ -251,10 +250,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { } type queryUpdate struct { - cause peer.ID - heard []peer.ID - queried []peer.ID - unreachable []peer.ID + cause peer.ID + queried []peer.ID + heard []peer.ID + unreachable []peer.ID + queryDuration time.Duration } @@ -279,8 +279,14 @@ func (q *query) run() { q.terminate(pathCtx, cancelPath, LookupCancelled) } + // calculate the maximum number of queries we could be spawning. + // Note: NumWaiting will be updated in spawnQuery + maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() + // termination is triggered on end-of-lookup conditions or starvation of unused peers - if ready, reason := q.isReadyToTerminate(); ready { + // it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers. + ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn) + if ready { q.terminate(pathCtx, cancelPath, reason) } @@ -288,29 +294,15 @@ func (q *query) run() { return } - // if all "threads" are busy, wait until someone finishes - if q.queryPeers.NumWaiting() >= alpha { - continue - } - - // spawn new queries, up to the parallelism allowance - // calculate the maximum number of queries we could be spawning. - // Note: NumWaiting will be updated in spawnQuery - maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting() // try spawning the queries, if there are no available peers to query then we won't spawn them - for j := 0; j < maxNumQueriesToSpawn; j++ { - q.spawnQuery(pathCtx, cause, ch) + for _, p := range qPeers { + q.spawnQuery(pathCtx, cause, p, ch) } } } // spawnQuery starts one query, if an available heard peer is found -func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) { - peers := q.queryPeers.GetSortedHeard() - if len(peers) == 0 { - return - } - +func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) { PublishLookupEvent(ctx, NewLookupEvent( q.dht.self, @@ -318,39 +310,52 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryU q.key, NewLookupUpdateEvent( cause, - q.queryPeers.GetReferrer(peers[0]), - nil, // heard - []peer.ID{peers[0]}, // waiting - nil, // queried - nil, // unreachable + q.queryPeers.GetReferrer(queryPeer), + nil, // heard + []peer.ID{queryPeer}, // waiting + nil, // queried + nil, // unreachable ), nil, nil, ), ) - q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) + q.queryPeers.SetState(queryPeer, qpeerset.PeerWaiting) q.waitGroup.Add(1) - go q.queryPeer(ctx, ch, peers[0]) + go q.queryPeer(ctx, ch, queryPeer) } -func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) { +func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) { // give the application logic a chance to terminate if q.stopFn() { - return true, LookupStopped + return true, LookupStopped, nil } if q.isStarvationTermination() { - return true, LookupStarvation + return true, LookupStarvation, nil } if q.isLookupTermination() { - return true, LookupCompleted + return true, LookupCompleted, nil } - return false, -1 + + // The peers we query next should be ones that we have only Heard about. + var peersToQuery []peer.ID + peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard) + count := 0 + for _, p := range peers { + peersToQuery = append(peersToQuery, p) + count++ + if count == nPeersToQuery { + break + } + } + + return false, -1, peersToQuery } // From the set of all nodes that are not unreachable, // if the closest beta nodes are all queried, the lookup can terminate. func (q *query) isLookupTermination() bool { - peers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta) + peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) for _, p := range peers { if q.queryPeers.GetState(p) != qpeerset.PeerQueried { return false @@ -478,6 +483,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { if p == q.dht.self { // don't add self. continue } + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerUnreachable) } else { diff --git a/routing.go b/routing.go index b57e0ae84..f31e9a5b2 100644 --- a/routing.go +++ b/routing.go @@ -667,7 +667,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, // Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol // and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT // server peer and is not identified as such is a bug. - dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard + dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting) break } } diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go index 35d9021ef..06c3116e6 100644 --- a/rt_diversity_filter.go +++ b/rt_diversity_filter.go @@ -1,11 +1,14 @@ package dht import ( + "sync" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + ma "github.com/multiformats/go-multiaddr" - "sync" ) var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil) diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go index 1843e3604..d439e863a 100644 --- a/rt_diversity_filter_test.go +++ b/rt_diversity_filter_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p-kbucket/peerdiversity" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" @@ -51,10 +52,64 @@ func TestRTPeerDiversityFilter(t *testing.T) { require.False(t, r.Allow(g2)) } -func TestRoutingTableEndToEnd(t *testing.T) { +func TestRoutingTableEndToEndMaxPerCpl(t *testing.T) { ctx := context.Background() h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) - r := NewRTPeerDiversityFilter(h, 2, 3) + r := NewRTPeerDiversityFilter(h, 1, 2) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + var d2 *IpfsDHT + var d3 *IpfsDHT + + for { + d2 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d2.self)) == 1 { + break + } + } + + for { + d3 = setupDHT(ctx, t, false) + if kb.CommonPrefixLen(d.selfKey, kb.ConvertPeerID(d3.self)) == 1 { + break + } + } + + // d2 will be allowed in the Routing table but + // d3 will not be allowed. + connectNoSync(t, ctx, d, d2) + require.Eventually(t, func() bool { + return d.routingTable.Find(d2.self) != "" + }, 1*time.Second, 100*time.Millisecond) + + connectNoSync(t, ctx, d, d3) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) == "") + + // it works after removing d2 + d.routingTable.RemovePeer(d2.self) + b, err := d.routingTable.TryAddPeer(d3.self, true, false) + require.NoError(t, err) + require.True(t, b) + require.Len(t, d.routingTable.ListPeers(), 1) + require.True(t, d.routingTable.Find(d3.self) != "") +} + +func TestRoutingTableEndToEndMaxPerTable(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 100, 3) d, err := New( ctx, @@ -67,7 +122,7 @@ func TestRoutingTableEndToEnd(t *testing.T) { ) require.NoError(t, err) - // only 3 peers per prefix for a cpl + // only 3 peers per prefix for the table. d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) connect(t, ctx, d, d2) waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) @@ -84,4 +139,5 @@ func TestRoutingTableEndToEnd(t *testing.T) { connectNoSync(t, ctx, d, d5) time.Sleep(1 * time.Second) require.Len(t, d.routingTable.ListPeers(), 3) + require.True(t, d.routingTable.Find(d5.self) == "") }