Skip to content

Commit

Permalink
refactor: Refactoring components from Peer Diversity for Queries (#664)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored and aschmahmann committed Aug 14, 2020
1 parent 1bfc878 commit 07d0e9a
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 88 deletions.
7 changes: 6 additions & 1 deletion dual/dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ var (
_ routing.ValueStore = (*DHT)(nil)
)

var (
maxPrefixCountPerCpl = 2
maxPrefixCount = 3
)

// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
Expand All @@ -51,7 +56,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
wanOpts := append(options,
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, 2, 3)),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
)
wan, err := dht.New(ctx, h, wanOpts...)
if err != nil {
Expand Down
56 changes: 21 additions & 35 deletions qpeerset/qpeerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,53 +121,39 @@ func (qp *QueryPeerset) GetReferrer(p peer.ID) peer.ID {
return qp.all[qp.find(p)].referredBy
}

// NumWaiting returns the number of peers in state PeerWaiting.
func (qp *QueryPeerset) NumWaiting() int {
return len(qp.GetWaitingPeers())
}

// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order.
func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) {
for _, p := range qp.all {
if p.state == PeerWaiting {
result = append(result, p.id)
}
// GetClosestNInStates returns the closest to the key peers, which are in one of the given states.
// It returns n peers or less, if fewer peers meet the condition.
// The returned peers are sorted in ascending order by their distance to the key.
func (qp *QueryPeerset) GetClosestNInStates(n int, states ...PeerState) (result []peer.ID) {
qp.sort()
m := make(map[PeerState]struct{}, len(states))
for i := range states {
m[states[i]] = struct{}{}
}
return
}

// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable.
// It returns count peers or less, if fewer peers meet the condition.
func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) {
qp.sort()
for _, p := range qp.all {
if p.state != PeerUnreachable {
if _, ok := m[p.state]; ok {
result = append(result, p.id)
}
}
if len(result) >= count {
return result[:count]
if len(result) >= n {
return result[:n]
}
return result
}

// NumHeard returns the number of peers in state PeerHeard.
func (qp *QueryPeerset) NumHeard() int {
return len(qp.GetHeardPeers())
// GetClosestInStates returns the peers, which are in one of the given states.
// The returned peers are sorted in ascending order by their distance to the key.
func (qp *QueryPeerset) GetClosestInStates(states ...PeerState) (result []peer.ID) {
return qp.GetClosestNInStates(len(qp.all), states...)
}

// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order.
func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) {
for _, p := range qp.all {
if p.state == PeerHeard {
result = append(result, p.id)
}
}
return
// NumHeard returns the number of peers in state PeerHeard.
func (qp *QueryPeerset) NumHeard() int {
return len(qp.GetClosestInStates(PeerHeard))
}

// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key.
func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) {
qp.sort()
return qp.GetHeardPeers()
// NumWaiting returns the number of peers in state PeerWaiting.
func (qp *QueryPeerset) NumWaiting() int {
return len(qp.GetClosestInStates(PeerWaiting))
}
18 changes: 9 additions & 9 deletions qpeerset/qpeerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,31 @@ func TestQPeerSet(t *testing.T) {

// add peer4
require.True(t, qp.TryAdd(peer4, oracle))
cl := qp.GetClosestNotUnreachable(2)
cl := qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer4, peer2}, cl)
cl = qp.GetClosestNotUnreachable(3)
cl = qp.GetClosestNInStates(3, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer4, peer2}, cl)
cl = qp.GetClosestNotUnreachable(1)
cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer4}, cl)

// mark as unreachable & try to get it
qp.SetState(peer4, PeerUnreachable)
cl = qp.GetClosestNotUnreachable(1)
cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer2}, cl)

// add peer1
require.True(t, qp.TryAdd(peer1, oracle))
cl = qp.GetClosestNotUnreachable(1)
cl = qp.GetClosestNInStates(1, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer1}, cl)
cl = qp.GetClosestNotUnreachable(2)
cl = qp.GetClosestNInStates(2, PeerHeard, PeerWaiting, PeerQueried)
require.Equal(t, []peer.ID{peer1, peer2}, cl)

// mark as waiting and assert
qp.SetState(peer2, PeerWaiting)
require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers())
require.Equal(t, []peer.ID{peer2}, qp.GetClosestInStates(PeerWaiting))

