Skip to content

Commit

Permalink
Statement Distribution Per Peer Rate Limit (#3444)
Browse files Browse the repository at this point in the history
- [x] Drop requests from a PeerID that is already being served by us.
- [x] Don't send requests to a PeerID if we already are requesting
something from them at that moment (prioritise other requests or wait).
- [x] Tests
- [ ] ~~Add a small rep update for unsolicited requests (same peer
request)~~ not included in original PR due to potential issues with
nodes slowly updating
- [x] Add a metric to track the amount of dropped requests due to peer
rate limiting
- [x] Add a metric to track how many times a node reaches the max
parallel requests limit in v2+

Helps with but does not close yet:
paritytech-secops/srlabs_findings#303
  • Loading branch information
Overkillus authored May 1, 2024
1 parent c973fe8 commit 6d392c7
Show file tree
Hide file tree
Showing 14 changed files with 539 additions and 48 deletions.
8 changes: 8 additions & 0 deletions .gitlab/pipeline/zombienet/polkadot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains:
--local-dir="${LOCAL_DIR}/elastic_scaling"
--test="0002-elastic-scaling-doesnt-break-parachains.zndsl"

# Zombienet functional test 0012: runs `0012-spam-statement-distribution-requests.zndsl`
# from the `functional` directory through the local env manager script.
zombienet-polkadot-functional-0012-spam-statement-distribution-requests:
extends:
- .zombienet-polkadot-common
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/functional"
--test="0012-spam-statement-distribution-requests.zndsl"

zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/node/malus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator"
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
polkadot-node-core-backing = { path = "../core/backing" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
color-eyre = { version = "0.6.1", default-features = false }
assert_matches = "1.5"
Expand Down
7 changes: 7 additions & 0 deletions polkadot/node/malus/src/malus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ enum NemesisVariant {
DisputeAncestor(DisputeAncestorOptions),
/// Delayed disputing of finalized candidates.
DisputeFinalizedCandidates(DisputeFinalizedCandidatesOptions),
/// Spam many request statements instead of sending a single one.
SpamStatementRequests(SpamStatementRequestsOptions),
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -98,6 +100,11 @@ impl MalusCli {
finality_delay,
)?
},
NemesisVariant::SpamStatementRequests(opts) => {
let SpamStatementRequestsOptions { spam_factor, cli } = opts;

polkadot_cli::run_node(cli, SpamStatementRequests { spam_factor }, finality_delay)?
},
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/malus/src/variants/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ mod back_garbage_candidate;
mod common;
mod dispute_finalized_candidates;
mod dispute_valid_candidates;
mod spam_statement_requests;
mod suggest_garbage_candidate;
mod support_disabled;

pub(crate) use self::{
back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates},
dispute_finalized_candidates::{DisputeFinalizedCandidates, DisputeFinalizedCandidatesOptions},
dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates},
spam_statement_requests::{SpamStatementRequests, SpamStatementRequestsOptions},
suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates},
support_disabled::{SupportDisabled, SupportDisabledOptions},
};
Expand Down
155 changes: 155 additions & 0 deletions polkadot/node/malus/src/variants/spam_statement_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A malicious node variant that attempts to spam statement requests.
//!
//! This malus variant behaves honestly in everything except when propagating statement distribution
//! requests through the network bridge subsystem. Instead of sending a single request when it needs
//! something it attempts to spam the peer with multiple requests.
//!
//! Attention: For usage with `zombienet` only!
#![allow(missing_docs)]

