[WIP] Peer Diversity for Queries #664
Conversation
There are a few issues with this PR, but nothing we can't fix 😁. I added some suggestions where I found issues, but any solution is fine as long as the underlying issues are fixed without the code becoming difficult to reason about.
Note: We need to search the code for places we reference `dht.bucketSize`, since we may now end up returning up to `2*dht.bucketSize` peers from a lookup. Hopefully no upstream users are depending on us returning exactly `bucketSize` peers from `GetClosestPeers`, despite it referencing K in the documentation for that function.
query.go
Outdated
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
ks "github.com/whyrusleeping/go-keyspace"
import ordering (we should probably just have a git hook that does this for us, or at least does the lint check)
Done.
query.go
Outdated
func (qp *query) distanceToKey(p peer.ID) *big.Int {
	k := ks.XORKeySpace.Key([]byte(qp.key))
	return ks.XORKeySpace.Key([]byte(p)).Distance(k)
}
This doesn't seem to be used anywhere
Removed this.
query_diversity_filter_test.go
Outdated
h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
r := NewQueryDiversityFilter(h, 2)

// table should only have 2 for each prefix
Suggested change:
- // table should only have 2 for each prefix
+ // query should only have 2 for each prefix
Fixed.
query.go
Outdated
for _, p := range peers {
	if kb.Closer(clFiltered[0], p, q.key) {
		return false
	}
}
Why are we looking through all of the peers here? They're ordered, aren't they, so looking at the last one should be sufficient. Maybe we should add a comment to `GetClosestNotUnreachable` indicating that the results are ordered.
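Since the results are sorted ascending by distance, only the last (farthest) element needs to be compared against. A minimal sketch of that idea, using plain ints as stand-ins for XOR distances (the function name and types here are illustrative, not the DHT's API):

```go
package main

import "fmt"

// isCloserThanAny reports whether candidate is closer to the key than at
// least one peer in peers. peers is assumed sorted ascending by distance,
// so checking the last (farthest) element is sufficient.
func isCloserThanAny(peers []int, candidate int) bool {
	if len(peers) == 0 {
		return true
	}
	return candidate < peers[len(peers)-1]
}

func main() {
	sorted := []int{3, 7, 19} // ascending distances, as GetClosest* returns
	fmt.Println(isCloserThanAny(sorted, 10)) // true: beats the farthest peer
	fmt.Println(isCloserThanAny(sorted, 25)) // false: farther than all of them
}
```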
Done.
query.go
Outdated
// add the peer as heard if it's a diverse peer.
if q.dFilter.TryAdd(p) {
	// filter allowed the peer. Try to add it to the query set and remove it from
	// the filter if it's not added to the query set.
	if !q.queryPeers.TryAdd(p, up.cause) {
		q.dFilter.Remove(p)
	}
} else {
	// filter didn't allow the peer. So, add it to the set of query peers rejected by the diversity filter.
	q.dfRejectedPeers.TryAdd(p, up.cause)
}
There's an issue here: since we mark peers as non-diverse when they are added to the queue, not when they are popped off the queue, if I hear about `maxAllowed + 1` (e.g. 3) peers in the same IP range, only `maxAllowed` (e.g. 2) of them are actually queryable. This seems like a potential attack, since an attacker can return a set of peers from IP ranges/ASNs that it wants to deny the client access to without actually owning any peers in that range (e.g. I can just make up an AWS IP range, and when you get a dial failure that'll be enough to stop you from querying other AWS peers).
Figuring this out is a bit tricky because of the asynchronous nature of things. One thought I had is, if we're willing to potentially waste some dials, maybe we could add something like this:

1. Add a `Rejected` state to the peer queue.
2. In the `updateState` function, when we receive a response, see if the peer is allowed and if not mark it as `Rejected`, take the `queryUpdate` information, and stash it in some map of rejected peers that's on the query instance.
3. Have some strategy for minimizing the number of peers we query that end up `Rejected` (e.g. when processing a new peer that is entering the `Queried` state, go through the peer queue and mark disallowed peers as `Rejected`. Alternatively, we could add another check in `spawnQuery` for rejecting peers and process those rejections in `updateState`).
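As a rough illustration of step 2, here is a toy sketch (the `query`, `updateState`, and `queryUpdate` names mirror the PR's code, but the types and the `allowed` callback are simplified stand-ins, not the real implementation):

```go
package main

import "fmt"

// Simplified peer states mirroring the proposal: responses from
// non-diverse peers move the peer to Rejected instead of Queried.
type PeerState int

const (
	Heard PeerState = iota
	Waiting
	Queried
	Rejected
)

type queryUpdate struct{ heard []string }

type query struct {
	states   map[string]PeerState
	rejected map[string]queryUpdate // stashed updates from rejected peers
	allowed  func(string) bool      // stand-in for the diversity filter
}

// updateState processes a response; disallowed peers become Rejected and
// their update is stashed for later, rather than merged into the query.
func (q *query) updateState(p string, up queryUpdate) {
	if q.allowed(p) {
		q.states[p] = Queried
		return
	}
	q.states[p] = Rejected
	q.rejected[p] = up
}

func main() {
	q := &query{
		states:   map[string]PeerState{"a": Waiting, "b": Waiting},
		rejected: map[string]queryUpdate{},
		allowed:  func(p string) bool { return p != "b" }, // "b" fails the filter
	}
	q.updateState("a", queryUpdate{heard: []string{"x"}})
	q.updateState("b", queryUpdate{heard: []string{"y"}})
	fmt.Println(q.states["a"] == Queried, q.states["b"] == Rejected) // true true
}
```

The stashed `queryUpdate` preserves the rejected peer's referrals, so they can still be processed later if the query needs them to finish.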
There may also be some advantages to be gained from limiting the number of outbound queries to a single IP range so as to limit the issues above:
For example, in `spawnQuery` we could select the top candidate that is not in the same IP range as one of the peers that is in the `Queried` state. This could potentially prevent us from having to discard a peer due to filter rejection. However, since we're not changing the termination condition, this "skip" is just a temporary measure. Of course, if a peer should actually be rejected (since we have enough successful peers in the IP range), then we should reject it and not check it again.
Question: This still allows peer A to affect when peer B is queried by delaying the query. I don't think this matters though, given the constraints we're imposing on the attackers in this scenario. lmk if you disagree.
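The "skip" heuristic above can be sketched like this (types, names, and the fallback behavior are all illustrative assumptions, not the PR's actual code):

```go
package main

import "fmt"

type peerInfo struct {
	id      string
	ipRange string // stand-in for an IP range/ASN
}

// pickCandidate selects the closest candidate that does not share an IP
// range with a peer already in the Queried state. If every candidate
// collides, it falls back to the closest one so the lookup still makes
// progress (the skip is a temporary measure, not a termination change).
// candidates is assumed non-empty and sorted by distance.
func pickCandidate(candidates []peerInfo, queriedRanges map[string]bool) peerInfo {
	for _, c := range candidates {
		if !queriedRanges[c.ipRange] {
			return c
		}
	}
	return candidates[0]
}

func main() {
	queried := map[string]bool{"AS1": true}
	cands := []peerInfo{{"p1", "AS1"}, {"p2", "AS2"}}
	fmt.Println(pickCandidate(cands, queried).id) // p2: skips the AS1 collision
}
```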
query.go
Outdated
// if the closest beta diverse peers have been queried, pick a non-diverse peer that's closer
// than any of the beta peers and add it to the set of peers to be queried.
q.allowClosestFilteredPeer()
I think this approach of combining both the diverse and non-diverse lookups together is going to cause us problems/be confusing.
For example, by ending up with all the diverse and non-diverse peers in the same queue, we have no way of getting out the results we want, which are the K best diverse peers + the <=K best non-diverse peers.
One idea for how to handle this (I wouldn't be surprised if you found a better one) would be to:
1. In the `runQuery` function, create two separate query objects (not sure whether to use the same queryID here, but probably). The first one has a filter and the rest of the parameters are normal. When peers are rejected for being non-diverse they are added
2. After the first one completes, create a second one where the state is initialized based off of the first query. Going through the sorted list of peers from the peer queue (this function needs to be exported, or perhaps can be handled with an API similar to what we already have):
   - Find the K closest peers that are either Queried, Heard, or Rejected. Add the Queried and Heard ones (along with their referrers) to the peer queue.
   - Add any Unreachable peers up until the last of these peers to the peer queue.
3. Run the second query.
4. Combine the results into a single lookup result instance.
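The final "combine" step might look something like the sketch below, which merges the diverse and non-diverse result lists while deduplicating and taking at most K new peers from each (the function name and string peer IDs are illustrative assumptions):

```go
package main

import "fmt"

// combineResults merges the diverse and non-diverse lookup results into a
// single ordered list, dropping duplicates and taking at most k peers
// contributed by each query.
func combineResults(diverse, nonDiverse []string, k int) []string {
	seen := map[string]bool{}
	out := []string{}
	for _, list := range [][]string{diverse, nonDiverse} {
		n := 0
		for _, p := range list {
			if n == k {
				break // at most k new peers from this query's results
			}
			if !seen[p] {
				seen[p] = true
				out = append(out, p)
				n++
			}
		}
	}
	return out
}

func main() {
	diverse := []string{"a", "b"}
	nonDiverse := []string{"b", "c", "d"}
	fmt.Println(combineResults(diverse, nonDiverse, 2)) // [a b c d]
}
```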
query.go
Outdated
	break
}

if len(qp) == 0 {
I think there's a function `qp.Validate` now that will check if the peer ID is empty.
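For reference, `peer.ID` in go-libp2p-core is a string alias whose `Validate` method rejects the empty ID. The sketch below uses a local stand-in `ID` type so it runs without the dependency; the real method may do more than this:

```go
package main

import (
	"errors"
	"fmt"
)

// ID is a local stand-in for peer.ID from go-libp2p-core: a string alias
// whose Validate method reports an error for the empty ID.
type ID string

func (id ID) Validate() error {
	if len(id) == 0 {
		return errors.New("empty peer ID")
	}
	return nil
}

func main() {
	var empty ID
	fmt.Println(empty.Validate() != nil)        // true: empty IDs fail validation
	fmt.Println(ID("QmPeer").Validate() == nil) // true: non-empty IDs pass
}
```

Using `qp.Validate() == nil` instead of `len(qp) != 0` makes the intent ("is this a real peer ID?") explicit at the call site.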
Done.
query.go
Outdated
// main `queryPeers` set. Instead, mark it as queried but rejected and cache the query results
// in `rejectedResults` so we can process them later if we need them to finish the query.
qp := up.queried
if qp != q.dht.self && len(qp) != 0 {
`len(qp) != 0` is really meant as `qp != peer.ID("")`, right? If so, maybe either call that out explicitly or use `qp.Validate`?
Done.
query_diversity_filter_test.go
Outdated
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
ordering
Done.
query.go
Outdated
-	queried peer.ID
+	heard   []peer.ID
+	queried []peer.ID
For clarity/posterity: Why the switch? Does it make the code much simpler and does it limit us down the line at all?
After thinking through this, I've reverted this.
query.go
Outdated
} else {
	panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st))
}
The previous structure of this function did not assume that every call to `updateState` was performed by a query returning (i.e. a transition from Waiting -> Queried/Unreachable), instead processing the updates as they came. The idea was to try to keep all the state machine logic in one place (although it was actually two, since the transition from Heard -> Waiting happens in `spawnQuery`). Is there a reason not to preserve that here?
@aschmahmann I've fixed this and reverted the code to the structure it had earlier, after pushing the "non-state transition" part to the `queryPeer` function.
However, I don't have a clear idea in my head of how to do the same with the `allowClosestRejectedPeer` function. Let's discuss this with @petar.
@@ -373,6 +417,51 @@ func (q *query) isLookupTermination() bool {
	return true
}

func (q *query) allowClosestRejectedPeer() {
What do you think about modeling these as events in `updateState`, and perhaps adding events to the Query events in order to track the state transitions here?
@petar will your parsers fail if they run into an unexpected event type? Also, wondering if you have any thoughts on how to integrate the peer diversity into the query logic without muddling it.
The parser would fail, but the fix is easy. The relevant code is here: https://github.com/libp2p/py-libp2p-xor/blob/master/lookup/events.py#L82
I am not sure how we would do this. Let's discuss this.
query.go
Outdated
// TODO
// select a peer whose ASN does not conflict with that of an inflight request so we don't
// query peers that eventually end up rejected.
I don't think there's much of a difference between them, but another option, which might more accurately reflect the filter, is to not query a peer that would fail the filter if all queries in progress succeed. To me the latter seems easier to implement, as it more naturally follows the filter pattern.
You could then have a second filter with duplicate state, except that it also has all of the `Waiting` peers added to it; then you can check whether each peer that you want to query is allowed or not.
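The "second filter" idea can be sketched as a lookahead clone that pre-counts the in-flight (`Waiting`) peers. The `rangeFilter` type below is a toy per-range counter, not the real `peerdiversity` API:

```go
package main

import "fmt"

// rangeFilter is a toy diversity filter: at most maxPerRange peers per
// IP range (illustrative stand-in for the peerdiversity filter).
type rangeFilter struct {
	maxPerRange int
	counts      map[string]int
}

func (f *rangeFilter) TryAdd(ipRange string) bool {
	if f.counts[ipRange] >= f.maxPerRange {
		return false
	}
	f.counts[ipRange]++
	return true
}

// lookahead clones the filter and pre-adds the ranges of all Waiting
// peers, so a candidate is only queried if it would still pass the
// filter even when every in-flight query succeeds.
func lookahead(f *rangeFilter, waitingRanges []string) *rangeFilter {
	clone := &rangeFilter{maxPerRange: f.maxPerRange, counts: map[string]int{}}
	for r, c := range f.counts {
		clone.counts[r] = c
	}
	for _, r := range waitingRanges {
		clone.TryAdd(r)
	}
	return clone
}

func main() {
	f := &rangeFilter{maxPerRange: 2, counts: map[string]int{"AS1": 1}}
	la := lookahead(f, []string{"AS1"}) // one AS1 peer already in flight
	fmt.Println(la.TryAdd("AS1"))       // false: would exceed the limit
	fmt.Println(la.TryAdd("AS2"))       // true
}
```

Cloning keeps the real filter untouched, which sidesteps the synchronization problem of mutating one shared filter from both paths.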
I might be off-base here, but:
If you change your design to insert peers into the filter right before they are queried (i.e. get rid of `q.dFilter.Remove(p)` here), then you resolve the TODO above, simplify the code in `updateState()`, and you don't need the state `RejectedAndQueried` (since it cannot happen).
@petar That's what we started with but @aschmahmann pointed out some important problems it would create by opening up new attack vectors:
@aschmahmann Yeah, I get what you mean. I'll fix this.
@aschmahmann I tried implementing this but it's messy to get it right because synchronising the query filter and the waiting filter isn't easy. For now, we can either go with a simple "don't query a peer" with the same prefix heuristic or discuss this in more depth.
qpeerset/qpeerset.go
Outdated
-func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) {
+// The returned peers are sorted in ascending order by their distance to the key.
+func (qp *QueryPeerset) GetClosestNotUnreachableOrRejected(count int) (result []peer.ID) {
WDYT about replacing all of these `GetXYZ` functions with `GetClosestWithStates(count int, states ...PeerState)`? Seems like it'd make the code a bit easier to work with as we add new states. The downside is that it might be useful for this to be a little bit miserable when we change the state machine, to make sure we do so carefully.
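A rough sketch of what the variadic version could look like, with local stand-in types for `PeerState` and the peerset entries (the real `qpeerset` stores distances differently):

```go
package main

import (
	"fmt"
	"sort"
)

type PeerState int

const (
	PeerHeard PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
)

type queryPeerState struct {
	id       string
	distance int // stand-in for the XOR distance to the key
	state    PeerState
}

// GetClosestInStates sketches the proposed variadic replacement for the
// GetXYZ family: one function filtering on any set of states, returning
// up to count peers sorted ascending by distance.
func GetClosestInStates(all []queryPeerState, count int, states ...PeerState) []string {
	want := map[PeerState]bool{}
	for _, s := range states {
		want[s] = true
	}
	sort.Slice(all, func(i, j int) bool { return all[i].distance < all[j].distance })
	out := []string{}
	for _, p := range all {
		if len(out) == count {
			break
		}
		if want[p.state] {
			out = append(out, p.id)
		}
	}
	return out
}

func main() {
	peers := []queryPeerState{
		{"a", 5, PeerQueried},
		{"b", 1, PeerUnreachable},
		{"c", 3, PeerHeard},
	}
	fmt.Println(GetClosestInStates(peers, 2, PeerHeard, PeerQueried)) // [c a]
}
```

Adding a new state then only requires a new constant, not a new accessor.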
After trying this out, it does look good. Have fixed this.
These are some initial comments. I'll add more later today. I need to think about this more.
It is hard for me to verify at a glance that the code won't block forever when `isReadyToTerminate` returns false but the diversity check rejects all candidates for querying.
query.go
Outdated
@@ -237,7 +256,7 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
	// extract the top K not unreachable peers
	var peers []peer.ID
	peerState := make(map[peer.ID]qpeerset.PeerState)
-	qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize)
+	qp := q.queryPeers.GetClosestNotUnreachableOrRejected(q.dht.bucketSize)
So, queried peers who are not diverse are not acceptable in the result, even though they may be closest to the target?
@petar Good catch. This is a mistake and I have fixed it.
The idea is to discover the K closest peers using diverse peers along the way. However, if the K closest peers to the key are non-diverse/malicious, I am not sure there is much we can do. Let's discuss this.
qpeerset/qpeerset.go
Outdated
// GetClosestInStates returns the closest to the key peers, which are in one of the given states.
// The returned peers are sorted in ascending order by their distance to the key.
Suggested change:
- // GetClosestInStates returns the closest to the key peers, which are in one of the given states.
+ // GetClosestInStates returns the peers, which are in one of the given states.
  // The returned peers are sorted in ascending order by their distance to the key.

We're returning all the peers here sorted; I just removed the word "closest" from the comment since it seemed extraneous.
Done.
query.go
Outdated
// 2. If the rejected peer has already been queried, change it's state to "queried" and queue up all results returned by it for querying
//    by changing their state to "heard".

// peer among the Beta closest and queried peers simply change it's state to "heard" and whitelist it in the filter so it can be queried.
Suggested change:
- // peer among the Beta closest and queried peers simply change it's state to "heard" and whitelist it in the filter so it can be queried.
+ // peer among the Beta closest and queried peers, simply change it's state to "heard" and whitelist it in the filter so it can be queried.
Done.
query.go
Outdated
}

if len(peersToQuery) >= nPeersToQuery {
	peersToQuery = peersToQuery[:nPeersToQuery]
When you cut down the slice, you also need to undo, for all cut peers, the adding to the filter and the state setting. I think the easiest thing to do is to limit the for-loop itself to not exceed `nPeersToQuery` steps.
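The bounded-loop shape looks roughly like this; `tryAdd` below stands in for the combined filter-add and state-set side effects (names and types are illustrative):

```go
package main

import "fmt"

// selectPeersToQuery bounds the loop itself, so side effects (filter adds,
// state changes) are never applied to a peer that would later be cut from
// the slice.
func selectPeersToQuery(candidates []string, nPeersToQuery int, tryAdd func(string) bool) []string {
	peersToQuery := []string{}
	for _, p := range candidates {
		if len(peersToQuery) == nPeersToQuery {
			break // stop before performing side effects for excess peers
		}
		if tryAdd(p) {
			peersToQuery = append(peersToQuery, p)
		}
	}
	return peersToQuery
}

func main() {
	added := []string{} // records which peers had side effects applied
	tryAdd := func(p string) bool { added = append(added, p); return true }
	got := selectPeersToQuery([]string{"a", "b", "c"}, 2, tryAdd)
	fmt.Println(got, added) // side effects only for the peers we keep
}
```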
Thanks for this catch! Have fixed this.
query.go
Outdated
@@ -450,6 +531,10 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
	ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}, queryDuration: queryDuration}
}

func (q *query) shouldRejectQueriedPeer(p peer.ID) bool {
Not used anywhere?
Have removed this.
Couple of comments.
@aschmahmann @petar Have increased
whiteListedPeers map[peer.ID]struct{}
Add a comment here about how this is only used for verification and not for actually performing the whitelisting.
routing.go
Outdated
@@ -667,7 +667,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
	// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
	// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
	// server peer and is not identified as such is a bug.
-	dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
+	dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard && lookupRes.state[i] != qpeerset.PeerRejected
Let's flip this to check if the state is `Queried` or `Waiting`.
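The flipped check as suggested, with local stand-in state constants (the real ones live in `qpeerset`): include the states we did dial rather than exclude the ones we didn't, so a newly added non-dialed state is safe by default.

```go
package main

import "fmt"

type PeerState int

const (
	PeerHeard PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
	PeerRejected
)

// dialedDuringQuery implements the flipped check: a peer counts as dialed
// only if it reached the Queried or Waiting state, so states like Heard
// and Rejected (and any future non-dialed state) are excluded by default.
func dialedDuringQuery(s PeerState) bool {
	return s == PeerQueried || s == PeerWaiting
}

func main() {
	fmt.Println(dialedDuringQuery(PeerQueried), dialedDuringQuery(PeerRejected)) // true false
}
```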
lgtm
	return true, LookupCompleted, nil
}

return false, -1, peersToQuery
Use a constant (define if needed), instead of -1.
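A named-constant version might look like the sketch below; `lookupNotTerminated` is a hypothetical name standing in for the bare `-1` (the surrounding function and types are also simplified stand-ins):

```go
package main

import "fmt"

type terminationReason int

// lookupNotTerminated is a hypothetical named sentinel replacing the bare
// -1 in `return false, -1, peersToQuery`.
const (
	lookupNotTerminated terminationReason = -1
	lookupCompleted     terminationReason = 0
)

// isReadyToTerminate is a simplified stand-in for the real check: it
// returns the named sentinel instead of a magic number when the lookup
// should keep going.
func isReadyToTerminate(done bool, peersToQuery []string) (bool, terminationReason, []string) {
	if done {
		return true, lookupCompleted, nil
	}
	return false, lookupNotTerminated, peersToQuery
}

func main() {
	ready, reason, _ := isReadyToTerminate(false, []string{"a"})
	fmt.Println(ready, reason == lookupNotTerminated) // false true
}
```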
* try to query with diverse peers.
This reverts commit 9a54302.
TODO
@aschmahmann This is a first draft. Let me know what you think.