Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statement Distribution Per Peer Rate Limit #3444

Merged
merged 31 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1b1ba25
minimal scaffolding
Overkillus Feb 22, 2024
3345aa6
respond task select overhaul
Overkillus Feb 22, 2024
1734c09
test adjustment
Overkillus Feb 23, 2024
cb1d210
fmt
Overkillus Feb 23, 2024
58d828d
Rate limit sending from the requester side
Overkillus Feb 26, 2024
b5e7e06
sender side rate limiter test
Overkillus Mar 20, 2024
456a466
spammer malus variant
Overkillus Mar 21, 2024
975e3bd
Duplicate requests
Overkillus Apr 2, 2024
7056159
Filtering test
Overkillus Apr 2, 2024
3f234e1
fmt
Overkillus Apr 2, 2024
7e817fa
prdoc
Overkillus Apr 2, 2024
913b234
pipeline
Overkillus Apr 2, 2024
e812dcc
increment test
Overkillus Apr 2, 2024
c01be29
Merge branch 'master' into mkz-statement-distribution-rate-limit
Overkillus Apr 2, 2024
af234ef
crate bump
Overkillus Apr 2, 2024
fdaaf4a
clippy nits
Overkillus Apr 2, 2024
e988c60
zombienet test simplification
Overkillus Apr 19, 2024
83cf297
review fixes
Overkillus Apr 19, 2024
37fd618
debug -> trace
Overkillus Apr 30, 2024
adf84a2
cleanup
Overkillus Apr 30, 2024
f0a40d5
metric registered
Overkillus Apr 30, 2024
6099498
reverting paste mistake
Overkillus Apr 30, 2024
07d44ee
registering max parallel requests metric
Overkillus Apr 30, 2024
09e7367
updating metrics in statement distribution respond task
Overkillus Apr 30, 2024
d1f50c5
fmt
Overkillus Apr 30, 2024
e9680d1
Merge branch 'master' into mkz-statement-distribution-rate-limit
Overkillus May 1, 2024
678b141
param typo
Overkillus May 1, 2024
7ff1ddd
Bump test numbering
Overkillus May 1, 2024
f6e8def
remove unused malus param
Overkillus May 1, 2024
8e79dc4
tracing
Overkillus May 1, 2024
a38762b
fmt
Overkillus May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 48 additions & 24 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ use futures::{
channel::{mpsc, oneshot},
stream::FuturesUnordered,
SinkExt, StreamExt,
select,
future::FutureExt,
};