use polkadot_cli::{
service::{
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;

// Filter wrapping related types.
use crate::{interceptor::*, shared::MALUS};

use std::sync::Arc;

/// Message interceptor wrapped around the network bridge TX subsystem, replacing it in the
/// overseer (see the `replace_network_bridge_tx` call in `SpamStatementRequests::generate`).
///
/// It rewrites outgoing `NetworkBridgeTxMessage::SendRequests` messages so that every
/// `Requests::AttestedCandidateV2` request is followed by duplicates of itself.
#[derive(Clone)]
struct RequestSpammer {
// Total number of times each `AttestedCandidateV2` request is meant to be sent: the
// interceptor forwards the original and appends `spam_factor - 1` duplicates.
spam_factor: u32, // How many statement distribution requests to send.
}

impl<Sender> MessageInterceptor<Sender> for RequestSpammer
where
    Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
{
    type Message = NetworkBridgeTxMessage;

    /// Intercepts `NetworkBridgeTxMessage::SendRequests` and duplicates every
    /// `Requests::AttestedCandidateV2` request found inside it.
    ///
    /// Each such request is forwarded unchanged and immediately followed by
    /// `spam_factor - 1` fresh copies addressed to the same peer with the same payload, so
    /// that `spam_factor` requests go out in total. A `spam_factor` of 0 or 1 adds no
    /// copies. All other requests, messages and signals pass through untouched, and the
    /// relative order of the original requests is preserved.
    ///
    /// Always returns `Some`: this interceptor never drops a message.
    fn intercept_incoming(
        &self,
        _subsystem_sender: &mut Sender,
        msg: FromOrchestra<Self::Message>,
    ) -> Option<FromOrchestra<Self::Message>> {
        match msg {
            FromOrchestra::Communication {
                msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
            } => {
                // `spam_factor` counts the original request as well. The CLI accepts 0, for
                // which a plain `spam_factor - 1` underflows: a panic in debug builds and
                // roughly `u32::MAX` duplicates per request in release builds. Saturate
                // instead so that 0 behaves like 1 (no extra copies).
                let extra_copies = self.spam_factor.saturating_sub(1);
                let mut new_requests = Vec::new();

                for request in requests {
                    match request {
                        Requests::AttestedCandidateV2(ref req) => {
                            // Copy out what the duplicates need before the original
                            // request is moved into the output vector.
                            let peer_to_duplicate = req.peer.clone();
                            let payload_to_duplicate = req.payload.clone();
                            // Push the original request first so it keeps its position.
                            new_requests.push(request);

                            // Report the number of *extra* copies, matching the wording
                            // of the message.
                            gum::info!(
                                target: MALUS,
                                "😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", extra_copies, peer_to_duplicate,
                            );
                            new_requests.extend((0..extra_copies).map(|_| {
                                // The second tuple element (presumably the response
                                // receiver - TODO confirm) is discarded: the duplicates
                                // exist only to load the peer.
                                let (new_outgoing_request, _) = OutgoingRequest::new(
                                    peer_to_duplicate.clone(),
                                    payload_to_duplicate.clone(),
                                );
                                Requests::AttestedCandidateV2(new_outgoing_request)
                            }));
                        },
                        _ => {
                            new_requests.push(request);
                        },
                    }
                }

                // Pass the message through with a potentially larger number of requests.
                Some(FromOrchestra::Communication {
                    msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
                })
            },
            // Every other communication message and every signal is forwarded as is.
            other => Some(other),
        }
    }
}

//----------------------------------------------------------------------------------

// Command line options of the `SpamStatementRequests` malus variant.
//
// NOTE(review): the notes in this struct are plain `//` comments on purpose. clap's derive
// presumably turns `///` doc comments into help text, so the existing ones are untouched.
#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct SpamStatementRequestsOptions {
/// How many statement distribution requests to send.
// Defaults to 1000; accepted values are 0 through 10000000 inclusive.
// NOTE(review): the range admits 0, for which "number of requests to send" has no obvious
// meaning because the original request is always forwarded - confirm the intended lower
// bound.
#[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
pub spam_factor: u32,

// The `polkadot_cli::Cli` options, flattened into this variant's arguments; the caller
// hands them to `polkadot_cli::run_node`.
#[clap(flatten)]
pub cli: Cli,
}

/// `OverseerGen` glue for the request-spamming malus variant.
///
/// Its `generate` implementation builds the validator overseer with the network bridge TX
/// subsystem replaced by an intercepted one driven by `RequestSpammer`.
pub(crate) struct SpamStatementRequests {
/// How many statement distribution requests to send. Copied unchanged into
/// `RequestSpammer::spam_factor` when the overseer is generated.
pub spam_factor: u32,
}

impl OverseerGen for SpamStatementRequests {
    /// Builds a validator overseer whose network bridge TX subsystem is wrapped by a
    /// `RequestSpammer` configured with this variant's `spam_factor`.
    ///
    /// Panics if `ext_args` is `None`; errors from the overseer builder are converted into
    /// the service `Error`.
    fn generate<Spawner, RuntimeClient>(
        &self,
        connector: OverseerConnector,
        args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
        ext_args: Option<ExtendedOverseerGenArgs>,
    ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
        Spawner: 'static + SpawnNamed + Clone + Unpin,
    {
        let spam_factor = self.spam_factor;

        gum::info!(
            target: MALUS,
            "😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
            &spam_factor,
        );

        // A validator overseer cannot be built without the extended arguments.
        let extended_args =
            ext_args.expect("Extended arguments required to build validator overseer are provided");
        let interceptor = RequestSpammer { spam_factor };

        // Swap the regular network bridge TX subsystem for the intercepted one.
        let builder = validator_overseer_builder(args, extended_args)?
            .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, interceptor));

        builder.build_with_connector(connector).map_err(Into::into)
    }
}
1 change: 1 addition & 0 deletions polkadot/node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
v2::respond_task(
self.req_receiver.take().expect("Mandatory argument to new. qed"),
res_sender.clone(),
self.metrics.clone(),
)
.boxed(),
)
Expand Down
40 changes: 38 additions & 2 deletions polkadot/node/network/statement-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[

#[derive(Clone)]
struct MetricsInner {
// V1
statements_distributed: prometheus::Counter<prometheus::U64>,
sent_requests: prometheus::Counter<prometheus::U64>,
received_responses: prometheus::CounterVec<prometheus::U64>,
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
network_bridge_update: prometheus::HistogramVec,
statements_unexpected: prometheus::CounterVec<prometheus::U64>,
created_message_size: prometheus::Gauge<prometheus::U64>,
// V1+
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
// V2+
peer_rate_limit_request_drop: prometheus::Counter<prometheus::U64>,
max_parallel_requests_reached: prometheus::Counter<prometheus::U64>,
}

/// Statement Distribution metrics.
Expand Down Expand Up @@ -114,6 +119,23 @@ impl Metrics {
metrics.created_message_size.set(size as u64);
}
}

/// Increments the counter of statement distribution requests that were dropped because
/// of the peer rate limit. Does nothing when the inner metrics are absent (`None`).
pub fn on_request_dropped_peer_rate_limit(&self) {
    if let Some(inner) = self.0.as_ref() {
        inner.peer_rate_limit_request_drop.inc();
    }
}

/// Increments the counter that tracks how often the maximum number of parallel requests
/// was reached, i.e. how often we had to wait for one of the requests to finish.
/// Does nothing when the inner metrics are absent (`None`).
pub fn on_max_parallel_requests_reached(&self) {
    if let Some(inner) = self.0.as_ref() {
        inner.max_parallel_requests_reached.inc();
    }
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -193,6 +215,20 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
peer_rate_limit_request_drop: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_peer_rate_limit_request_drop_total",
"Number of statement distribution requests dropped because of the peer rate limiting.",
)?,
registry,
)?,
max_parallel_requests_reached: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_max_parallel_requests_reached_total",
"Number of times the maximum number of parallel requests was reached.",
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
Loading

0 comments on commit 6d392c7

Please sign in to comment.