diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index 6b72075c513b..38c5332f3097 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -176,6 +176,14 @@ zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains:
--local-dir="${LOCAL_DIR}/elastic_scaling"
--test="0002-elastic-scaling-doesnt-break-parachains.zndsl"
+zombienet-polkadot-functional-0012-spam-statement-distribution-requests:
+ extends:
+ - .zombienet-polkadot-common
+ script:
+ - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+ --local-dir="${LOCAL_DIR}/functional"
+ --test="0012-spam-statement-distribution-requests.zndsl"
+
zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
diff --git a/Cargo.lock b/Cargo.lock
index 1fe4012070a0..c3bf3f26385b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14272,6 +14272,7 @@ dependencies = [
"polkadot-node-core-pvf-common",
"polkadot-node-core-pvf-execute-worker",
"polkadot-node-core-pvf-prepare-worker",
+ "polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml
index 2f63c2f0938d..750074fa9b3c 100644
--- a/polkadot/node/malus/Cargo.toml
+++ b/polkadot/node/malus/Cargo.toml
@@ -37,6 +37,7 @@ polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator"
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
polkadot-node-core-backing = { path = "../core/backing" }
polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
color-eyre = { version = "0.6.1", default-features = false }
assert_matches = "1.5"
diff --git a/polkadot/node/malus/src/malus.rs b/polkadot/node/malus/src/malus.rs
index 7a9e320e2736..6257201537c8 100644
--- a/polkadot/node/malus/src/malus.rs
+++ b/polkadot/node/malus/src/malus.rs
@@ -40,6 +40,8 @@ enum NemesisVariant {
DisputeAncestor(DisputeAncestorOptions),
/// Delayed disputing of finalized candidates.
DisputeFinalizedCandidates(DisputeFinalizedCandidatesOptions),
+ /// Spam many statement distribution requests instead of sending a single one.
+ SpamStatementRequests(SpamStatementRequestsOptions),
}
#[derive(Debug, Parser)]
@@ -98,6 +100,11 @@ impl MalusCli {
finality_delay,
)?
},
+ NemesisVariant::SpamStatementRequests(opts) => {
+ let SpamStatementRequestsOptions { spam_factor, cli } = opts;
+
+ polkadot_cli::run_node(cli, SpamStatementRequests { spam_factor }, finality_delay)?
+ },
}
Ok(())
}
diff --git a/polkadot/node/malus/src/variants/mod.rs b/polkadot/node/malus/src/variants/mod.rs
index 3ca1bf4b4696..ec945ae19457 100644
--- a/polkadot/node/malus/src/variants/mod.rs
+++ b/polkadot/node/malus/src/variants/mod.rs
@@ -20,6 +20,7 @@ mod back_garbage_candidate;
mod common;
mod dispute_finalized_candidates;
mod dispute_valid_candidates;
+mod spam_statement_requests;
mod suggest_garbage_candidate;
mod support_disabled;
@@ -27,6 +28,7 @@ pub(crate) use self::{
back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates},
dispute_finalized_candidates::{DisputeFinalizedCandidates, DisputeFinalizedCandidatesOptions},
dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates},
+ spam_statement_requests::{SpamStatementRequests, SpamStatementRequestsOptions},
suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates},
support_disabled::{SupportDisabled, SupportDisabledOptions},
};
diff --git a/polkadot/node/malus/src/variants/spam_statement_requests.rs b/polkadot/node/malus/src/variants/spam_statement_requests.rs
new file mode 100644
index 000000000000..c5970c988ac2
--- /dev/null
+++ b/polkadot/node/malus/src/variants/spam_statement_requests.rs
@@ -0,0 +1,155 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! A malicious node variant that attempts to spam statement distribution requests.
+//!
+//! This malus variant behaves honestly in everything except when sending statement distribution
+//! requests through the network bridge subsystem. Instead of sending a single request when it needs
+//! something, it spams the peer with multiple identical requests.
+//!
+//! Attention: For usage with `zombienet` only!
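+//!
+//! Example invocation (taken from the accompanying zombienet test config; adjust flags as needed):
+//! `malus spam-statement-requests --alice --spam-factor=1000`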
+
+#![allow(missing_docs)]
+
+use polkadot_cli::{
+ service::{
+ AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
+ OverseerGenArgs, OverseerHandle,
+ },
+ validator_overseer_builder, Cli,
+};
+use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
+use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
+use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
+use sp_core::traits::SpawnNamed;
+
+// Filter wrapping related types.
+use crate::{interceptor::*, shared::MALUS};
+
+use std::sync::Arc;
+
+/// Wraps around network bridge and replaces it.
+#[derive(Clone)]
+struct RequestSpammer {
+ spam_factor: u32, // How many statement distribution requests to send.
+}
+
+impl<Sender> MessageInterceptor<Sender> for RequestSpammer
+where
+ Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
+{
+ type Message = NetworkBridgeTxMessage;
+
+ /// Intercept `NetworkBridgeTxMessage::SendRequests` containing `Requests::AttestedCandidateV2`
+ /// and duplicate those requests.
+ fn intercept_incoming(
+ &self,
+ _subsystem_sender: &mut Sender,
+ msg: FromOrchestra<Self::Message>,
+ ) -> Option<FromOrchestra<Self::Message>> {
+ match msg {
+ FromOrchestra::Communication {
+ msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
+ } => {
+ let mut new_requests = Vec::new();
+
+ for request in requests {
+ match request {
+ Requests::AttestedCandidateV2(ref req) => {
+ // Temporarily store peer and payload for duplication
+ let peer_to_duplicate = req.peer.clone();
+ let payload_to_duplicate = req.payload.clone();
+ // Push the original request
+ new_requests.push(request);
+
+ // Duplicate for spam purposes
+ gum::info!(
+ target: MALUS,
+ "😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", self.spam_factor, peer_to_duplicate,
+ );
+ new_requests.extend((0..self.spam_factor - 1).map(|_| {
+ let (new_outgoing_request, _) = OutgoingRequest::new(
+ peer_to_duplicate.clone(),
+ payload_to_duplicate.clone(),
+ );
+ Requests::AttestedCandidateV2(new_outgoing_request)
+ }));
+ },
+ _ => {
+ new_requests.push(request);
+ },
+ }
+ }
+
+ // Passthrough the message with a potentially modified number of requests
+ Some(FromOrchestra::Communication {
+ msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
+ })
+ },
+ FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
+ FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
+ }
+ }
+}
+
+//----------------------------------------------------------------------------------
+
+#[derive(Debug, clap::Parser)]
+#[clap(rename_all = "kebab-case")]
+#[allow(missing_docs)]
+pub struct SpamStatementRequestsOptions {
+ /// How many statement distribution requests to send.
+ #[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
+ pub spam_factor: u32,
+
+ #[clap(flatten)]
+ pub cli: Cli,
+}
+
+/// SpamStatementRequests implementation wrapper which implements `OverseerGen` glue.
+pub(crate) struct SpamStatementRequests {
+ /// How many statement distribution requests to send.
+ pub spam_factor: u32,
+}
+
+impl OverseerGen for SpamStatementRequests {
+ fn generate<Spawner, RuntimeClient>(
+ &self,
+ connector: OverseerConnector,
+ args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
+ ext_args: Option<ExtendedOverseerGenArgs>,
+ ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
+ where
+ RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
+ Spawner: 'static + SpawnNamed + Clone + Unpin,
+ {
+ gum::info!(
+ target: MALUS,
+ "😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
+ &self.spam_factor,
+ );
+
+ let request_spammer = RequestSpammer { spam_factor: self.spam_factor };
+
+ validator_overseer_builder(
+ args,
+ ext_args.expect("Extended arguments required to build validator overseer are provided"),
+ )?
+ .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer))
+ .build_with_connector(connector)
+ .map_err(|e| e.into())
+ }
+}
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 7e91d2849120..4ca199c3378b 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -207,6 +207,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
v2::respond_task(
self.req_receiver.take().expect("Mandatory argument to new. qed"),
res_sender.clone(),
+ self.metrics.clone(),
)
.boxed(),
)
diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs
index b9a51dc89d61..1bc994174263 100644
--- a/polkadot/node/network/statement-distribution/src/metrics.rs
+++ b/polkadot/node/network/statement-distribution/src/metrics.rs
@@ -24,14 +24,19 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[
#[derive(Clone)]
struct MetricsInner {
+ // V1
statements_distributed: prometheus::Counter<prometheus::U64>,
sent_requests: prometheus::Counter<prometheus::U64>,
received_responses: prometheus::CounterVec<prometheus::U64>,
- active_leaves_update: prometheus::Histogram,
- share: prometheus::Histogram,
network_bridge_update: prometheus::HistogramVec,
statements_unexpected: prometheus::CounterVec<prometheus::U64>,
created_message_size: prometheus::Gauge<prometheus::U64>,
+ // V1+
+ active_leaves_update: prometheus::Histogram,
+ share: prometheus::Histogram,
+ // V2+
+ peer_rate_limit_request_drop: prometheus::Counter<prometheus::U64>,
+ max_parallel_requests_reached: prometheus::Counter<prometheus::U64>,
}
/// Statement Distribution metrics.
@@ -114,6 +119,23 @@ impl Metrics {
metrics.created_message_size.set(size as u64);
}
}
+
+ /// Update the dropped requests counter when a request is dropped because
+ /// of the peer rate limit.
+ pub fn on_request_dropped_peer_rate_limit(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.peer_rate_limit_request_drop.inc();
+ }
+ }
+
+ /// Update the max parallel requests reached counter.
+ /// This counter is incremented when the maximum number of parallel requests is reached
+ /// and we have to wait for one of the in-flight requests to finish.
+ pub fn on_max_parallel_requests_reached(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.max_parallel_requests_reached.inc();
+ }
+ }
}
impl metrics::Metrics for Metrics {
@@ -193,6 +215,20 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
+ peer_rate_limit_request_drop: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_statement_distribution_peer_rate_limit_request_drop_total",
+ "Number of statement distribution requests dropped because of the peer rate limiting.",
+ )?,
+ registry,
+ )?,
+ max_parallel_requests_reached: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_statement_distribution_max_parallel_requests_reached_total",
+ "Number of times the maximum number of parallel requests was reached.",
+ )?,
+ registry,
+ )?,
};
Ok(Metrics(Some(metrics)))
}
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index 118e34e92063..8579ac15cbc1 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -59,6 +59,8 @@ use sp_keystore::KeystorePtr;
use fatality::Nested;
use futures::{
channel::{mpsc, oneshot},
+ future::FutureExt,
+ select,
stream::FuturesUnordered,
SinkExt, StreamExt,
};
@@ -73,6 +75,7 @@ use std::{
use crate::{
error::{JfyiError, JfyiErrorResult},
+ metrics::Metrics,
LOG_TARGET,
};
use candidates::{BadAdvertisement, Candidates, PostConfirmation};
@@ -3423,35 +3426,61 @@ pub(crate) struct ResponderMessage {
pub(crate) async fn respond_task(
mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
mut sender: mpsc::Sender<ResponderMessage>,
+ metrics: Metrics,
) {
let mut pending_out = FuturesUnordered::new();
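+ // Peers we are currently serving a response to. Requests from a peer that already has a
+ // response in flight are dropped until that response completes (one request per peer at a time).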
+ let mut active_peers = HashSet::new();
+
loop {
- // Ensure we are not handling too many requests in parallel.
- if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
- // Wait for one to finish:
- pending_out.next().await;
- }
+ select! {
+ // New request
+ request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
+ let request = match request_result.into_nested() {
+ Ok(Ok(v)) => v,
+ Err(fatal) => {
+ gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
+ return
+ },
+ Ok(Err(jfyi)) => {
+ gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
+ continue
+ },
+ };
- let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() {
- Ok(Ok(v)) => v,
- Err(fatal) => {
- gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
- return
+ // If this peer is currently being served, drop the request
+ if active_peers.contains(&request.peer) {
+ gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request");
+ metrics.on_request_dropped_peer_rate_limit();
+ continue
+ }
+
+ // If we are over the parallel limit, wait for one request to finish
+ if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
+ gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
+ metrics.on_max_parallel_requests_reached();
+ let (_, peer) = pending_out.select_next_some().await;
+ active_peers.remove(&peer);
+ }
+
+ // Start serving the request
+ let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
+ let peer = request.peer;
+ if let Err(err) = sender
+ .feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
+ .await
+ {
+ gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
+ return
+ }
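+ // Attach the peer id to the feedback future so the peer can be removed from
+ // `active_peers` once its response has been sent.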
+ let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
+ pending_out.push(future_with_peer);
+ active_peers.insert(peer);
},
- Ok(Err(jfyi)) => {
- gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
- continue
+ // Request served/finished
+ result = pending_out.select_next_some() => {
+ let (_, peer) = result;
+ active_peers.remove(&peer);
},
- };
-
- let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
- if let Err(err) = sender
- .feed(ResponderMessage { request: req, sent_feedback: pending_sent_tx })
- .await
- {
- gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
- return
}
- pending_out.push(pending_sent_rx);
}
}
diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs
index 1ed18ffd42a9..b8ed34d26c8a 100644
--- a/polkadot/node/network/statement-distribution/src/v2/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs
@@ -366,6 +366,7 @@ impl RequestManager {
id,
&props,
&peer_advertised,
+ &response_manager,
) {
None => continue,
Some(t) => t,
@@ -387,14 +388,17 @@ impl RequestManager {
);
let stored_id = id.clone();
- response_manager.push(Box::pin(async move {
- TaggedResponse {
- identifier: stored_id,
- requested_peer: target,
- props,
- response: response_fut.await,
- }
- }));
+ response_manager.push(
+ Box::pin(async move {
+ TaggedResponse {
+ identifier: stored_id,
+ requested_peer: target,
+ props,
+ response: response_fut.await,
+ }
+ }),
+ target,
+ );
entry.in_flight = true;
@@ -422,28 +426,35 @@ impl RequestManager {
/// A manager for pending responses.
pub struct ResponseManager {
pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
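+ // Peers with a request currently in flight; used by `find_request_target_with_update` to avoid
+ // sending more than one request to the same peer at a time.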
+ active_peers: HashSet<PeerId>,
}
impl ResponseManager {
pub fn new() -> Self {
- Self { pending_responses: FuturesUnordered::new() }
+ Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() }
}
/// Await the next incoming response to a sent request, or immediately
/// return `None` if there are no pending responses.
pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
- self.pending_responses
- .next()
- .await
- .map(|response| UnhandledResponse { response })
+ self.pending_responses.next().await.map(|response| {
+ self.active_peers.remove(&response.requested_peer);
+ UnhandledResponse { response }
+ })
}
fn len(&self) -> usize {
self.pending_responses.len()
}
- fn push(&mut self, response: BoxFuture<'static, TaggedResponse>) {
+ fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
self.pending_responses.push(response);
+ self.active_peers.insert(target);
+ }
+
+ /// Returns true if we are currently sending a request to the peer.
+ fn is_sending_to(&self, peer: &PeerId) -> bool {
+ self.active_peers.contains(peer)
}
}
@@ -471,10 +482,16 @@ fn find_request_target_with_update(
candidate_identifier: &CandidateIdentifier,
props: &RequestProperties,
peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
+ response_manager: &ResponseManager,
) -> Option<PeerId> {
let mut prune = Vec::new();
let mut target = None;
for (i, p) in known_by.iter().enumerate() {
+ // If we are already sending to that peer, skip for now
+ if response_manager.is_sending_to(p) {
+ continue
+ }
+
let mut filter = match peer_advertised(candidate_identifier, p) {
None => {
prune.push(i);
@@ -1002,7 +1019,8 @@ mod tests {
candidate_receipt.descriptor.persisted_validation_data_hash =
persisted_validation_data.hash();
let candidate = candidate_receipt.hash();
- let requested_peer = PeerId::random();
+ let requested_peer_1 = PeerId::random();
+ let requested_peer_2 = PeerId::random();
let identifier1 = request_manager
.get_or_insert(relay_parent, candidate, 1.into())
@@ -1010,14 +1028,14 @@ mod tests {
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 1.into())
- .add_peer(requested_peer);
+ .add_peer(requested_peer_1);
let identifier2 = request_manager
.get_or_insert(relay_parent, candidate, 2.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 2.into())
- .add_peer(requested_peer);
+ .add_peer(requested_peer_2);
assert_ne!(identifier1, identifier2);
assert_eq!(request_manager.requests.len(), 2);
@@ -1053,7 +1071,7 @@ mod tests {
let response = UnhandledResponse {
response: TaggedResponse {
identifier: identifier1,
- requested_peer,
+ requested_peer: requested_peer_1,
props: request_properties.clone(),
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone(),
@@ -1076,13 +1094,13 @@ mod tests {
assert_eq!(
output,
ResponseValidationOutput {
- requested_peer,
+ requested_peer: requested_peer_1,
request_status: CandidateRequestStatus::Complete {
candidate: candidate_receipt.clone(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
},
- reputation_changes: vec![(requested_peer, BENEFIT_VALID_RESPONSE)],
+ reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)],
}
);
}
@@ -1093,7 +1111,7 @@ mod tests {
let response = UnhandledResponse {
response: TaggedResponse {
identifier: identifier2,
- requested_peer,
+ requested_peer: requested_peer_2,
props: request_properties,
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone(),
@@ -1115,12 +1133,14 @@ mod tests {
assert_eq!(
output,
ResponseValidationOutput {
- requested_peer,
+ requested_peer: requested_peer_2,
request_status: CandidateRequestStatus::Outdated,
reputation_changes: vec![],
}
);
}
+
+ assert_eq!(request_manager.requests.len(), 0);
}
// Test case where we had a request in-flight and the request entry was garbage-collected on
@@ -1293,4 +1313,140 @@ mod tests {
assert_eq!(request_manager.requests.len(), 0);
assert_eq!(request_manager.by_priority.len(), 0);
}
+
+ // Test case where we queue 2 requests to be sent to the same peer and 1 request to another
+ // peer. Same peer requests should be served one at a time but they should not block the other
+ // peer request.
+ #[test]
+ fn rate_limit_requests_to_same_peer() {
+ let mut request_manager = RequestManager::new();
+ let mut response_manager = ResponseManager::new();
+
+ let relay_parent = Hash::from_low_u64_le(1);
+
+ // Create 3 candidates
+ let mut candidate_receipt_1 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+ let persisted_validation_data_1 = dummy_pvd();
+ candidate_receipt_1.descriptor.persisted_validation_data_hash =
+ persisted_validation_data_1.hash();
+ let candidate_1 = candidate_receipt_1.hash();
+
+ let mut candidate_receipt_2 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+ let persisted_validation_data_2 = dummy_pvd();
+ candidate_receipt_2.descriptor.persisted_validation_data_hash =
+ persisted_validation_data_2.hash();
+ let candidate_2 = candidate_receipt_2.hash();
+
+ let mut candidate_receipt_3 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+ let persisted_validation_data_3 = dummy_pvd();
+ candidate_receipt_3.descriptor.persisted_validation_data_hash =
+ persisted_validation_data_3.hash();
+ let candidate_3 = candidate_receipt_3.hash();
+
+ // Create 2 peers
+ let requested_peer_1 = PeerId::random();
+ let requested_peer_2 = PeerId::random();
+
+ let group_size = 3;
+ let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
+ let unwanted_mask = StatementFilter::blank(group_size);
+ let disabled_mask: BitVec<u8, Lsb0> = Default::default();
+ let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
+ let request_props = |_identifier: &CandidateIdentifier| Some((&request_properties).clone());
+ let peer_advertised =
+ |_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
+
+ // Add request for candidate 1 from peer 1
+ let identifier1 = request_manager
+ .get_or_insert(relay_parent, candidate_1, 1.into())
+ .identifier
+ .clone();
+ request_manager
+ .get_or_insert(relay_parent, candidate_1, 1.into())
+ .add_peer(requested_peer_1);
+
+ // Add request for candidate 3 from peer 2 (this one can be served in parallel)
+ let _identifier3 = request_manager
+ .get_or_insert(relay_parent, candidate_3, 1.into())
+ .identifier
+ .clone();
+ request_manager
+ .get_or_insert(relay_parent, candidate_3, 1.into())
+ .add_peer(requested_peer_2);
+
+ // Successfully dispatch request for candidate 1 from peer 1 and candidate 3 from peer 2
+ for _ in 0..2 {
+ let outgoing =
+ request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+ assert!(outgoing.is_some());
+ }
+ assert_eq!(response_manager.active_peers.len(), 2);
+ assert!(response_manager.is_sending_to(&requested_peer_1));
+ assert!(response_manager.is_sending_to(&requested_peer_2));
+ assert_eq!(request_manager.requests.len(), 2);
+
+ // Add request for candidate 2 from peer 1
+ let _identifier2 = request_manager
+ .get_or_insert(relay_parent, candidate_2, 1.into())
+ .identifier
+ .clone();
+ request_manager
+ .get_or_insert(relay_parent, candidate_2, 1.into())
+ .add_peer(requested_peer_1);
+
+ // Do not dispatch the request for the second candidate from peer 1 (already serving that
+ // peer)
+ let outgoing =
+ request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+ assert!(outgoing.is_none());
+ assert_eq!(response_manager.active_peers.len(), 2);
+ assert!(response_manager.is_sending_to(&requested_peer_1));
+ assert!(response_manager.is_sending_to(&requested_peer_2));
+ assert_eq!(request_manager.requests.len(), 3);
+
+ // Manually mark response received (response future resolved)
+ response_manager.active_peers.remove(&requested_peer_1);
+ response_manager.pending_responses = FuturesUnordered::new();
+
+ // Validate first response (candidate 1 from peer 1)
+ {
+ let statements = vec![];
+ let response = UnhandledResponse {
+ response: TaggedResponse {
+ identifier: identifier1,
+ requested_peer: requested_peer_1,
+ props: request_properties.clone(),
+ response: Ok(AttestedCandidateResponse {
+ candidate_receipt: candidate_receipt_1.clone(),
+ persisted_validation_data: persisted_validation_data_1.clone(),
+ statements,
+ }),
+ },
+ };
+ let validator_key_lookup = |_v| None;
+ let allowed_para_lookup = |_para, _g_index| true;
+ let _output = response.validate_response(
+ &mut request_manager,
+ group,
+ 0,
+ validator_key_lookup,
+ allowed_para_lookup,
+ disabled_mask.clone(),
+ );
+
+ // First request served successfully
+ assert_eq!(request_manager.requests.len(), 2);
+ assert_eq!(response_manager.active_peers.len(), 1);
+ assert!(response_manager.is_sending_to(&requested_peer_2));
+ }
+
+ // Check if the request that was ignored previously will be served now
+ let outgoing =
+ request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+ assert!(outgoing.is_some());
+ assert_eq!(response_manager.active_peers.len(), 2);
+ assert!(response_manager.is_sending_to(&requested_peer_1));
+ assert!(response_manager.is_sending_to(&requested_peer_2));
+ assert_eq!(request_manager.requests.len(), 2);
+ }
}
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
index 8cf139802148..c9de42d2c468 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
@@ -1891,7 +1891,7 @@ fn local_node_sanity_checks_incoming_requests() {
let mask = StatementFilter::blank(state.config.group_size + 1);
let response = state
.send_request(
- peer_c,
+ peer_a,
request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
)
.await
diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml
new file mode 100644
index 000000000000..14208425d62b
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml
@@ -0,0 +1,43 @@
+[settings]
+timeout = 1000
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config]
+ needed_approvals = 2
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
+ max_validators_per_core = 5
+
+[relaychain]
+default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+chain = "rococo-local"
+default_command = "polkadot"
+
+[relaychain.default_resources]
+limits = { memory = "4G", cpu = "2" }
+requests = { memory = "2G", cpu = "1" }
+
+ [[relaychain.node_groups]]
+ name = "honest"
+ count = 4
+ args = ["-lparachain=debug,parachain::statement-distribution=trace"]
+
+ [[relaychain.nodes]]
+ image = "{{MALUS_IMAGE}}"
+ name = "malus"
+ command = "malus spam-statement-requests"
+ args = [ "--alice", "-lparachain=debug,MALUS=trace", "--spam-factor=1000" ]
+
+{% for id in range(2000,2001) %}
+[[parachains]]
+id = {{id}}
+ [parachains.collator]
+ image = "{{COL_IMAGE}}"
+ name = "collator"
+ command = "undying-collator"
+ args = ["-lparachain=debug"]
+{% endfor %}
+
+[types.Header]
+number = "u64"
+parent_hash = "Hash"
+post_state = "Hash"
diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl
new file mode 100644
index 000000000000..9985dd24ee38
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl
@@ -0,0 +1,27 @@
+Description: Test if parachains progress when the group is getting spammed by statement distribution requests.
+Network: ./0012-spam-statement-distribution-requests.toml
+Creds: config
+
+# Check authority status and peers.
+malus: reports node_roles is 4
+honest: reports node_roles is 4
+
+# Ensure parachains are registered.
+honest: parachain 2000 is registered within 60 seconds
+
+# Ensure that malus is already attempting to DoS
+malus: log line contains "😈 Duplicating AttestedCandidateV2 request" within 90 seconds
+
+# Ensure parachains made progress.
+honest: parachain 2000 block height is at least 10 within 200 seconds
+
+# Ensure that honest nodes drop extra requests
+honest: log line contains "Peer already being served, dropping request" within 60 seconds
+
+# Check lag - approval
+honest: reports polkadot_parachain_approval_checking_finality_lag is 0
+
+# Check lag - dispute conclusion
+honest: reports polkadot_parachain_disputes_finality_lag is 0
+
+
diff --git a/prdoc/pr_3444.prdoc b/prdoc/pr_3444.prdoc
new file mode 100644
index 000000000000..3afb38106417
--- /dev/null
+++ b/prdoc/pr_3444.prdoc
@@ -0,0 +1,25 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Rate-limiting of statement distribution v2 requests to 1 per peer
+
+doc:
+ - audience: Node Dev
+ description: |
+ A new malicious node variant that spams peers with duplicate statement
+ distribution requests.
+
+ - audience: Node Operator
+ description: |
+ Added rate-limiting in the statement distribution request-response
+ protocol. Requesters will not issue another request to a peer if one
+ is already pending with that peer, and receiving nodes will reject
+ requests from peers they are currently serving.
+ This should reduce the risk of validator-to-validator DoS attacks and
+ better load-balance statement distribution.
+
+crates:
+ - name: polkadot-test-malus
+ bump: minor
+ - name: polkadot-statement-distribution
+ bump: minor