Skip to content

Commit

Permalink
Revert most of: Peer Diversity for Queries (#664)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Aug 11, 2020
1 parent 5f066f2 commit 8b5d432
Show file tree
Hide file tree
Showing 8 changed files with 5 additions and 256 deletions.
4 changes: 0 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ type IpfsDHT struct {
routingTablePeerFilter RouteTableFilterFunc
rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter

queryDiversityFilter peerdiversity.PeerIPGroupFilter

autoRefresh bool

// A set of bootstrap peers to fallback on if all other attempts to fix
Expand Down Expand Up @@ -294,8 +292,6 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
routingTablePeerFilter: cfg.routingTable.peerFilter,
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,

queryDiversityFilter: cfg.queryDiversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
Expand Down
14 changes: 1 addition & 13 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ type config struct {
diversityFilter peerdiversity.PeerIPGroupFilter
}

queryDiversityFilter peerdiversity.PeerIPGroupFilter

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
Expand Down Expand Up @@ -127,7 +125,7 @@ var defaults = func(o *config) error {

o.bucketSize = defaultBucketSize
o.concurrency = 10
o.resiliency = 5
o.resiliency = 3

o.v1CompatibleMode = true

Expand Down Expand Up @@ -422,16 +420,6 @@ func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option
}
}

// QueryDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used
// to construct the diversity filter for DHT queries.
// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details.
func QueryDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option {
return func(c *config) error {
c.queryDiversityFilter = pg
return nil
}
}

// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT.
// This is ONLY for tests.
func disableFixLowPeersRoutine(t *testing.T) Option {
Expand Down
1 change: 0 additions & 1 deletion dual/dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
dht.QueryDiversityFilter(dht.NewQueryDiversityFilter(h, maxPrefixCount)),
)
wan, err := dht.New(ctx, h, wanOpts...)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func NewLookupUpdateEvent(
waiting []peer.ID,
queried []peer.ID,
unreachable []peer.ID,
rejected []peer.ID,
) *LookupUpdateEvent {
return &LookupUpdateEvent{
Cause: OptPeerKadID(cause),
Expand All @@ -110,7 +109,6 @@ func NewLookupUpdateEvent(
Waiting: NewPeerKadIDSlice(waiting),
Queried: NewPeerKadIDSlice(queried),
Unreachable: NewPeerKadIDSlice(unreachable),
Rejected: NewPeerKadIDSlice(rejected),
}
}

Expand All @@ -129,8 +127,6 @@ type LookupUpdateEvent struct {
Queried []*PeerKadID
// Unreachable is a set of peers whose state in the lookup's peerset is being set to "unreachable".
Unreachable []*PeerKadID
// Rejected is a set of peers whose state in the lookup's peerset is being set to "rejected".
Rejected []*PeerKadID
}

// LookupTerminateEvent describes a lookup termination event.
Expand Down
2 changes: 0 additions & 2 deletions qpeerset/qpeerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const (
PeerQueried
// PeerUnreachable is applied to peers who have been queried and a response was not retrieved successfully.
PeerUnreachable
// PeerRejected is applied to peers who have not been queried because they were rejected by the diversity filter.
PeerRejected
)

// QueryPeerset maintains the state of a Kademlia asynchronous lookup.
Expand Down
97 changes: 4 additions & 93 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
pstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"

"github.com/google/uuid"
)

// ErrNoPeersQueried is returned when we failed to connect to any peers.
Expand Down Expand Up @@ -48,13 +46,6 @@ type query struct {
// queryPeers is the set of peers known by this query and their respective states.
queryPeers *qpeerset.QueryPeerset

// dFilter is the diversity filter we use to ensure that peers we contact during the query are as diverse as possible.
// for now, we measure diversity by IPv4 prefixes/IPv6 ASNs.
dFilter *peerdiversity.Filter

// whiteListedPeers is ONLY used to assert that we NEVER move a Whitelisted Peer(Rejected -> Heard) to the Rejected state.
whiteListedPeers map[peer.ID]struct{}

// terminated is set when the first worker thread encounters the termination condition.
// Its role is to make sure that once termination is determined, it is sticky.
terminated bool
Expand Down Expand Up @@ -98,7 +89,7 @@ func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, qu
// by stateless query functions (e.g. GetClosestPeers and therefore Provide and PutValue)
queryPeers := make([]peer.ID, 0, len(lookupRes.peers))
for i, p := range lookupRes.peers {
if state := lookupRes.state[i]; state == qpeerset.PeerHeard || state == qpeerset.PeerWaiting || state == qpeerset.PeerRejected {
if state := lookupRes.state[i]; state == qpeerset.PeerHeard || state == qpeerset.PeerWaiting {
queryPeers = append(queryPeers, p)
}
}
Expand Down Expand Up @@ -179,18 +170,6 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
stopFn: stopFn,
}

if dht.queryDiversityFilter != nil {
d, err := peerdiversity.NewFilter(dht.queryDiversityFilter, "dht/query", func(p peer.ID) int {
return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
})
if err != nil {
return nil, fmt.Errorf("failed to contruct diversity filter for query: %w", err)
}

q.dFilter = d
q.whiteListedPeers = make(map[peer.ID]struct{})
}

// run the query
q.run()

Expand Down Expand Up @@ -243,8 +222,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
// extract the top K not unreachable peers
var peers []peer.ID
peerState := make(map[peer.ID]qpeerset.PeerState)
qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried,
qpeerset.PeerRejected)
qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
for _, p := range qp {
state := q.queryPeers.GetState(p)
peerState[p] = state
Expand Down Expand Up @@ -301,8 +279,6 @@ func (q *query) run() {
q.terminate(pathCtx, cancelPath, LookupCancelled)
}

q.allowClosestRejectedPeer()

// calculate the maximum number of queries we could be spawning.
// Note: NumWaiting will be updated in spawnQuery
maxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting()
Expand Down Expand Up @@ -339,7 +315,6 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID
[]peer.ID{queryPeer}, // waiting
nil, // queried
nil, // unreachable
nil, // rejected
),
nil,
nil,
Expand All @@ -362,42 +337,11 @@ func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool
return true, LookupCompleted, nil
}

