Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statement Distribution Per Peer Rate Limit #3444

Merged
merged 31 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1b1ba25
minimal scaffolding
Overkillus Feb 22, 2024
3345aa6
respond task select overhaul
Overkillus Feb 22, 2024
1734c09
test adjustment
Overkillus Feb 23, 2024
cb1d210
fmt
Overkillus Feb 23, 2024
58d828d
Rate limit sending from the requester side
Overkillus Feb 26, 2024
b5e7e06
sender side rate limiter test
Overkillus Mar 20, 2024
456a466
spammer malus variant
Overkillus Mar 21, 2024
975e3bd
Duplicate requests
Overkillus Apr 2, 2024
7056159
Filtering test
Overkillus Apr 2, 2024
3f234e1
fmt
Overkillus Apr 2, 2024
7e817fa
prdoc
Overkillus Apr 2, 2024
913b234
pipeline
Overkillus Apr 2, 2024
e812dcc
increment test
Overkillus Apr 2, 2024
c01be29
Merge branch 'master' into mkz-statement-distribution-rate-limit
Overkillus Apr 2, 2024
af234ef
crate bump
Overkillus Apr 2, 2024
fdaaf4a
clippy nits
Overkillus Apr 2, 2024
e988c60
zombienet test simplification
Overkillus Apr 19, 2024
83cf297
review fixes
Overkillus Apr 19, 2024
37fd618
debug -> trace
Overkillus Apr 30, 2024
adf84a2
cleanup
Overkillus Apr 30, 2024
f0a40d5
metric registered
Overkillus Apr 30, 2024
6099498
reverting paste mistake
Overkillus Apr 30, 2024
07d44ee
registering max parallel requests metric
Overkillus Apr 30, 2024
09e7367
updating metrics in statement distribution respond task
Overkillus Apr 30, 2024
d1f50c5
fmt
Overkillus Apr 30, 2024
e9680d1
Merge branch 'master' into mkz-statement-distribution-rate-limit
Overkillus May 1, 2024
678b141
param typo
Overkillus May 1, 2024
7ff1ddd
Bump test numbering
Overkillus May 1, 2024
f6e8def
remove unused malus param
Overkillus May 1, 2024
8e79dc4
tracing
Overkillus May 1, 2024
a38762b
fmt
Overkillus May 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitlab/pipeline/zombienet/polkadot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains:
--local-dir="${LOCAL_DIR}/elastic_scaling"
--test="0002-elastic-scaling-doesnt-break-parachains.zndsl"

zombienet-polkadot-functional-0012-spam-statement-distribution-requests:
extends:
- .zombienet-polkadot-common
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/functional"
--test="0012-spam-statement-distribution-requests.zndsl"

zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/node/malus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator"
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
polkadot-node-core-backing = { path = "../core/backing" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
color-eyre = { version = "0.6.1", default-features = false }
assert_matches = "1.5"
Expand Down
7 changes: 7 additions & 0 deletions polkadot/node/malus/src/malus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ enum NemesisVariant {
DisputeAncestor(DisputeAncestorOptions),
/// Delayed disputing of finalized candidates.
DisputeFinalizedCandidates(DisputeFinalizedCandidatesOptions),
/// Spam many request statements instead of sending a single one.
SpamStatementRequests(SpamStatementRequestsOptions),
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -98,6 +100,11 @@ impl MalusCli {
finality_delay,
)?
},
NemesisVariant::SpamStatementRequests(opts) => {
let SpamStatementRequestsOptions { spam_factor, cli } = opts;

polkadot_cli::run_node(cli, SpamStatementRequests { spam_factor }, finality_delay)?
},
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/malus/src/variants/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ mod back_garbage_candidate;
mod common;
mod dispute_finalized_candidates;
mod dispute_valid_candidates;
mod spam_statement_requests;
mod suggest_garbage_candidate;
mod support_disabled;

pub(crate) use self::{
back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates},
dispute_finalized_candidates::{DisputeFinalizedCandidates, DisputeFinalizedCandidatesOptions},
dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates},
spam_statement_requests::{SpamStatementRequests, SpamStatementRequestsOptions},
suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates},
support_disabled::{SupportDisabled, SupportDisabledOptions},
};
Expand Down
155 changes: 155 additions & 0 deletions polkadot/node/malus/src/variants/spam_statement_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A malicious node variant that attempts spam statement requests.
//!
//! This malus variant behaves honestly in everything except when propagating statement distribution
//! requests through the network bridge subsystem. Instead of sending a single request when it needs
//! something it attempts to spam the peer with multiple requests.
//!
//! Attention: For usage with `zombienet` only!
#![allow(missing_docs)]