use std::{
Expand Down Expand Up @@ -3335,33 +3337,55 @@ pub(crate) async fn respond_task(
mut sender: mpsc::Sender<ResponderMessage>,
) {
let mut pending_out = FuturesUnordered::new();
let mut active_peers = HashSet::new();

loop {
// Ensure we are not handling too many requests in parallel.
if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
// Wait for one to finish:
pending_out.next().await;
}
select! {
// New request
request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
let request = match request_result.into_nested() {
Ok(Ok(v)) => v,
Err(fatal) => {
gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
return
},
Ok(Err(jfyi)) => {
gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
continue
},
};

let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() {
Ok(Ok(v)) => v,
Err(fatal) => {
gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
return
// If peer currently being served drop request
if active_peers.contains(&request.peer) {
alexggh marked this conversation as resolved.
Show resolved Hide resolved
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be handled in the context of a candidate/relay parent ? Otherwise we would drop legit requests.

Copy link
Contributor Author

@Overkillus Overkillus Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely a fair point. Rob mentioned it https://github.com/paritytech-secops/srlabs_findings/issues/303#issuecomment-1588185410 as well.

It's a double-edged sword. If we make it 1 per context then the request/response protocol is more efficient, but with async backing enabled (which to my understanding broadens the scope of possible contexts) the mpsc channel (size 25-ish) can still be easily overwhelmed. The problem of dropped requests gets somewhat a bit better after the recent commit [Rate limit sending from the requester side](https://github.com/paritytech/polkadot-sdk/pull/3444/commits/58d828db45b7c023e8a8266d3180be4361723519) which also introduces the rate limit on the requester side so effort is not wasted.

If we are already requesting from a peer and want to request something else as well we simply wait for the first one to finish.

What could be argued is the limit as a small constant 1-3 instead of 1 per peerID. I would like to do some testing and more reviews and changing it to a configurable constant instead of a hard limit to 1 is actually a trivial change. (I don't think it's necessary. Better have a single completely sent candidate than two half sent candidates.)

Truth be told this whole solution is far from ideal. It's a fix but the final solution is to manage DoS at a significantly lower level than in parachain consensus. It will do for now but DoS needs to be looked on in more detail in near future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely a fair point. Rob mentioned it paritytech-secops/srlabs_findings#303 (comment) as well.

It's a double-edged sword. If we make it 1 per context then the request/response protocol is more efficient, but with async backing enabled (which to my understanding broadens the scope of possible contexts) the mpsc channel (size 25-ish) can still be easily overwhelmed. The problem of dropped requests gets somewhat a bit better after the recent commit [Rate limit sending from the requester side](https://github.com/paritytech/polkadot-sdk/pull/3444/commits/58d828db45b7c023e8a8266d3180be4361723519) which also introduces the rate limit on the requester side so effort is not wasted.

If we are already requesting from a peer and want to request something else as well we simply wait for the first one to finish.

I would rather put the constraint on the responder side rather than the requester. The reasoning is to avoid any bugs in other clients implementation due to the subtle requirement to wait for first request to finish.

What could be argued is the limit as a small constant 1-3 instead of 1 per peerID. I would like to do some testing and more reviews and changing it to a configurable constant instead of a hard limit to 1 is actually a trivial change. (I don't think it's necessary. Better have a single completely sent candidate than two half sent candidates.)

I totally agree with DoS protection as low as possible in the stack, but it seems to be sensible. Maybe instead of relying on a hardcoded constant we can derive the value based on the async backing parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather put the constraint on the responder side rather than the requester. The reasoning is to avoid any bugs in other clients implementation due to the subtle requirement to wait for first request to finish.

I agree that constraint on the responder side is more important, but having it in both places seems even better.

The reasoning is to avoid any bugs in other clients implementation due to the subtle requirement to wait for first request to finish.

Even without this PR the risk seems exactly the same. We already were checking for the number of parallel requests being handled in the requester so even if there would be bugs in other clients they could surface there with or without this PR.

The only really bad scenario would be if for some reason we keep a PeerID marked as still being active when it really isn't effectively blacklisting that peer. If that would happen it means that the future connected to that PeerID is still in the pending_responses which would eventually brick the requester anyway since we cannotgo over the max limit of MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS. Both situations are just as fatal, but the new system at least has extra protections against DoS attempts so it's a security gain.

By adding the constraint on the requester side we limit the wasted effort and can potentially safely add reputation updates if we get those unsolicited requests which protects us from DoS even further. (Some honest requests might slip through so the rep update needs to be small.)

Maybe instead of relying on a hardcoded constant we can derive the value based on the async backing parameters.

We potentially could but also I don't why this value would need to change a lot. Even if we change some async backing params we should be fine. It seems to be more sensitive to channel size as we want to make it hard to dominate that queue as a malicious node and the MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS.

Copy link

@burdges burdges Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need not choose a uniform distribution over recent relay parents either. Instead the full value on the first couple, then 1/2 and 1/4 for a few, and then 1 one for a while. You need to figure out if the selected distribution breaks collators, but maybe works.

}

// If we are over parallel limit wait for one to finish
if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
let result = pending_out.select_next_some().await;
let (_, peer) = result;
Overkillus marked this conversation as resolved.
Show resolved Hide resolved
active_peers.remove(&peer);
}

// Start serving the request
let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
let peer = request.peer.clone();
if let Err(err) = sender
.feed(ResponderMessage { request: request, sent_feedback: pending_sent_tx })
Overkillus marked this conversation as resolved.
Show resolved Hide resolved
.await
{
gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
return
}
let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
pending_out.push(future_with_peer);
active_peers.insert(peer);
},
Ok(Err(jfyi)) => {
gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
continue
// Request served/finished
result = pending_out.select_next_some() => {
let (_, peer) = result;
active_peers.remove(&peer);
},
};

let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
if let Err(err) = sender
.feed(ResponderMessage { request: req, sent_feedback: pending_sent_tx })
.await
{
gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
return
}
pending_out.push(pending_sent_rx);
}
}
}
51 changes: 37 additions & 14 deletions polkadot/node/network/statement-distribution/src/v2/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ impl RequestManager {
id,
&props,
&peer_advertised,
&response_manager,
) {
None => continue,
Some(t) => t,
Expand All @@ -378,14 +379,17 @@ impl RequestManager {
);

let stored_id = id.clone();
response_manager.push(Box::pin(async move {
TaggedResponse {
identifier: stored_id,
requested_peer: target,
props,
response: response_fut.await,
}
}));
response_manager.push(
Box::pin(async move {
TaggedResponse {
identifier: stored_id,
requested_peer: target,
props,
response: response_fut.await,
}
}),
target,
);

entry.in_flight = true;

Expand Down Expand Up @@ -413,28 +417,40 @@ impl RequestManager {
/// A manager for pending responses.
pub struct ResponseManager {
pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
active_peers: HashSet<PeerId>,
}

impl ResponseManager {
pub fn new() -> Self {
Self { pending_responses: FuturesUnordered::new() }
Self {
pending_responses: FuturesUnordered::new(),
active_peers: HashSet::new(),
}
}

/// Await the next incoming response to a sent request, or immediately
/// return `None` if there are no pending responses.
pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
self.pending_responses
.next()
.await
.map(|response| UnhandledResponse { response })
if let Some(response) = self.pending_responses.next().await {
// Upon receiving a response, remove the peer from the active set.
self.active_peers.remove(&response.requested_peer);
return Some(UnhandledResponse { response });
}
None
Overkillus marked this conversation as resolved.
Show resolved Hide resolved
}

fn len(&self) -> usize {
self.pending_responses.len()
}

fn push(&mut self, response: BoxFuture<'static, TaggedResponse>) {
fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
self.pending_responses.push(response);
self.active_peers.insert(target);
}

/// Returns true if we are currently sending a request to the peer.
fn is_sending_to(&self, peer: &PeerId) -> bool {
self.active_peers.contains(peer)
}
}

Expand Down Expand Up @@ -462,10 +478,17 @@ fn find_request_target_with_update(
candidate_identifier: &CandidateIdentifier,
props: &RequestProperties,
peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
response_manager: &ResponseManager,
) -> Option<PeerId> {
let mut prune = Vec::new();
let mut target = None;
for (i, p) in known_by.iter().enumerate() {

// If we are already sending to that peer, skip for now
if response_manager.is_sending_to(p) {
alexggh marked this conversation as resolved.
Show resolved Hide resolved
continue
}

let mut filter = match peer_advertised(candidate_identifier, p) {
None => {
prune.push(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,7 @@ fn local_node_sanity_checks_incoming_requests() {
let mask = StatementFilter::blank(state.config.group_size + 1);
let response = state
.send_request(
peer_c,
peer_a,
request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
)
.await
Expand Down
Loading