diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml index 6b72075c513b..38c5332f3097 100644 --- a/.gitlab/pipeline/zombienet/polkadot.yml +++ b/.gitlab/pipeline/zombienet/polkadot.yml @@ -176,6 +176,14 @@ zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains: --local-dir="${LOCAL_DIR}/elastic_scaling" --test="0002-elastic-scaling-doesnt-break-parachains.zndsl" +zombienet-polkadot-functional-0012-spam-statement-distribution-requests: + extends: + - .zombienet-polkadot-common + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}/functional" + --test="0012-spam-statement-distribution-requests.zndsl" + zombienet-polkadot-smoke-0001-parachains-smoke-test: extends: - .zombienet-polkadot-common diff --git a/Cargo.lock b/Cargo.lock index 1fe4012070a0..c3bf3f26385b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14272,6 +14272,7 @@ dependencies = [ "polkadot-node-core-pvf-common", "polkadot-node-core-pvf-execute-worker", "polkadot-node-core-pvf-prepare-worker", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml index 2f63c2f0938d..750074fa9b3c 100644 --- a/polkadot/node/malus/Cargo.toml +++ b/polkadot/node/malus/Cargo.toml @@ -37,6 +37,7 @@ polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } polkadot-node-core-backing = { path = "../core/backing" } polkadot-node-primitives = { path = "../primitives" } +polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-primitives = { path = "../../primitives" } color-eyre = { version = "0.6.1", default-features = false } assert_matches = "1.5" diff --git a/polkadot/node/malus/src/malus.rs b/polkadot/node/malus/src/malus.rs index 7a9e320e2736..6257201537c8 100644 --- a/polkadot/node/malus/src/malus.rs +++ b/polkadot/node/malus/src/malus.rs @@ -40,6 +40,8 @@ enum NemesisVariant { DisputeAncestor(DisputeAncestorOptions), /// Delayed disputing of finalized candidates. DisputeFinalizedCandidates(DisputeFinalizedCandidatesOptions), + /// Spam many request statements instead of sending a single one. + SpamStatementRequests(SpamStatementRequestsOptions), } #[derive(Debug, Parser)] @@ -98,6 +100,11 @@ impl MalusCli { finality_delay, )? }, + NemesisVariant::SpamStatementRequests(opts) => { + let SpamStatementRequestsOptions { spam_factor, cli } = opts; + + polkadot_cli::run_node(cli, SpamStatementRequests { spam_factor }, finality_delay)? + }, } Ok(()) } diff --git a/polkadot/node/malus/src/variants/mod.rs b/polkadot/node/malus/src/variants/mod.rs index 3ca1bf4b4696..ec945ae19457 100644 --- a/polkadot/node/malus/src/variants/mod.rs +++ b/polkadot/node/malus/src/variants/mod.rs @@ -20,6 +20,7 @@ mod back_garbage_candidate; mod common; mod dispute_finalized_candidates; mod dispute_valid_candidates; +mod spam_statement_requests; mod suggest_garbage_candidate; mod support_disabled; @@ -27,6 +28,7 @@ pub(crate) use self::{ back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates}, dispute_finalized_candidates::{DisputeFinalizedCandidates, DisputeFinalizedCandidatesOptions}, dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates}, + spam_statement_requests::{SpamStatementRequests, SpamStatementRequestsOptions}, suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates}, support_disabled::{SupportDisabled, SupportDisabledOptions}, }; diff --git a/polkadot/node/malus/src/variants/spam_statement_requests.rs b/polkadot/node/malus/src/variants/spam_statement_requests.rs new file mode 100644 index 000000000000..c5970c988ac2 --- /dev/null +++ b/polkadot/node/malus/src/variants/spam_statement_requests.rs @@ -0,0 +1,155 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A malicious node variant that attempts spam statement requests. +//! +//! This malus variant behaves honestly in everything except when propagating statement distribution +//! requests through the network bridge subsystem. Instead of sending a single request when it needs +//! something it attempts to spam the peer with multiple requests. +//! +//! Attention: For usage with `zombienet` only! + +#![allow(missing_docs)] + +use polkadot_cli::{ + service::{ + AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen, + OverseerGenArgs, OverseerHandle, + }, + validator_overseer_builder, Cli, +}; +use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest}; +use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue}; +use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient}; +use sp_core::traits::SpawnNamed; + +// Filter wrapping related types. +use crate::{interceptor::*, shared::MALUS}; + +use std::sync::Arc; + +/// Wraps around network bridge and replaces it. +#[derive(Clone)] +struct RequestSpammer { + spam_factor: u32, // How many statement distribution requests to send. +} + +impl MessageInterceptor for RequestSpammer +where + Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static, +{ + type Message = NetworkBridgeTxMessage; + + /// Intercept NetworkBridgeTxMessage::SendRequests with Requests::AttestedCandidateV2 inside and + /// duplicate that request + fn intercept_incoming( + &self, + _subsystem_sender: &mut Sender, + msg: FromOrchestra, + ) -> Option> { + match msg { + FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected), + } => { + let mut new_requests = Vec::new(); + + for request in requests { + match request { + Requests::AttestedCandidateV2(ref req) => { + // Temporarily store peer and payload for duplication + let peer_to_duplicate = req.peer.clone(); + let payload_to_duplicate = req.payload.clone(); + // Push the original request + new_requests.push(request); + + // Duplicate for spam purposes + gum::info!( + target: MALUS, + "😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", self.spam_factor, peer_to_duplicate, + ); + new_requests.extend((0..self.spam_factor - 1).map(|_| { + let (new_outgoing_request, _) = OutgoingRequest::new( + peer_to_duplicate.clone(), + payload_to_duplicate.clone(), + ); + Requests::AttestedCandidateV2(new_outgoing_request) + })); + }, + _ => { + new_requests.push(request); + }, + } + } + + // Passthrough the message with a potentially modified number of requests + Some(FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected), + }) + }, + FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }), + FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)), + } + } +} + +//---------------------------------------------------------------------------------- + +#[derive(Debug, clap::Parser)] +#[clap(rename_all = "kebab-case")] +#[allow(missing_docs)] +pub struct SpamStatementRequestsOptions { + /// How many statement distribution requests to send. + #[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))] + pub spam_factor: u32, + + #[clap(flatten)] + pub cli: Cli, +} + +/// SpamStatementRequests implementation wrapper which implements `OverseerGen` glue. +pub(crate) struct SpamStatementRequests { + /// How many statement distribution requests to send. + pub spam_factor: u32, +} + +impl OverseerGen for SpamStatementRequests { + fn generate( + &self, + connector: OverseerConnector, + args: OverseerGenArgs<'_, Spawner, RuntimeClient>, + ext_args: Option, + ) -> Result<(Overseer, Arc>, OverseerHandle), Error> + where + RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static, + Spawner: 'static + SpawnNamed + Clone + Unpin, + { + gum::info!( + target: MALUS, + "😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.", + &self.spam_factor, + ); + + let request_spammer = RequestSpammer { spam_factor: self.spam_factor }; + + validator_overseer_builder( + args, + ext_args.expect("Extended arguments required to build validator overseer are provided"), + )? + .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer)) + .build_with_connector(connector) + .map_err(|e| e.into()) + } +} diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 7e91d2849120..4ca199c3378b 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -207,6 +207,7 @@ impl StatementDistributionSubsystem { v2::respond_task( self.req_receiver.take().expect("Mandatory argument to new. qed"), res_sender.clone(), + self.metrics.clone(), ) .boxed(), ) diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs index b9a51dc89d61..1bc994174263 100644 --- a/polkadot/node/network/statement-distribution/src/metrics.rs +++ b/polkadot/node/network/statement-distribution/src/metrics.rs @@ -24,14 +24,19 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[ #[derive(Clone)] struct MetricsInner { + // V1 statements_distributed: prometheus::Counter, sent_requests: prometheus::Counter, received_responses: prometheus::CounterVec, - active_leaves_update: prometheus::Histogram, - share: prometheus::Histogram, network_bridge_update: prometheus::HistogramVec, statements_unexpected: prometheus::CounterVec, created_message_size: prometheus::Gauge, + // V1+ + active_leaves_update: prometheus::Histogram, + share: prometheus::Histogram, + // V2+ + peer_rate_limit_request_drop: prometheus::Counter, + max_parallel_requests_reached: prometheus::Counter, } /// Statement Distribution metrics. @@ -114,6 +119,23 @@ impl Metrics { metrics.created_message_size.set(size as u64); } } + + /// Update sent dropped requests counter when request dropped because + /// of peer rate limit + pub fn on_request_dropped_peer_rate_limit(&self) { + if let Some(metrics) = &self.0 { + metrics.peer_rate_limit_request_drop.inc(); + } + } + + /// Update max parallel requests reached counter + /// This counter is updated when the maximum number of parallel requests is reached + /// and we are waiting for one of the requests to finish + pub fn on_max_parallel_requests_reached(&self) { + if let Some(metrics) = &self.0 { + metrics.max_parallel_requests_reached.inc(); + } + } } impl metrics::Metrics for Metrics { @@ -193,6 +215,20 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, + peer_rate_limit_request_drop: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_statement_distribution_peer_rate_limit_request_drop_total", + "Number of statement distribution requests dropped because of the peer rate limiting.", + )?, + registry, + )?, + max_parallel_requests_reached: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_statement_distribution_max_parallel_requests_reached_total", + "Number of times the maximum number of parallel requests was reached.", + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 118e34e92063..8579ac15cbc1 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -59,6 +59,8 @@ use sp_keystore::KeystorePtr; use fatality::Nested; use futures::{ channel::{mpsc, oneshot}, + future::FutureExt, + select, stream::FuturesUnordered, SinkExt, StreamExt, }; @@ -73,6 +75,7 @@ use std::{ use crate::{ error::{JfyiError, JfyiErrorResult}, + metrics::Metrics, LOG_TARGET, }; use candidates::{BadAdvertisement, Candidates, PostConfirmation}; @@ -3423,35 +3426,61 @@ pub(crate) struct ResponderMessage { pub(crate) async fn respond_task( mut receiver: IncomingRequestReceiver, mut sender: mpsc::Sender, + metrics: Metrics, ) { let mut pending_out = FuturesUnordered::new(); + let mut active_peers = HashSet::new(); + loop { - // Ensure we are not handling too many requests in parallel. - if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize { - // Wait for one to finish: - pending_out.next().await; - } + select! { + // New request + request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => { + let request = match request_result.into_nested() { + Ok(Ok(v)) => v, + Err(fatal) => { + gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder"); + return + }, + Ok(Err(jfyi)) => { + gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed"); + continue + }, + }; - let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() { - Ok(Ok(v)) => v, - Err(fatal) => { - gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder"); - return + // If peer currently being served drop request + if active_peers.contains(&request.peer) { + gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request"); + metrics.on_request_dropped_peer_rate_limit(); + continue + } + + // If we are over parallel limit wait for one to finish + if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize { + gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish"); + metrics.on_max_parallel_requests_reached(); + let (_, peer) = pending_out.select_next_some().await; + active_peers.remove(&peer); + } + + // Start serving the request + let (pending_sent_tx, pending_sent_rx) = oneshot::channel(); + let peer = request.peer; + if let Err(err) = sender + .feed(ResponderMessage { request, sent_feedback: pending_sent_tx }) + .await + { + gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder"); + return + } + let future_with_peer = pending_sent_rx.map(move |result| (result, peer)); + pending_out.push(future_with_peer); + active_peers.insert(peer); }, - Ok(Err(jfyi)) => { - gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed"); - continue + // Request served/finished + result = pending_out.select_next_some() => { + let (_, peer) = result; + active_peers.remove(&peer); }, - }; - - let (pending_sent_tx, pending_sent_rx) = oneshot::channel(); - if let Err(err) = sender - .feed(ResponderMessage { request: req, sent_feedback: pending_sent_tx }) - .await - { - gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder"); - return } - pending_out.push(pending_sent_rx); } } diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs index 1ed18ffd42a9..b8ed34d26c8a 100644 --- a/polkadot/node/network/statement-distribution/src/v2/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs @@ -366,6 +366,7 @@ impl RequestManager { id, &props, &peer_advertised, + &response_manager, ) { None => continue, Some(t) => t, @@ -387,14 +388,17 @@ impl RequestManager { ); let stored_id = id.clone(); - response_manager.push(Box::pin(async move { - TaggedResponse { - identifier: stored_id, - requested_peer: target, - props, - response: response_fut.await, - } - })); + response_manager.push( + Box::pin(async move { + TaggedResponse { + identifier: stored_id, + requested_peer: target, + props, + response: response_fut.await, + } + }), + target, + ); entry.in_flight = true; @@ -422,28 +426,35 @@ impl RequestManager { /// A manager for pending responses. pub struct ResponseManager { pending_responses: FuturesUnordered>, + active_peers: HashSet, } impl ResponseManager { pub fn new() -> Self { - Self { pending_responses: FuturesUnordered::new() } + Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() } } /// Await the next incoming response to a sent request, or immediately /// return `None` if there are no pending responses. pub async fn incoming(&mut self) -> Option { - self.pending_responses - .next() - .await - .map(|response| UnhandledResponse { response }) + self.pending_responses.next().await.map(|response| { + self.active_peers.remove(&response.requested_peer); + UnhandledResponse { response } + }) } fn len(&self) -> usize { self.pending_responses.len() } - fn push(&mut self, response: BoxFuture<'static, TaggedResponse>) { + fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) { self.pending_responses.push(response); + self.active_peers.insert(target); + } + + /// Returns true if we are currently sending a request to the peer. + fn is_sending_to(&self, peer: &PeerId) -> bool { + self.active_peers.contains(peer) } } @@ -471,10 +482,16 @@ fn find_request_target_with_update( candidate_identifier: &CandidateIdentifier, props: &RequestProperties, peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option, + response_manager: &ResponseManager, ) -> Option { let mut prune = Vec::new(); let mut target = None; for (i, p) in known_by.iter().enumerate() { + // If we are already sending to that peer, skip for now + if response_manager.is_sending_to(p) { + continue + } + let mut filter = match peer_advertised(candidate_identifier, p) { None => { prune.push(i); @@ -1002,7 +1019,8 @@ mod tests { candidate_receipt.descriptor.persisted_validation_data_hash = persisted_validation_data.hash(); let candidate = candidate_receipt.hash(); - let requested_peer = PeerId::random(); + let requested_peer_1 = PeerId::random(); + let requested_peer_2 = PeerId::random(); let identifier1 = request_manager .get_or_insert(relay_parent, candidate, 1.into()) @@ -1010,14 +1028,14 @@ mod tests { .clone(); request_manager .get_or_insert(relay_parent, candidate, 1.into()) - .add_peer(requested_peer); + .add_peer(requested_peer_1); let identifier2 = request_manager .get_or_insert(relay_parent, candidate, 2.into()) .identifier .clone(); request_manager .get_or_insert(relay_parent, candidate, 2.into()) - .add_peer(requested_peer); + .add_peer(requested_peer_2); assert_ne!(identifier1, identifier2); assert_eq!(request_manager.requests.len(), 2); @@ -1053,7 +1071,7 @@ mod tests { let response = UnhandledResponse { response: TaggedResponse { identifier: identifier1, - requested_peer, + requested_peer: requested_peer_1, props: request_properties.clone(), response: Ok(AttestedCandidateResponse { candidate_receipt: candidate_receipt.clone(), @@ -1076,13 +1094,13 @@ mod tests { assert_eq!( output, ResponseValidationOutput { - requested_peer, + requested_peer: requested_peer_1, request_status: CandidateRequestStatus::Complete { candidate: candidate_receipt.clone(), persisted_validation_data: persisted_validation_data.clone(), statements, }, - reputation_changes: vec![(requested_peer, BENEFIT_VALID_RESPONSE)], + reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)], } ); } @@ -1093,7 +1111,7 @@ mod tests { let response = UnhandledResponse { response: TaggedResponse { identifier: identifier2, - requested_peer, + requested_peer: requested_peer_2, props: request_properties, response: Ok(AttestedCandidateResponse { candidate_receipt: candidate_receipt.clone(), @@ -1115,12 +1133,14 @@ mod tests { assert_eq!( output, ResponseValidationOutput { - requested_peer, + requested_peer: requested_peer_2, request_status: CandidateRequestStatus::Outdated, reputation_changes: vec![], } ); } + + assert_eq!(request_manager.requests.len(), 0); } // Test case where we had a request in-flight and the request entry was garbage-collected on @@ -1293,4 +1313,140 @@ mod tests { assert_eq!(request_manager.requests.len(), 0); assert_eq!(request_manager.by_priority.len(), 0); } + + // Test case where we queue 2 requests to be sent to the same peer and 1 request to another + // peer. Same peer requests should be served one at a time but they should not block the other + // peer request. + #[test] + fn rate_limit_requests_to_same_peer() { + let mut request_manager = RequestManager::new(); + let mut response_manager = ResponseManager::new(); + + let relay_parent = Hash::from_low_u64_le(1); + + // Create 3 candidates + let mut candidate_receipt_1 = test_helpers::dummy_committed_candidate_receipt(relay_parent); + let persisted_validation_data_1 = dummy_pvd(); + candidate_receipt_1.descriptor.persisted_validation_data_hash = + persisted_validation_data_1.hash(); + let candidate_1 = candidate_receipt_1.hash(); + + let mut candidate_receipt_2 = test_helpers::dummy_committed_candidate_receipt(relay_parent); + let persisted_validation_data_2 = dummy_pvd(); + candidate_receipt_2.descriptor.persisted_validation_data_hash = + persisted_validation_data_2.hash(); + let candidate_2 = candidate_receipt_2.hash(); + + let mut candidate_receipt_3 = test_helpers::dummy_committed_candidate_receipt(relay_parent); + let persisted_validation_data_3 = dummy_pvd(); + candidate_receipt_3.descriptor.persisted_validation_data_hash = + persisted_validation_data_3.hash(); + let candidate_3 = candidate_receipt_3.hash(); + + // Create 2 peers + let requested_peer_1 = PeerId::random(); + let requested_peer_2 = PeerId::random(); + + let group_size = 3; + let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)]; + let unwanted_mask = StatementFilter::blank(group_size); + let disabled_mask: BitVec = Default::default(); + let request_properties = RequestProperties { unwanted_mask, backing_threshold: None }; + let request_props = |_identifier: &CandidateIdentifier| Some((&request_properties).clone()); + let peer_advertised = + |_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size)); + + // Add request for candidate 1 from peer 1 + let identifier1 = request_manager + .get_or_insert(relay_parent, candidate_1, 1.into()) + .identifier + .clone(); + request_manager + .get_or_insert(relay_parent, candidate_1, 1.into()) + .add_peer(requested_peer_1); + + // Add request for candidate 3 from peer 2 (this one can be served in parallel) + let _identifier3 = request_manager + .get_or_insert(relay_parent, candidate_3, 1.into()) + .identifier + .clone(); + request_manager + .get_or_insert(relay_parent, candidate_3, 1.into()) + .add_peer(requested_peer_2); + + // Successfully dispatch request for candidate 1 from peer 1 and candidate 3 from peer 2 + for _ in 0..2 { + let outgoing = + request_manager.next_request(&mut response_manager, request_props, peer_advertised); + assert!(outgoing.is_some()); + } + assert_eq!(response_manager.active_peers.len(), 2); + assert!(response_manager.is_sending_to(&requested_peer_1)); + assert!(response_manager.is_sending_to(&requested_peer_2)); + assert_eq!(request_manager.requests.len(), 2); + + // Add request for candidate 2 from peer 1 + let _identifier2 = request_manager + .get_or_insert(relay_parent, candidate_2, 1.into()) + .identifier + .clone(); + request_manager + .get_or_insert(relay_parent, candidate_2, 1.into()) + .add_peer(requested_peer_1); + + // Do not dispatch the request for the second candidate from peer 1 (already serving that + // peer) + let outgoing = + request_manager.next_request(&mut response_manager, request_props, peer_advertised); + assert!(outgoing.is_none()); + assert_eq!(response_manager.active_peers.len(), 2); + assert!(response_manager.is_sending_to(&requested_peer_1)); + assert!(response_manager.is_sending_to(&requested_peer_2)); + assert_eq!(request_manager.requests.len(), 3); + + // Manually mark response received (response future resolved) + response_manager.active_peers.remove(&requested_peer_1); + response_manager.pending_responses = FuturesUnordered::new(); + + // Validate first response (candidate 1 from peer 1) + { + let statements = vec![]; + let response = UnhandledResponse { + response: TaggedResponse { + identifier: identifier1, + requested_peer: requested_peer_1, + props: request_properties.clone(), + response: Ok(AttestedCandidateResponse { + candidate_receipt: candidate_receipt_1.clone(), + persisted_validation_data: persisted_validation_data_1.clone(), + statements, + }), + }, + }; + let validator_key_lookup = |_v| None; + let allowed_para_lookup = |_para, _g_index| true; + let _output = response.validate_response( + &mut request_manager, + group, + 0, + validator_key_lookup, + allowed_para_lookup, + disabled_mask.clone(), + ); + + // First request served successfully + assert_eq!(request_manager.requests.len(), 2); + assert_eq!(response_manager.active_peers.len(), 1); + assert!(response_manager.is_sending_to(&requested_peer_2)); + } + + // Check if the request that was ignored previously will be served now + let outgoing = + request_manager.next_request(&mut response_manager, request_props, peer_advertised); + assert!(outgoing.is_some()); + assert_eq!(response_manager.active_peers.len(), 2); + assert!(response_manager.is_sending_to(&requested_peer_1)); + assert!(response_manager.is_sending_to(&requested_peer_2)); + assert_eq!(request_manager.requests.len(), 2); + } } diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs index 8cf139802148..c9de42d2c468 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs @@ -1891,7 +1891,7 @@ fn local_node_sanity_checks_incoming_requests() { let mask = StatementFilter::blank(state.config.group_size + 1); let response = state .send_request( - peer_c, + peer_a, request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask }, ) .await diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml new file mode 100644 index 000000000000..14208425d62b --- /dev/null +++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml @@ -0,0 +1,43 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config] + needed_approvals = 2 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] + max_validators_per_core = 5 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +default_command = "polkadot" + +[relaychain.default_resources] +limits = { memory = "4G", cpu = "2" } +requests = { memory = "2G", cpu = "1" } + + [[relaychain.node_groups]] + name = "honest" + count = 4 + args = ["-lparachain=debug,parachain::statement-distribution=trace"] + + [[relaychain.nodes]] + image = "{{MALUS_IMAGE}}" + name = "malus" + command = "malus spam-statement-requests" + args = [ "--alice", "-lparachain=debug,MALUS=trace", "--spam-factor=1000" ] + +{% for id in range(2000,2001) %} +[[parachains]] +id = {{id}} + [parachains.collator] + image = "{{COL_IMAGE}}" + name = "collator" + command = "undying-collator" + args = ["-lparachain=debug"] +{% endfor %} + +[types.Header] +number = "u64" +parent_hash = "Hash" +post_state = "Hash" diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl new file mode 100644 index 000000000000..9985dd24ee38 --- /dev/null +++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl @@ -0,0 +1,27 @@ +Description: Test if parachains progress when group is getting spammed by statement distribution requests. +Network: ./0012-spam-statement-distribution-requests.toml +Creds: config + +# Check authority status and peers. +malus: reports node_roles is 4 +honest: reports node_roles is 4 + +# Ensure parachains are registered. +honest: parachain 2000 is registered within 60 seconds + +# Ensure that malus is already attempting to DoS +malus: log line contains "😈 Duplicating AttestedCandidateV2 request" within 90 seconds + +# Ensure parachains made progress. +honest: parachain 2000 block height is at least 10 within 200 seconds + +# Ensure that honest nodes drop extra requests +honest: log line contains "Peer already being served, dropping request" within 60 seconds + +# Check lag - approval +honest: reports polkadot_parachain_approval_checking_finality_lag is 0 + +# Check lag - dispute conclusion +honest: reports polkadot_parachain_disputes_finality_lag is 0 + + diff --git a/prdoc/pr_3444.prdoc b/prdoc/pr_3444.prdoc new file mode 100644 index 000000000000..3afb38106417 --- /dev/null +++ b/prdoc/pr_3444.prdoc @@ -0,0 +1,25 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Rate-limiting of statement distribution v2 requests to 1 per peer + +doc: + - audience: Node Dev + description: | + A new malicious node variant that sends duplicate statement + distribution messages to spam other peers. + + - audience: Node Operator + description: | + Added rate-limiting in the statement distribution request-response + protocol. Requesters will not issue another request to a peer if one + is already pending with that peer and receiving nodes will reject + requests from peers that they are currently serving. + This should reduce the risk of validator-validator DoS attacks and + better load-balance statement distribution. + +crates: + - name: polkadot-test-malus + bump: minor + - name: polkadot-statement-distribution + bump: minor