diff --git a/Cargo.lock b/Cargo.lock index f319a8db8e71..c2480647ccef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8257,6 +8257,7 @@ dependencies = [ "polkadot-node-core-dispute-coordinator", "polkadot-node-core-pvf-execute-worker", "polkadot-node-core-pvf-prepare-worker", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index a6a74da50480..608a948e9cd1 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -1269,7 +1269,7 @@ async fn handle_from_overseer( actions }, - ApprovalVotingMessage::CheckAndImportApproval(a, res) => + ApprovalVotingMessage::CheckAndImportApproval(a, res, sender_is_originator) => check_and_import_approval( ctx.sender(), state, @@ -1277,6 +1277,7 @@ async fn handle_from_overseer( session_info_provider, metrics, a, + sender_is_originator, |r| { let _ = res.send(r); }, @@ -1930,6 +1931,7 @@ async fn check_and_import_approval( session_info_provider: &mut RuntimeInfo, metrics: &Metrics, approval: IndirectSignedApprovalVote, + sender_is_originator: bool, with_response: impl FnOnce(ApprovalCheckResult) -> T, ) -> SubsystemResult<(Vec, T)> where @@ -1997,17 +1999,21 @@ where }; // Signature check: - match DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking).check_signature( - &pubkey, - approved_candidate_hash, - block_entry.session(), - &approval.signature, - ) { - Err(_) => respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::InvalidSignature( - approval.validator - ),)), - Ok(()) => {}, - }; + // We perform signatures checks just for the cases where the approvals weren't received directly from + // peer and we received them via gossipping. + if !sender_is_originator { + match DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking).check_signature( + &pubkey, + approved_candidate_hash, + block_entry.session(), + &approval.signature, + ) { + Err(_) => respond_early!(ApprovalCheckResult::Bad( + ApprovalCheckError::InvalidSignature(approval.validator), + )), + Ok(()) => {}, + }; + } let candidate_entry = match db.load_candidate_entry(&approved_candidate_hash)? { Some(c) => c, diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index f58e60c6a487..4ee3f193f6b1 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -590,6 +590,7 @@ async fn check_and_import_approval( msg: ApprovalVotingMessage::CheckAndImportApproval( IndirectSignedApprovalVote { block_hash, candidate_index, validator, signature }, tx, + false, ), }, ) diff --git a/node/core/dispute-coordinator/src/import.rs b/node/core/dispute-coordinator/src/import.rs index 912521834075..1e4e4dd19577 100644 --- a/node/core/dispute-coordinator/src/import.rs +++ b/node/core/dispute-coordinator/src/import.rs @@ -518,17 +518,26 @@ impl ImportResult { let (mut votes, _) = new_state.into_old_state(); for (index, sig) in approval_votes.into_iter() { - debug_assert!( - { - let pub_key = &env.session_info().validators.get(index).expect("indices are validated by approval-voting subsystem; qed"); - let candidate_hash = votes.candidate_receipt.hash(); - let session_index = env.session_index(); - DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) - .check_signature(pub_key, candidate_hash, session_index, &sig) - .is_ok() - }, - "Signature check for imported approval votes failed! 
This is a serious bug. Session: {:?}, candidate hash: {:?}, validator index: {:?}", env.session_index(), votes.candidate_receipt.hash(), index - ); + let pub_key = &env + .session_info() + .validators + .get(index) + .expect("indices are validated by approval-voting subsystem; qed"); + let candidate_hash = votes.candidate_receipt.hash(); + let session_index = env.session_index(); + // The candidate sent us an invalid signature, so don't import it. + // This might happen because in approval-voting we do not checks signatures for votes received from the originator. + if !DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) + .check_signature(pub_key, candidate_hash, session_index, &sig) + .is_ok() + { + gum::trace!( + target: LOG_TARGET, + "Approval checking signature was invalid, so just ignore it" + ); + continue + } + if votes.valid.insert_vote(index, ValidDisputeStatementKind::ApprovalChecking, sig) { imported_valid_votes += 1; imported_approval_votes += 1; diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml index 8e23e623174f..0606fa6cd36e 100644 --- a/node/malus/Cargo.toml +++ b/node/malus/Cargo.toml @@ -24,6 +24,8 @@ polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" } polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } +polkadot-node-network-protocol = { path = "../network/protocol" } + color-eyre = { version = "0.6.1", default-features = false } assert_matches = "1.5" async-trait = "0.1.57" diff --git a/node/malus/src/interceptor.rs b/node/malus/src/interceptor.rs index cbf39bccd160..854100dbc3ae 100644 --- a/node/malus/src/interceptor.rs +++ b/node/malus/src/interceptor.rs @@ -40,7 +40,7 @@ where /// For non-trivial cases, the `sender` can be used to send /// multiple messages after doing some additional processing. fn intercept_incoming( - &self, + &mut self, _sender: &mut Sender, msg: FromOrchestra, ) -> Option> { diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs index d09f8be990a4..0be3ae9e921c 100644 --- a/node/malus/src/malus.rs +++ b/node/malus/src/malus.rs @@ -36,6 +36,8 @@ enum NemesisVariant { BackGarbageCandidate(BackGarbageCandidateOptions), /// Delayed disputing of ancestors that are perfectly fine. DisputeAncestor(DisputeAncestorOptions), + /// Do not distribute approvals to all nodes + WitholdApprovalsDistribution(WitholdApprovalsDistributionOptions), #[allow(missing_docs)] #[command(name = "prepare-worker", hide = true)] @@ -59,6 +61,7 @@ impl MalusCli { /// Launch a malus node. fn launch(self) -> eyre::Result<()> { let finality_delay = self.finality_delay; + match self.variant { NemesisVariant::BackGarbageCandidate(opts) => { let BackGarbageCandidateOptions { percentage, cli } = opts; @@ -88,6 +91,15 @@ impl MalusCli { finality_delay, )? 
}, + NemesisVariant::WitholdApprovalsDistribution(WitholdApprovalsDistributionOptions { + num_network_groups, + assigned_network_group, + cli, + }) => polkadot_cli::run_node( + cli, + WitholdApprovalsDistribution { num_network_groups, assigned_network_group }, + finality_delay, + )?, NemesisVariant::PvfPrepareWorker(cmd) => { #[cfg(target_os = "android")] { diff --git a/node/malus/src/variants/common.rs b/node/malus/src/variants/common.rs index 4ea8b88b56a5..f07043e56025 100644 --- a/node/malus/src/variants/common.rs +++ b/node/malus/src/variants/common.rs @@ -205,7 +205,7 @@ where // Capture all (approval and backing) candidate validation requests and depending on configuration fail them. fn intercept_incoming( - &self, + &mut self, subsystem_sender: &mut Sender, msg: FromOrchestra, ) -> Option> { diff --git a/node/malus/src/variants/mod.rs b/node/malus/src/variants/mod.rs index 3789f33ac98b..8144665e9f66 100644 --- a/node/malus/src/variants/mod.rs +++ b/node/malus/src/variants/mod.rs @@ -20,10 +20,14 @@ mod back_garbage_candidate; mod common; mod dispute_valid_candidates; mod suggest_garbage_candidate; +mod withold_approvals_distribution; pub(crate) use self::{ back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates}, dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates}, suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates}, + withold_approvals_distribution::{ + WitholdApprovalsDistribution, WitholdApprovalsDistributionOptions, + }, }; pub(crate) use common::*; diff --git a/node/malus/src/variants/suggest_garbage_candidate.rs b/node/malus/src/variants/suggest_garbage_candidate.rs index 049cfc2b153d..4326c0ce62bc 100644 --- a/node/malus/src/variants/suggest_garbage_candidate.rs +++ b/node/malus/src/variants/suggest_garbage_candidate.rs @@ -72,7 +72,7 @@ where /// Intercept incoming `Second` requests from the `collator-protocol` subsystem. fn intercept_incoming( - &self, + &mut self, subsystem_sender: &mut Sender, msg: FromOrchestra, ) -> Option> { diff --git a/node/malus/src/variants/withold_approvals_distribution.rs b/node/malus/src/variants/withold_approvals_distribution.rs new file mode 100644 index 000000000000..20e81ebe1800 --- /dev/null +++ b/node/malus/src/variants/withold_approvals_distribution.rs @@ -0,0 +1,311 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! This variant of Malus withold sending of approvals coming from approvals +//! distribution subsystem to just a subset of nodes. It is meant to be used for testing +//! finality is reached even nodes can not reach directly each other. +//! It enforces that peers are grouped in groups of fixed size, then it arranges +//! the groups in a ring topology. +//! +//! +//! Peers can send their messages only to peers in their group and to peers from next +//! group in the ring topology. +//! 
E.g If we 16 nodes split in 4 groups then: +//! (1, 2, 3, 4) -> (5, 6, 7, 8) -> (9, 10, 11, 12) -> (13, 14, 15, 16 ) +//! ^ | +//! |<------------------------------------------------------| +//! +//! Then node 5 will be able to send messages only to nodes in it's group (6, 7, 8) and to nodes +//! in the next group 9, 10, 11, 12 + +use polkadot_cli::{ + prepared_overseer_builder, + service::{ + AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer, + OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, + ProvideRuntimeApi, + }, + Cli, +}; +use polkadot_node_subsystem::SpawnGlue; +use polkadot_node_subsystem_types::messages::network_bridge_event::PeerId; +use sp_core::traits::SpawnNamed; + +use crate::interceptor::*; +use polkadot_node_network_protocol::{v1 as protocol_v1, Versioned}; + +use std::sync::Arc; +const LOG_TARGET: &str = "parachain::withold-approvals"; + +#[derive(Debug, clap::Parser)] +#[clap(rename_all = "kebab-case")] +#[allow(missing_docs)] +pub struct WitholdApprovalsDistributionOptions { + /// Determines how many groups to create, the groups are connected in a ring shape, so + /// a node from group N can send messages only to groups from N+1 and will receive messages from N-1 + /// This should helps us test that approval distribution works correctly even in situations where + /// all nodes can't reach each other. + #[clap(short, long, ignore_case = true, default_value_t = 4, value_parser = clap::value_parser!(u8).range(0..=100))] + pub num_network_groups: u8, + + /// The group to which this node is assigned + #[clap(short, long, ignore_case = true, default_value_t = 0, value_parser = clap::value_parser!(u8).range(0..=100))] + pub assigned_network_group: u8, + + #[clap(flatten)] + pub cli: Cli, +} + +pub(crate) struct WitholdApprovalsDistribution { + pub num_network_groups: u8, + pub assigned_network_group: u8, +} + +impl OverseerGen for WitholdApprovalsDistribution { + fn generate<'a, Spawner, RuntimeClient>( + &self, + connector: OverseerConnector, + args: OverseerGenArgs<'a, Spawner, RuntimeClient>, + ) -> Result<(Overseer, Arc>, OverseerHandle), Error> + where + RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, + RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, + Spawner: 'static + SpawnNamed + Clone + Unpin, + { + let spawner = args.spawner.clone(); + let validation_filter = ApprovalsDistributionInterceptor::new( + SpawnGlue(spawner), + self.num_network_groups, + self.assigned_network_group, + ); + + prepared_overseer_builder(args)? + .replace_network_bridge_tx(move |cv_subsystem| { + InterceptedSubsystem::new(cv_subsystem, validation_filter) + }) + .build_with_connector(connector) + .map_err(|e| e.into()) + } +} + +#[derive(Clone, Debug)] +/// Replaces `NetworkBridgeTx`. 
+pub struct ApprovalsDistributionInterceptor { + spawner: Spawner, + known_peer_ids: Vec, + num_network_groups: u8, + assigned_network_group: u8, +} + +impl ApprovalsDistributionInterceptor +where + Spawner: overseer::gen::Spawner, +{ + pub fn new(spawner: Spawner, num_network_groups: u8, assigned_network_group: u8) -> Self { + Self { spawner, known_peer_ids: vec![], num_network_groups, assigned_network_group } + } + + fn can_send(&self, peer_id: PeerId) -> bool { + let group_size = self.known_peer_ids.len() / (self.num_network_groups as usize) + + if self.known_peer_ids.len() % self.num_network_groups as usize != 0 { 1 } else { 0 }; + + let my_group_in_ring = self + .known_peer_ids + .chunks(group_size) + .skip(self.assigned_network_group as usize) + .next(); + + let next_group_in_ring = self + .known_peer_ids + .chunks(group_size) + .skip((self.assigned_network_group as usize + 1) % self.num_network_groups as usize) + .next(); + + my_group_in_ring + .map(|my_group_peers| my_group_peers.contains(&peer_id)) + .unwrap_or(false) || + next_group_in_ring + .map(|next_group_peers| next_group_peers.contains(&peer_id)) + .unwrap_or(false) + } +} + +impl MessageInterceptor for ApprovalsDistributionInterceptor +where + Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static, + Spawner: overseer::gen::Spawner + Clone + 'static, +{ + type Message = NetworkBridgeTxMessage; + + // Capture all (approval and backing) candidate validation requests and depending on configuration fail them. + fn intercept_incoming( + &mut self, + _subsystem_sender: &mut Sender, + msg: FromOrchestra, + ) -> Option> { + match msg { + // Message sent by the approval voting subsystem + FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendValidationMessage(peers, message), + } => { + let new_peers: Vec<&PeerId> = + peers.iter().filter(|peer_id| !self.known_peer_ids.contains(peer_id)).collect(); + if !new_peers.is_empty() { + self.known_peer_ids.extend(new_peers); + self.known_peer_ids.sort(); + } + + match &message { + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(approvals), + )) => { + let num_peers_we_wanted = peers.len(); + let peers_we_can_send: Vec = + peers.into_iter().filter(|peer_id| self.can_send(*peer_id)).collect(); + gum::info!( + target: LOG_TARGET, + "Malus message intercepted num_peers_we_can_send {:} num_peers_we_wanted_to_send {:} known_peers {:} peers {:} approval {:?}", + peers_we_can_send.len(), + num_peers_we_wanted, + self.known_peer_ids.len(), + peers_we_can_send + .clone() + .into_iter() + .fold(String::new(), |accumulator, peer| { + format!("{}:{}", accumulator, peer) + }), + approvals + ); + Some(FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendValidationMessage( + peers_we_can_send, + message, + ), + }) + }, + _ => Some(FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendValidationMessage(peers, message), + }), + } + }, + msg => Some(msg), + } + } + + fn intercept_outgoing( + &self, + msg: overseer::NetworkBridgeTxOutgoingMessages, + ) -> Option { + Some(msg) + } +} + +#[cfg(test)] +mod tests { + use polkadot_node_network_protocol::PeerId; + use polkadot_node_subsystem::Spawner; + + use super::ApprovalsDistributionInterceptor; + + #[derive(Debug, Clone)] + struct DummySpanner; + impl Spawner for DummySpanner { + fn spawn_blocking( + &self, + _name: &'static str, + _group: Option<&'static str>, + _future: futures::future::BoxFuture<'static, ()>, + ) { + todo!() + } + + 
fn spawn( + &self, + _name: &'static str, + _group: Option<&'static str>, + _future: futures::future::BoxFuture<'static, ()>, + ) { + todo!() + } + } + + #[test] + fn test_can_send() { + let test_topologies: Vec> = vec![ + (1..20).map(|_| PeerId::random()).collect(), + (1..21).map(|_| PeerId::random()).collect(), + ]; + + for peer_ids in test_topologies { + let withold_approval_distribution = ApprovalsDistributionInterceptor { + spawner: DummySpanner {}, + known_peer_ids: peer_ids.clone(), + assigned_network_group: 1, + num_network_groups: 4, + }; + + assert!(!withold_approval_distribution.can_send(peer_ids[0])); + assert!(!withold_approval_distribution.can_send(peer_ids[4])); + + assert!(withold_approval_distribution.can_send(peer_ids[5])); + assert!(withold_approval_distribution.can_send(peer_ids[9])); + + assert!(withold_approval_distribution.can_send(peer_ids[10])); + assert!(withold_approval_distribution.can_send(peer_ids[14])); + + assert!(!withold_approval_distribution.can_send(peer_ids[15])); + assert!(!withold_approval_distribution.can_send(peer_ids[18])); + + let withold_approval_distribution = ApprovalsDistributionInterceptor { + spawner: DummySpanner {}, + known_peer_ids: peer_ids.clone(), + assigned_network_group: 3, + num_network_groups: 4, + }; + + assert!(withold_approval_distribution.can_send(peer_ids[0])); + assert!(withold_approval_distribution.can_send(peer_ids[4])); + + assert!(!withold_approval_distribution.can_send(peer_ids[5])); + assert!(!withold_approval_distribution.can_send(peer_ids[9])); + + assert!(!withold_approval_distribution.can_send(peer_ids[10])); + assert!(!withold_approval_distribution.can_send(peer_ids[14])); + + assert!(withold_approval_distribution.can_send(peer_ids[15])); + assert!(withold_approval_distribution.can_send(peer_ids[18])); + + let withold_approval_distribution = ApprovalsDistributionInterceptor { + spawner: DummySpanner {}, + known_peer_ids: peer_ids.clone(), + assigned_network_group: 0, + num_network_groups: 4, + }; + + assert!(withold_approval_distribution.can_send(peer_ids[0])); + assert!(withold_approval_distribution.can_send(peer_ids[4])); + + assert!(withold_approval_distribution.can_send(peer_ids[5])); + assert!(withold_approval_distribution.can_send(peer_ids[9])); + + assert!(!withold_approval_distribution.can_send(peer_ids[10])); + assert!(!withold_approval_distribution.can_send(peer_ids[14])); + + assert!(!withold_approval_distribution.can_send(peer_ids[15])); + assert!(!withold_approval_distribution.can_send(peer_ids[18])); + } + } +} diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 79aa090a140f..0e625ff7930b 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -48,6 +48,20 @@ use std::{ time::Duration, }; +// TODO: Disable will be removed in the final version and will be replaced with a runtime configuration +// const ACTIVATION_BLOCK_NUMBER: u32 = 10; + +fn disable_gossiping(_block: u32) -> bool { + // if block == ACTIVATION_BLOCK_NUMBER { + // gum::info!( + // target: LOG_TARGET, + // "Disable gossiping for nodes" + // ) + // } + // block > ACTIVATION_BLOCK_NUMBER + return true +} + use self::metrics::Metrics; mod metrics; @@ -210,34 +224,30 @@ struct Knowledge { // When there is no entry, this means the message is unknown // When there is an entry with `MessageKind::Assignment`, the assignment is known. // When there is an entry with `MessageKind::Approval`, the assignment and approval are known. 
- known_messages: HashMap, + known_messages: HashMap>, } impl Knowledge { fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool { - match (kind, self.known_messages.get(message)) { - (_, None) => false, - (MessageKind::Assignment, Some(_)) => true, - (MessageKind::Approval, Some(MessageKind::Assignment)) => false, - (MessageKind::Approval, Some(MessageKind::Approval)) => true, - } + self.known_messages + .get(message) + .map(|messages| messages.contains(&kind)) + .unwrap_or(false) } fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool { match self.known_messages.entry(message) { hash_map::Entry::Vacant(vacant) => { - vacant.insert(kind); + vacant.insert(vec![kind]); true }, - hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) { - (MessageKind::Assignment, MessageKind::Assignment) => false, - (MessageKind::Approval, MessageKind::Approval) => false, - (MessageKind::Approval, MessageKind::Assignment) => false, - (MessageKind::Assignment, MessageKind::Approval) => { - *occupied.get_mut() = MessageKind::Approval; + hash_map::Entry::Occupied(mut occupied) => + if !occupied.get().contains(&kind) { + occupied.get_mut().push(kind); true + } else { + false }, - }, } } } @@ -278,13 +288,15 @@ struct BlockEntry { enum ApprovalState { Assigned(AssignmentCert), Approved(AssignmentCert, ValidatorSignature), + UnassignedApproval(IndirectSignedApprovalVote, MessageSource), } impl ApprovalState { - fn assignment_cert(&self) -> &AssignmentCert { + fn assignment_cert(&self) -> Option<&AssignmentCert> { match *self { - ApprovalState::Assigned(ref cert) => cert, - ApprovalState::Approved(ref cert, _) => cert, + ApprovalState::Assigned(ref cert) => Some(cert), + ApprovalState::Approved(ref cert, _) => Some(cert), + ApprovalState::UnassignedApproval(_, _) => None, } } @@ -292,6 +304,7 @@ impl ApprovalState { match *self { ApprovalState::Assigned(_) => None, ApprovalState::Approved(_, ref sig) => Some(sig.clone()), + ApprovalState::UnassignedApproval(_, _) => None, } } } @@ -301,7 +314,10 @@ impl ApprovalState { // assignments preceding approvals in all cases. #[derive(Debug)] struct MessageState { - required_routing: RequiredRouting, + // Required routing for an approval + required_routing_assignment: RequiredRouting, + // Require routing for an assignment + required_routing_approval: RequiredRouting, local: bool, random_routing: RandomRouting, approval_state: ApprovalState, @@ -363,6 +379,7 @@ impl State { topology.session, topology.topology, topology.local_index, + metrics, ) .await; }, @@ -451,6 +468,8 @@ impl State { for (peer_id, view) in self.peer_views.iter() { let intersection = view.iter().filter(|h| new_hashes.contains(h)); let view_intersection = View::new(intersection.cloned(), view.finalized_number); + let should_trigger_aggression = + self.aggression_config.should_trigger_aggression(self.approval_checking_lag); Self::unify_with_peer( sender, metrics, @@ -460,6 +479,7 @@ impl State { *peer_id, view_intersection, rng, + should_trigger_aggression, ) .await; } @@ -529,6 +549,7 @@ impl State { session: SessionIndex, topology: SessionGridTopology, local_index: Option, + metrics: &Metrics, ) { if local_index.is_none() { // this subsystem only matters to validators. 
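The hunks above change `Knowledge::known_messages` from storing a single `MessageKind` per `MessageSubject` to storing the list of kinds seen, so an approval can be recorded before its corresponding assignment. A minimal, self-contained sketch of that bookkeeping, using simple stand-in types for the subsystem's real block hashes and indices (everything named here is illustrative, not the patch's actual module):

```rust
use std::collections::{hash_map::Entry, HashMap};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum MessageKind {
    Assignment,
    Approval,
}

/// Stand-in for (block hash, candidate index, validator index).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct MessageSubject(u64, u32, u32);

#[derive(Default)]
struct Knowledge {
    known_messages: HashMap<MessageSubject, Vec<MessageKind>>,
}

impl Knowledge {
    fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
        self.known_messages
            .get(message)
            .map(|kinds| kinds.contains(&kind))
            .unwrap_or(false)
    }

    /// Returns `true` only if the (message, kind) pair was not known before.
    fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
        match self.known_messages.entry(message) {
            Entry::Vacant(vacant) => {
                vacant.insert(vec![kind]);
                true
            },
            Entry::Occupied(mut occupied) =>
                if !occupied.get().contains(&kind) {
                    occupied.get_mut().push(kind);
                    true
                } else {
                    false
                },
        }
    }
}

fn main() {
    let mut knowledge = Knowledge::default();
    let subject = MessageSubject(1, 0, 7);
    // An approval can now be noted even though no assignment is known yet.
    assert!(knowledge.insert(subject.clone(), MessageKind::Approval));
    assert!(!knowledge.contains(&subject, MessageKind::Assignment));
    // The assignment is still new knowledge when it arrives later.
    assert!(knowledge.insert(subject.clone(), MessageKind::Assignment));
    // Re-inserting an already-known kind reports nothing new.
    assert!(!knowledge.insert(subject, MessageKind::Approval));
}
```

With the old single-`MessageKind` value the approval-before-assignment state could not be represented at all, which is what forces the switch to a `Vec` of kinds.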
@@ -550,6 +571,7 @@ impl State { .required_routing_by_index(*validator_index, local); } }, + metrics, ) .await; } @@ -678,7 +700,8 @@ impl State { } }); } - + let should_trigger_aggression = + self.aggression_config.should_trigger_aggression(self.approval_checking_lag); Self::unify_with_peer( ctx.sender(), metrics, @@ -688,6 +711,7 @@ impl State { peer_id, view, rng, + should_trigger_aggression, ) .await; } @@ -774,7 +798,6 @@ impl State { return }, }; - // compute metadata on the assignment. let message_subject = MessageSubject(block_hash, claimed_candidate_index, validator_index); let message_kind = MessageKind::Assignment; @@ -872,7 +895,7 @@ impl State { BENEFIT_VALID_MESSAGE_FIRST, ) .await; - entry.knowledge.known_messages.insert(message_subject.clone(), message_kind); + entry.knowledge.insert(message_subject.clone(), message_kind); if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { peer_knowledge.received.insert(message_subject.clone(), message_kind); } @@ -950,32 +973,47 @@ impl State { let topology = self.topologies.get_topology(entry.session); let local = source == MessageSource::Local; - let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| { + let required_routing_assignment = topology.map_or(RequiredRouting::PendingTopology, |t| { t.local_grid_neighbors().required_routing_by_index(validator_index, local) }); - let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) { - Some(candidate_entry) => { - // set the approval state for validator_index to Assigned - // unless the approval state is set already - candidate_entry.messages.entry(validator_index).or_insert_with(|| MessageState { - required_routing, - local, - random_routing: Default::default(), - approval_state: ApprovalState::Assigned(assignment.cert.clone()), - }) - }, - None => { - gum::warn!( - target: LOG_TARGET, - hash = ?block_hash, - ?claimed_candidate_index, - "Expected a candidate entry on import_and_circulate_assignment", - ); + let (message_state, cached_approval_vote) = + match entry.candidates.get_mut(claimed_candidate_index as usize) { + Some(candidate_entry) => { + // We might have received the approval before the assignment, so save it for processing after the assignment + let cached_approval_vote = candidate_entry.messages.remove(&validator_index); + ( + candidate_entry.messages.entry(validator_index).or_insert_with(|| { + MessageState { + required_routing_assignment, + required_routing_approval: if disable_gossiping(entry.number) { + if local { + RequiredRouting::All + } else { + RequiredRouting::None + } + } else { + required_routing_assignment + }, + local, + random_routing: Default::default(), + approval_state: ApprovalState::Assigned(assignment.cert.clone()), + } + }), + cached_approval_vote, + ) + }, + None => { + gum::warn!( + target: LOG_TARGET, + hash = ?block_hash, + ?claimed_candidate_index, + "Expected a candidate entry on import_and_circulate_assignment", + ); - return - }, - }; + return + }, + }; // Dispatch the message to all peers in the routing set which // know the block. @@ -994,7 +1032,7 @@ impl State { if let Some(true) = topology .as_ref() - .map(|t| t.local_grid_neighbors().route_to_peer(required_routing, peer)) + .map(|t| t.local_grid_neighbors().route_to_peer(required_routing_assignment, peer)) { return true } @@ -1039,6 +1077,21 @@ impl State { )) .await; } + + if let Some(MessageState { + approval_state: ApprovalState::UnassignedApproval(vote, source), + .. 
+ }) = cached_approval_vote + { + gum::debug!( + target: LOG_TARGET, + ?message_subject, + "Delayed approval processed" + ); + metrics.on_delayed_approval_processed(); + + self.import_and_circulate_approval(ctx, metrics, source, vote).await + } } async fn import_and_circulate_approval( @@ -1090,7 +1143,48 @@ impl State { let message_kind = MessageKind::Approval; if let Some(peer_id) = source.peer_id() { + let sender_matches_validator_index = self + .topologies + .get_topology(entry.session) + .map(|topology| { + topology.get().peer_id_matches_validator_index(validator_index, peer_id) + }) + .unwrap_or(false); + if !entry.knowledge.contains(&message_subject, MessageKind::Assignment) { + metrics.on_unassigned_approval(); + + match entry.candidates.get_mut(candidate_index as usize) { + // Approvals might arrive sooner than their corresponding assignment because we are sending them directly + // to all peers instead of relaying on them being gossiped, so we need to save them untill the assignment + // arrives. + // We could also receive gossiped assignments in the case when aggression is triggered, but in that + // case we do not care about saving them because they should arrive before their corresponding assignments, + // since they are sent on the same route. + Some(candidate_entry) + if sender_matches_validator_index && + !candidate_entry.messages.contains_key(&validator_index) => + { + candidate_entry.messages.entry(validator_index).or_insert_with(|| { + MessageState { + required_routing_assignment: RequiredRouting::None, + required_routing_approval: RequiredRouting::None, + local: false, + random_routing: Default::default(), + approval_state: ApprovalState::UnassignedApproval(vote, source), + } + }); + gum::debug!( + target: LOG_TARGET, + ?peer_id, + ?message_subject, + "Saving approval to process later on", + ); + return + }, + _ => {}, + }; + gum::debug!( target: LOG_TARGET, ?peer_id, @@ -1163,11 +1257,23 @@ impl State { } return } - + if !sender_matches_validator_index { + gum::debug!( + target: LOG_TARGET, + "Received gossiped approval topology some {:} peer_id {:}", + self.topologies.get_topology(entry.session).is_some(), + source.peer_id().map(|peer_id| peer_id.to_string()).unwrap_or("unknown".into()) + ); + metrics.on_gossipped_received_approval(); + } let (tx, rx) = oneshot::channel(); - ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) - .await; + ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval( + vote.clone(), + tx, + sender_matches_validator_index, + )) + .await; let timer = metrics.time_awaiting_approval_voting(); let result = match rx.await { @@ -1238,17 +1344,19 @@ impl State { // Invariant: to our knowledge, none of the peers except for the `source` know about the approval. 
metrics.on_approval_imported(); - - let required_routing = match entry.candidates.get_mut(candidate_index as usize) { + let should_trigger_aggression = + self.aggression_config.should_trigger_aggression(self.approval_checking_lag); + let required_routing_approval = match entry.candidates.get_mut(candidate_index as usize) { Some(candidate_entry) => { // set the approval state for validator_index to Approved // it should be in assigned state already match candidate_entry.messages.remove(&validator_index) { Some(MessageState { approval_state: ApprovalState::Assigned(cert), - required_routing, + required_routing_assignment, local, random_routing, + required_routing_approval, }) => { candidate_entry.messages.insert( validator_index, @@ -1257,13 +1365,28 @@ impl State { cert, vote.signature.clone(), ), - required_routing, + required_routing_assignment, local, random_routing, + required_routing_approval, }, ); - - required_routing + // When aggression is enabled gossip the approvals we received from peers, so that we reach finality. + if required_routing_approval == RequiredRouting::None && + should_trigger_aggression + { + self.topologies + .get_topology(entry.session) + .zip(source.peer_id()) + .map(|(topology, peer_id)| { + topology + .local_grid_neighbors() + .required_routing_by_peer_id(peer_id, local) + }) + .unwrap_or(RequiredRouting::None) + } else { + required_routing_approval + } }, Some(_) => { unreachable!( @@ -1304,12 +1427,14 @@ impl State { let source_peer = source.peer_id(); let message_subject = &message_subject; + let block_number = entry.number; let peer_filter = move |peer, knowledge: &PeerKnowledge| { if Some(peer) == source_peer.as_ref() { return false } - // Here we're leaning on a few behaviors of assignment propagation: + // When approvals are gossiped(l1 agression enabled) we are leaning on a few behaviors of + // assignment propagation: // 1. At this point, the only peer we're aware of which has the approval // message is the source peer. // 2. We have sent the assignment message to every peer in the required routing @@ -1318,9 +1443,12 @@ impl State { // the assignment to all aware peers in the required routing _except_ the original // source of the assignment. Hence the `in_topology_check`. // 3. Any randomly selected peers have been sent the assignment already. 
- let in_topology = topology - .map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer)); - in_topology || knowledge.sent.contains(message_subject, MessageKind::Assignment) + let in_topology = topology.map_or(false, |t| { + t.local_grid_neighbors().route_to_peer(required_routing_approval, peer) + }); + in_topology || + (knowledge.sent.contains(message_subject, MessageKind::Assignment) && + (should_trigger_aggression || !disable_gossiping(block_number))) }; let peers = entry @@ -1341,6 +1469,13 @@ impl State { if !peers.is_empty() { let approvals = vec![vote]; + if source.peer_id().is_some() { + gum::debug!( + target: LOG_TARGET, + "Gossiped approval sent num_peers {:} approvals {:?}", peers.len(), approvals, + ); + metrics.on_gossipped_sent_approval(); + } gum::trace!( target: LOG_TARGET, ?block_hash, @@ -1403,7 +1538,8 @@ impl State { candidate_entry.messages.iter().filter_map(|(validator_index, message_state)| { match &message_state.approval_state { ApprovalState::Approved(_, sig) => Some((*validator_index, sig.clone())), - ApprovalState::Assigned(_) => None, + ApprovalState::Assigned(_) | ApprovalState::UnassignedApproval(_, _) => + None, } }); all_sigs.extend(sigs); @@ -1420,6 +1556,7 @@ impl State { peer_id: PeerId, view: View, rng: &mut (impl CryptoRng + Rng), + should_trigger_aggression: bool, ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); @@ -1454,39 +1591,36 @@ impl State { }) { // Propagate the message to all peers in the required routing set OR // randomly sample peers. - { - let random_routing = &mut message_state.random_routing; - let required_routing = message_state.required_routing; - let rng = &mut *rng; - let mut peer_filter = move |peer_id| { - let in_topology = topology.as_ref().map_or(false, |t| { - t.local_grid_neighbors().route_to_peer(required_routing, peer_id) - }); - in_topology || { - let route_random = random_routing.sample(total_peers, rng); - if route_random { - random_routing.inc_sent(); - } - - route_random + let random_routing = &mut message_state.random_routing; + let required_routing_assignment = message_state.required_routing_assignment; + let rng = &mut *rng; + let mut peer_filter_assignment = move |peer_id| { + let in_topology = topology.as_ref().map_or(false, |t| { + t.local_grid_neighbors() + .route_to_peer(required_routing_assignment, peer_id) + }); + in_topology || { + let route_random = random_routing.sample(total_peers, rng); + if route_random { + random_routing.inc_sent(); } - }; - - if !peer_filter(&peer_id) { - continue + route_random } - } + }; let message_subject = MessageSubject(block, candidate_index, *validator); - let assignment_message = ( - IndirectAssignmentCert { - block_hash: block, - validator: *validator, - cert: message_state.approval_state.assignment_cert().clone(), - }, - candidate_index, - ); + let assignment_message = + message_state.approval_state.assignment_cert().map(|assignmentcert| { + ( + IndirectAssignmentCert { + block_hash: block, + validator: *validator, + cert: assignmentcert.clone(), + }, + candidate_index, + ) + }); let approval_message = message_state.approval_state.approval_signature().map(|signature| { @@ -1498,19 +1632,48 @@ impl State { } }); - if !peer_knowledge.contains(&message_subject, MessageKind::Assignment) { - peer_knowledge - .sent - .insert(message_subject.clone(), MessageKind::Assignment); - assignments_to_send.push(assignment_message); + let mut assignment_sent = false; + if peer_filter_assignment(&peer_id) { + if let 
Some(assignment_message) = assignment_message { + if !peer_knowledge.contains(&message_subject, MessageKind::Assignment) { + peer_knowledge + .sent + .insert(message_subject.clone(), MessageKind::Assignment); + assignments_to_send.push(assignment_message); + assignment_sent = true; + } + } } + let block_number = entry.number; + let peer_filter_approval = move |peer_id, required_routing, local| { + let in_topology = topology.as_ref().map_or(false, |t| { + t.local_grid_neighbors().route_to_peer(required_routing, peer_id) + }); - if let Some(approval_message) = approval_message { - if !peer_knowledge.contains(&message_subject, MessageKind::Approval) { - peer_knowledge - .sent - .insert(message_subject.clone(), MessageKind::Approval); - approvals_to_send.push(approval_message); + in_topology || + local || ((should_trigger_aggression || !disable_gossiping(block_number)) && + assignment_sent) + }; + + if peer_filter_approval( + &peer_id, + message_state.required_routing_approval, + message_state.local, + ) { + if let Some(approval_message) = approval_message { + if !peer_knowledge.contains(&message_subject, MessageKind::Approval) { + if !message_state.local { + gum::debug!( + target: LOG_TARGET, + "Approval gossiped in unify with peer", + ); + metrics.on_gossipped_sent_approval(); + } + peer_knowledge + .sent + .insert(message_subject.clone(), MessageKind::Approval); + approvals_to_send.push(approval_message); + } } } } @@ -1596,6 +1759,7 @@ impl State { } }, |_, _, _| {}, + metrics, ) .await; @@ -1640,6 +1804,7 @@ impl State { } } }, + metrics, ) .await; } @@ -1664,6 +1829,7 @@ async fn adjust_required_routing_and_propagate bool, RoutingModifier: Fn(&mut RequiredRouting, bool, &ValidatorIndex), @@ -1685,9 +1851,15 @@ async fn adjust_required_routing_and_propagate); struct MetricsInner { assignments_imported_total: prometheus::Counter, approvals_imported_total: prometheus::Counter, + + unassigned_approval_total: prometheus::Counter, + delayed_approvals_processed_total: prometheus::Counter, + gossipped_approval_received_total: prometheus::Counter, + gossipped_approval_sent_total: prometheus::Counter, + unified_with_peer_total: prometheus::Counter, aggression_l1_messages_total: prometheus::Counter, aggression_l2_messages_total: prometheus::Counter, @@ -46,6 +52,30 @@ impl Metrics { } } + pub(crate) fn on_unassigned_approval(&self) { + if let Some(metrics) = &self.0 { + metrics.unassigned_approval_total.inc(); + } + } + + pub(crate) fn on_gossipped_received_approval(&self) { + if let Some(metrics) = &self.0 { + metrics.gossipped_approval_received_total.inc(); + } + } + + pub(crate) fn on_gossipped_sent_approval(&self) { + if let Some(metrics) = &self.0 { + metrics.gossipped_approval_sent_total.inc(); + } + } + + pub(crate) fn on_delayed_approval_processed(&self) { + if let Some(metrics) = &self.0 { + metrics.delayed_approvals_processed_total.inc(); + } + } + pub(crate) fn on_unify_with_peer(&self) { if let Some(metrics) = &self.0 { metrics.unified_with_peer_total.inc(); @@ -95,6 +125,34 @@ impl MetricsTrait for Metrics { )?, registry, )?, + unassigned_approval_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_unassigned_approval_total", + "Number of approvals received for which we didn't received the approval yet.", + )?, + registry, + )?, + delayed_approvals_processed_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_delayed_approvals_processedtotal", + "Number of approvals processed with delay after we received the assignment.", + )?, + 
registry, + )?, + gossipped_approval_received_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_gossipped_approvals_receivedtotal", + "Number of approvals processed that were gossiped", + )?, + registry, + )?, + gossipped_approval_sent_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_gossipped_approvals_senttotal", + "Number of approvals processed that were gossiped", + )?, + registry, + )?, approvals_imported_total: prometheus::register( prometheus::Counter::new( "polkadot_parachain_approvals_imported_total", diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 979f0ada4ee6..9bc8366c2fa7 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -689,26 +689,15 @@ fn import_approval_happy_path() { AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( vote, tx, + _, )) => { assert_eq!(vote, approval); tx.send(ApprovalCheckResult::Accepted).unwrap(); } ); - + // We expect only reputation changes, because approvals from peers are not gossiped unless agression is enabled. expect_reputation_change(overseer, &peer_b, BENEFIT_VALID_MESSAGE_FIRST).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( - peers, - Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Approvals(approvals) - )) - )) => { - assert_eq!(peers.len(), 1); - assert_eq!(approvals.len(), 1); - } - ); virtual_overseer }); } @@ -783,6 +772,7 @@ fn import_approval_bad() { AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( vote, tx, + false, )) => { assert_eq!(vote, approval); tx.send(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownBlock(hash))).unwrap(); @@ -1094,6 +1084,7 @@ fn import_remotely_then_locally() { AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval( vote, tx, + false )) => { assert_eq!(vote, approval); tx.send(ApprovalCheckResult::Accepted).unwrap(); @@ -1333,7 +1324,7 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { let assignments = vec![(cert.clone(), candidate_index)]; let approvals = vec![approval.clone()]; - let assignment_sent_peers = assert_matches!( + let _assignment_sent_peers = assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( sent_peers, @@ -1363,7 +1354,10 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { )) )) => { // Random sampling is reused from the assignment. - assert_eq!(sent_peers, assignment_sent_peers); + for peer in &peers { + assert!(sent_peers.contains(&peer.0)); + } + assert_eq!(sent_peers.len(), peers.len()); assert_eq!(sent_approvals, approvals); } ); @@ -1519,12 +1513,11 @@ fn propagates_to_required_after_connect() { let hash = Hash::repeat_byte(0xAA); let peers = make_peers_and_authority_ids(100); + let omitted = [0, 10, 50, 51]; let _ = test_harness(State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; - let omitted = [0, 10, 50, 51]; - // Connect all peers except omitted. 
for (i, (peer, _)) in peers.iter().enumerate() { if !omitted.contains(&i) { @@ -1581,7 +1574,7 @@ fn propagates_to_required_after_connect() { let assignments = vec![(cert.clone(), candidate_index)]; let approvals = vec![approval.clone()]; - let assignment_sent_peers = assert_matches!( + let _assignment_sent_peers = assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( sent_peers, @@ -1610,8 +1603,10 @@ fn propagates_to_required_after_connect() { protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) )) )) => { - // Random sampling is reused from the assignment. - assert_eq!(sent_peers, assignment_sent_peers); + // Approvals should be sent to all peers. + for (_, peer) in peers.iter().enumerate().filter(|(index, _)| !omitted.contains(index)){ + assert!(sent_peers.contains(&peer.0)); + } assert_eq!(sent_approvals, approvals); } ); @@ -1707,7 +1702,12 @@ fn sends_to_more_peers_after_getting_topology() { let approvals = vec![approval.clone()]; let mut expected_indices = vec![0, 10, 20, 30, 50, 51, 52, 53]; - let assignment_sent_peers = assert_matches!( + + // Approvals are sent to all peers + let mut expected_indices_approvals: Vec = (0..peers.len()).collect(); + + // We sent only assignment when we don't a topology set yet. + let _assignment_sent_peers = assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( sent_peers, @@ -1723,26 +1723,13 @@ fn sends_to_more_peers_after_getting_topology() { // Remove them from the expected indices so we don't expect // them to get the messages again after the assignment. expected_indices.retain(|&i2| i2 != i); + } assert_eq!(sent_assignments, assignments); sent_peers } ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( - sent_peers, - Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) - )) - )) => { - // Random sampling is reused from the assignment. - assert_eq!(sent_peers, assignment_sent_peers); - assert_eq!(sent_approvals, approvals); - } - ); - // Set up a gossip topology. setup_gossip_topology( overseer, @@ -1751,7 +1738,6 @@ fn sends_to_more_peers_after_getting_topology() { .await; let mut expected_indices_assignments = expected_indices.clone(); - let mut expected_indices_approvals = expected_indices.clone(); for _ in 0..expected_indices_assignments.len() { assert_matches!( @@ -1790,7 +1776,6 @@ fn sends_to_more_peers_after_getting_topology() { let pos = expected_indices_approvals.iter() .position(|i| &peers[*i].0 == &sent_peers[0]) .unwrap(); - expected_indices_approvals.remove(pos); } ); @@ -1879,14 +1864,18 @@ fn originator_aggression_l1() { } ); - assert_matches!( + let prev_sent_approvals = assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( - _, + sent_peers, Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(_) )) - )) => { } + )) => { + sent_peers.into_iter() + .filter_map(|sp| peers.iter().position(|p| &p.0 == &sp)) + .collect::>() + } ); // Add blocks until aggression L1 is triggered. 
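The reworked tests above encode the new relay rule for approvals received from other peers: they are forwarded along the grid only where routing requires it, and re-gossiped along the assignment route only once aggression kicks in because finality is lagging. A hedged restatement of that gating as plain booleans (the helper name and parameters are illustrative, not the subsystem's actual API):

```rust
/// Decide whether an approval should be relayed to a given peer.
fn should_relay_approval(
    in_topology: bool,             // grid routing requires sending to this peer
    locally_produced: bool,        // our own approvals are always distributed
    assignment_already_sent: bool, // the matching assignment went to this peer
    aggression_triggered: bool,    // approval-checking lag crossed the aggression threshold
    gossip_disabled: bool,         // direct-send mode: gossip of foreign approvals is off
) -> bool {
    in_topology
        || locally_produced
        || ((aggression_triggered || !gossip_disabled) && assignment_already_sent)
}

fn main() {
    // Direct-send mode: a foreign approval is not re-gossiped along the assignment route...
    assert!(!should_relay_approval(false, false, true, false, true));
    // ...until aggression is triggered by finality lag.
    assert!(should_relay_approval(false, false, true, true, true));
    // Our own approvals keep going out regardless.
    assert!(should_relay_approval(false, true, false, false, true));
}
```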
@@ -1917,6 +1906,10 @@ fn originator_aggression_l1() { let unsent_indices = (0..peers.len()).filter(|i| !prev_sent_indices.contains(&i)).collect::>(); + let unsent_indices_approvals = (0..peers.len()) + .filter(|i| !prev_sent_approvals.contains(&i)) + .collect::>(); + for _ in 0..unsent_indices.len() { assert_matches!( overseer_recv(overseer).await, @@ -1937,7 +1930,7 @@ fn originator_aggression_l1() { ); } - for _ in 0..unsent_indices.len() { + for _ in 0..unsent_indices_approvals.len() { assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( diff --git a/node/network/protocol/src/grid_topology.rs b/node/network/protocol/src/grid_topology.rs index 1b356f67617b..3469c0f216e8 100644 --- a/node/network/protocol/src/grid_topology.rs +++ b/node/network/protocol/src/grid_topology.rs @@ -108,6 +108,42 @@ impl SessionGridTopology { Some(grid_subset) } + + /// Gets the list of known peer Ids for a given validator index + /// + /// Returns `None` if the validator index is out of bound + pub fn get_known_peer_ids_by_validator_index( + &self, + validator_index: ValidatorIndex, + ) -> Option<&Vec> { + let peer_index = self.shuffled_indices.get(validator_index.0 as usize)?; + + self.canonical_shuffling + .get(*peer_index) + .map(|topology_peer_info| &topology_peer_info.peer_ids) + } + + /// Checks if the passed validator_index matches the peer_id + /// + /// Returns true if they match and false otherwise + pub fn peer_id_matches_validator_index( + &self, + validator_index: ValidatorIndex, + peer_id: PeerId, + ) -> bool { + self.get_known_peer_ids_by_validator_index(validator_index) + .map(|known_peer_ids| { + let res = known_peer_ids.contains(&peer_id); + if !res { + gum::debug!( + target: LOG_TARGET, + "Received gossiped approval peer id {:?} known_peer_ids {:?}", peer_id, known_peer_ids + ); + } + res + }) + .unwrap_or(false) + } } struct MatrixNeighbors { diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index afc0ce320610..4d5f4d2c91b0 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -471,6 +471,7 @@ where let lag = initial_leaf_number.saturating_sub(subchain_number); self.metrics.note_approval_checking_finality_lag(lag); + gum::debug!(target: LOG_TARGET, ?subchain_head, "Approval checking lag {:}", lag); // Messages sent to `approval-distrbution` are known to have high `ToF`, we need to spawn a task for sending // the message to not block here and delay finality. diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 8419763789dc..6294670f8db7 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -827,8 +827,13 @@ pub enum ApprovalVotingMessage { /// Check if the approval vote is valid and can be accepted by our view of the /// protocol. /// + /// * `IndirectSignedApprovalVote` - The vote to be imported + /// * `oneshot::Sender` - The channel used for sending the reply back + /// * `bool` - If the sender of the messages is the originator of it, when false it means + /// the message has been gossipped. + /// /// Should not be sent unless the block hash within the indirect vote is known. 
- CheckAndImportApproval(IndirectSignedApprovalVote, oneshot::Sender), + CheckAndImportApproval(IndirectSignedApprovalVote, oneshot::Sender, bool), /// Returns the highest possible ancestor hash of the provided block hash which is /// acceptable to vote on finality for. /// The `BlockNumber` provided is the number of the block's ancestor which is the diff --git a/zombienet_tests/functional/0005-parachains-aggression-enabled.toml b/zombienet_tests/functional/0005-parachains-aggression-enabled.toml new file mode 100644 index 000000000000..1d94465d00d3 --- /dev/null +++ b/zombienet_tests/functional/0005-parachains-aggression-enabled.toml @@ -0,0 +1,130 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtime.runtime_genesis_config.configuration.config] + needed_approvals = 8 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +chain_spec_command = "polkadot build-spec --chain rococo-local --disable-default-bootnode" + +[relaychain.default_resources] +limits = { memory = "4G", cpu = "2" } +requests = { memory = "2G", cpu = "1" } + + [[relaychain.nodes]] + name = "alice" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=0" + args = [ "--alice", "-lparachain=debug,runtime=debug" ] + + [[relaychain.nodes]] + name = "bob" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=0" + args = [ "--bob", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "charlie" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=0" + args = [ "--charlie", "-lparachain=debug,runtime=debug" ] + + [[relaychain.nodes]] + name = "dave" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=0" + args = [ "--dave", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "ferdie" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=1" + args = [ "--ferdie", "-lparachain=debug,runtime=debug" ] + + [[relaychain.nodes]] + name = "eve" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=1" + args = [ "--eve", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "one" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=1" + args = [ "--one", "-lparachain=debug,runtime=debug" ] + + [[relaychain.nodes]] + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=1" + name = "two" + args = [ "--two", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode8" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=2" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode9" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=2" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode10" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=2" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode11" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=2" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = 
"newnode12" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=3" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode13" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=3" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode14" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=3" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode15" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=3" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode16" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=4" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode17" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=4" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode18" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=4" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + + [[relaychain.nodes]] + name = "newnode19" + command = "malus withold-approvals-distribution --num-network-groups=5 --assigned-network-group=4" + args = [ "--validator", "-lparachain=debug,runtime=debug"] + +[[parachains]] +id = 2000 +addToGenesis = true +genesis_state_generator = "undying-collator export-genesis-state --pov-size=100000 --pvf-complexity=1" + + [parachains.collator] + name = "collator01" + image = "{{COL_IMAGE}}" + command = "undying-collator" + args = ["-lparachain=debug", "--pov-size=100000", "--pvf-complexity=1", "--parachain-id=2000"] + +[types.Header] +number = "u64" +parent_hash = "Hash" +post_state = "Hash" \ No newline at end of file diff --git a/zombienet_tests/functional/0005-parachains-aggression-enabled.zndsl b/zombienet_tests/functional/0005-parachains-aggression-enabled.zndsl new file mode 100644 index 000000000000..c8c7956d69c6 --- /dev/null +++ b/zombienet_tests/functional/0005-parachains-aggression-enabled.zndsl @@ -0,0 +1,21 @@ +Description: PVF preparation & execution time +Network: ./0005-parachains-aggression-enabled.toml +Creds: config + +# Check authority status. +alice: reports node_roles is 4 +bob: reports node_roles is 4 +charlie: reports node_roles is 4 +dave: reports node_roles is 4 +eve: reports node_roles is 4 +ferdie: reports node_roles is 4 +one: reports node_roles is 4 +two: reports node_roles is 4 + +# Ensure parachains are registered. +alice: parachain 2000 is registered within 60 seconds + + +# Ensure parachains made progress. +# alice: parachain 2000 block height is at least 30 within 600 seconds +alice: reports substrate_block_height{status="finalized"} is at least 40 within 400 seconds \ No newline at end of file