require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers())
require.Equal(t, []peer.ID{peer1}, qp.GetClosestInStates(PeerHeard))
require.True(t, qp.TryAdd(peer3, oracle))
require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard())
require.Equal(t, []peer.ID{peer3, peer1}, qp.GetClosestInStates(PeerHeard))
require.Equal(t, 2, qp.NumHeard())
}
82 changes: 44 additions & 38 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"sync"
"time"

"github.com/google/uuid"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
)
Expand Down Expand Up @@ -223,7 +222,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
// extract the top K not unreachable peers
var peers []peer.ID
peerState := make(map[peer.ID]qpeerset.PeerState)
qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize)
qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
for _, p := range qp {
state := q.queryPeers.GetState(p)
peerState[p] = state
Expand Down Expand Up @@ -251,10 +250,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
}

type queryUpdate struct {
cause peer.ID
heard []peer.ID
queried []peer.ID
unreachable []peer.ID
cause peer.ID
queried []peer.ID
heard []peer.ID
unreachable []peer.ID

queryDuration time.Duration
}

Expand All @@ -279,78 +279,83 @@ func (q *query) run() {
q.terminate(pathCtx, cancelPath, LookupCancelled)
}

// calculate the maximum number of queries we could be spawning.
// Note: NumWaiting will be updated in spawnQuery
maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting()

// termination is triggered on end-of-lookup conditions or starvation of unused peers
if ready, reason := q.isReadyToTerminate(); ready {
// it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers.
ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn)
if ready {
q.terminate(pathCtx, cancelPath, reason)
}

if q.terminated {
return
}

// if all "threads" are busy, wait until someone finishes
if q.queryPeers.NumWaiting() >= alpha {
continue
}

// spawn new queries, up to the parallelism allowance
// calculate the maximum number of queries we could be spawning.
// Note: NumWaiting will be updated in spawnQuery
maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting()
// try spawning the queries, if there are no available peers to query then we won't spawn them
for j := 0; j < maxNumQueriesToSpawn; j++ {
q.spawnQuery(pathCtx, cause, ch)
for _, p := range qPeers {
q.spawnQuery(pathCtx, cause, p, ch)
}
}
}

// spawnQuery starts one query, if an available heard peer is found
func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) {
peers := q.queryPeers.GetSortedHeard()
if len(peers) == 0 {
return
}

func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) {
PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
NewLookupUpdateEvent(
cause,
q.queryPeers.GetReferrer(peers[0]),
nil, // heard
[]peer.ID{peers[0]}, // waiting
nil, // queried
nil, // unreachable
q.queryPeers.GetReferrer(queryPeer),
nil, // heard
[]peer.ID{queryPeer}, // waiting
nil, // queried
nil, // unreachable
),
nil,
nil,
),
)
q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting)
q.queryPeers.SetState(queryPeer, qpeerset.PeerWaiting)
q.waitGroup.Add(1)
go q.queryPeer(ctx, ch, peers[0])
go q.queryPeer(ctx, ch, queryPeer)
}

func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) {
func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {
// give the application logic a chance to terminate
if q.stopFn() {
return true, LookupStopped
return true, LookupStopped, nil
}
if q.isStarvationTermination() {
return true, LookupStarvation
return true, LookupStarvation, nil
}
if q.isLookupTermination() {
return true, LookupCompleted
return true, LookupCompleted, nil
}
return false, -1

// The peers we query next should be ones that we have only Heard about.
var peersToQuery []peer.ID
peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard)
count := 0
for _, p := range peers {
peersToQuery = append(peersToQuery, p)
count++
if count == nPeersToQuery {
break
}
}

return false, -1, peersToQuery
}

// From the set of all nodes that are not unreachable,
// if the closest beta nodes are all queried, the lookup can terminate.
func (q *query) isLookupTermination() bool {
peers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta)
peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
for _, p := range peers {
if q.queryPeers.GetState(p) != qpeerset.PeerQueried {
return false
Expand Down Expand Up @@ -478,6 +483,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
if p == q.dht.self { // don't add self.
continue
}

if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {
q.queryPeers.SetState(p, qpeerset.PeerUnreachable)
} else {
Expand Down
2 changes: 1 addition & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
// server peer and is not identified as such is a bug.
dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
break
}
}
Expand Down
5 changes: 4 additions & 1 deletion rt_diversity_filter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package dht

import (
"sync"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kbucket/peerdiversity"

ma "github.com/multiformats/go-multiaddr"
"sync"
)

var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil)
Expand Down
Loading

0 comments on commit 07d0e9a

Please sign in to comment.