use polkadot_cli::{
service::{
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;

// Filter wrapping related types.
use crate::{interceptor::*, shared::MALUS};

use std::sync::Arc;

/// Wraps around network bridge and replaces it.
#[derive(Clone)]
struct RequestSpammer {
spam_factor: u32, // How many statement distribution requests to send.
}

impl<Sender> MessageInterceptor<Sender> for RequestSpammer
where
Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
{
type Message = NetworkBridgeTxMessage;

/// Intercept NetworkBridgeTxMessage::SendRequests with Requests::AttestedCandidateV2 inside and
/// duplicate that request
fn intercept_incoming(
&self,
_subsystem_sender: &mut Sender,
msg: FromOrchestra<Self::Message>,
) -> Option<FromOrchestra<Self::Message>> {
match msg {
FromOrchestra::Communication {
msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
} => {
let mut new_requests = Vec::new();

for request in requests {
match request {
Requests::AttestedCandidateV2(ref req) => {
// Temporarily store peer and payload for duplication
let peer_to_duplicate = req.peer.clone();
let payload_to_duplicate = req.payload.clone();
// Push the original request
new_requests.push(request);

// Duplicate for spam purposes
gum::info!(
target: MALUS,
"😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", self.spam_factor, peer_to_duplicate,
);
new_requests.extend((0..self.spam_factor - 1).map(|_| {
let (new_outgoing_request, _) = OutgoingRequest::new(
peer_to_duplicate.clone(),
payload_to_duplicate.clone(),
);
Requests::AttestedCandidateV2(new_outgoing_request)
}));
},
_ => {
new_requests.push(request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can dedup this line by moving it above the match.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is somewhat of a conscious trade-off but if you see a graceful way of doing it feel free to suggest it.

Problem is Request does not implement copy/clone. I could possibly track the index where I push it but this will make it even less readable imo.

},
}
}

// Passthrough the message with a potentially modified number of requests
Some(FromOrchestra::Communication {
msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
})
},
FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
}
}
}

//----------------------------------------------------------------------------------

#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct SpamStatementRequestsOptions {
/// How many statement distribution requests to send.
#[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
pub spam_factor: u32,

#[clap(flatten)]
pub cli: Cli,
}

/// SpamStatementRequests implementation wrapper which implements `OverseerGen` glue.
pub(crate) struct SpamStatementRequests {
/// How many statement distribution requests to send.
pub spam_factor: u32,
}

impl OverseerGen for SpamStatementRequests {
fn generate<Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
ext_args: Option<ExtendedOverseerGenArgs>,
) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
gum::info!(
target: MALUS,
"😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
&self.spam_factor,
);

let request_spammer = RequestSpammer { spam_factor: self.spam_factor };

validator_overseer_builder(
args,
ext_args.expect("Extended arguments required to build validator overseer are provided"),
)?
.replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer))
.build_with_connector(connector)
.map_err(|e| e.into())
}
}
1 change: 1 addition & 0 deletions polkadot/node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
v2::respond_task(
self.req_receiver.take().expect("Mandatory argument to new. qed"),
res_sender.clone(),
self.metrics.clone(),
)
.boxed(),
)
Expand Down
40 changes: 38 additions & 2 deletions polkadot/node/network/statement-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[

#[derive(Clone)]
struct MetricsInner {
// V1
statements_distributed: prometheus::Counter<prometheus::U64>,
sent_requests: prometheus::Counter<prometheus::U64>,
received_responses: prometheus::CounterVec<prometheus::U64>,
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
network_bridge_update: prometheus::HistogramVec,
statements_unexpected: prometheus::CounterVec<prometheus::U64>,
created_message_size: prometheus::Gauge<prometheus::U64>,
// V1+
active_leaves_update: prometheus::Histogram,
share: prometheus::Histogram,
// V2+
peer_rate_limit_request_drop: prometheus::Counter<prometheus::U64>,
max_parallel_requests_reached: prometheus::Counter<prometheus::U64>,
}

/// Statement Distribution metrics.
Expand Down Expand Up @@ -114,6 +119,23 @@ impl Metrics {
metrics.created_message_size.set(size as u64);
}
}

/// Update sent dropped requests counter when request dropped because
/// of peer rate limit
pub fn on_request_dropped_peer_rate_limit(&self) {
if let Some(metrics) = &self.0 {
metrics.peer_rate_limit_request_drop.inc();
}
}

/// Update max parallel requests reached counter
/// This counter is updated when the maximum number of parallel requests is reached
/// and we are waiting for one of the requests to finish
pub fn on_max_parallel_requests_reached(&self) {
if let Some(metrics) = &self.0 {
metrics.max_parallel_requests_reached.inc();
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -193,6 +215,20 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
peer_rate_limit_request_drop: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_peer_rate_limit_request_drop_total",
"Number of statement distribution requests dropped because of the peer rate limiting.",
)?,
registry,
)?,
max_parallel_requests_reached: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_statement_distribution_max_parallel_requests_reached_total",
"Number of times the maximum number of parallel requests was reached.",
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
Loading
Loading