// The peers we query next should be ones that are NOT rejected by the filter.
// The peers we query next should be ones that we have only Heard about.
var peersToQuery []peer.ID
peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard)
count := 0
for _, p := range peers {
if q.dFilter != nil {
allowed := q.dFilter.TryAdd(p)
if !allowed {
src := q.queryPeers.GetReferrer(p)
PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
NewLookupUpdateEvent(
src,
src,
nil, // heard
nil, // waiting
nil, // queried
nil, // unreachable
[]peer.ID{p}, // rejected
),
nil,
nil,
),
)

// sanity check: we should NEVER REJECT a whitelisted peer as that can cause Heard -> Rejected -> Heard -> Rejected cycles.
if _, ok := q.whiteListedPeers[p]; ok {
panic(fmt.Errorf("invalid state: rejecting whitelisted peer %s", p))
}
q.queryPeers.SetState(p, qpeerset.PeerRejected)
continue
}
}
peersToQuery = append(peersToQuery, p)
count++
if count == nPeersToQuery {
Expand Down Expand Up @@ -429,32 +373,6 @@ func (q *query) isStarvationTermination() bool {
return q.queryPeers.NumHeard() == 0 && q.queryPeers.NumWaiting() == 0
}

func (q *query) allowClosestRejectedPeer() {
peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
for _, p := range peers {
if q.queryPeers.GetState(p) != qpeerset.PeerQueried {
return
}
}

if q.dFilter != nil {
// If a rejected peer is closer to the key than the furthest
// peer among the Beta closest and queried peers, simply change it's state to "heard" and whitelist it in the filter so it can be queried.
cps := q.queryPeers.GetClosestInStates(qpeerset.PeerRejected)
if len(cps) == 0 {
return
}

for _, cp := range cps {
if len(peers) == 0 || kb.Closer(cp, peers[len(peers)-1], q.key) {
q.dFilter.WhitelistPeers(cp)
q.whiteListedPeers[cp] = struct{}{}
q.queryPeers.SetState(cp, qpeerset.PeerHeard)
}
}
}
}

func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason LookupTerminationReason) {
if q.terminated {
return
Expand Down Expand Up @@ -545,7 +463,6 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
nil, // waiting
up.queried, // queried
up.unreachable, // unreachable
nil, // rejected
),
nil,
),
Expand Down Expand Up @@ -574,12 +491,6 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {

if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {
q.queryPeers.SetState(p, qpeerset.PeerUnreachable)
// we added this peer to the filter state when we selected it for querying.
// however, since this peer hasn't given us any information, we should not
// "book a slot" in the filter state for it.
if q.dFilter != nil {
q.dFilter.Remove(p)
}
} else {
panic(fmt.Errorf("kademlia protocol error: tried to transition to the unreachable state from state %v", st))
}
Expand Down
85 changes: 0 additions & 85 deletions query_diversity_filter.go

This file was deleted.

Loading

0 comments on commit 8b5d432

Please sign in to comment.