From e3b5192908341466818782280f74bafeba3b77d5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 14:47:40 +0200 Subject: [PATCH 01/16] refactor: move claim queue fetch utils to utils crate Signed-off-by: Andrei Sandu --- .../node/collation-generation/src/error.rs | 2 + polkadot/node/collation-generation/src/lib.rs | 54 ++++--------------- polkadot/node/subsystem-util/src/vstaging.rs | 46 +++++++++++++++- 3 files changed, 56 insertions(+), 46 deletions(-) diff --git a/polkadot/node/collation-generation/src/error.rs b/polkadot/node/collation-generation/src/error.rs index 852c50f30682..f04e3c4f20b4 100644 --- a/polkadot/node/collation-generation/src/error.rs +++ b/polkadot/node/collation-generation/src/error.rs @@ -27,6 +27,8 @@ pub enum Error { #[error(transparent)] Util(#[from] polkadot_node_subsystem_util::Error), #[error(transparent)] + UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error), + #[error(transparent)] Erasure(#[from] polkadot_erasure_coding::Error), #[error("Parachain backing state not available in runtime.")] MissingParaBackingState, diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 3164f6078bc0..fb82871bb15a 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -38,25 +38,23 @@ use polkadot_node_primitives::{ SubmitCollationParams, }; use polkadot_node_subsystem::{ - messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest}, + messages::{CollationGenerationMessage, CollatorProtocolMessage}, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{ - has_required_runtime, request_async_backing_params, request_availability_cores, - request_claim_queue, request_para_backing_state, request_persisted_validation_data, - request_validation_code, 
request_validation_code_hash, request_validators, + request_async_backing_params, request_availability_cores, request_para_backing_state, + request_persisted_validation_data, request_validation_code, request_validation_code_hash, + request_validators, + vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core}, }; use polkadot_primitives::{ collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt, CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, - PersistedValidationData, ScheduledCore, ValidationCodeHash, + PersistedValidationData, ValidationCodeHash, }; use sp_core::crypto::Pair; -use std::{ - collections::{BTreeMap, VecDeque}, - sync::Arc, -}; +use std::sync::Arc; mod error; @@ -228,7 +226,9 @@ async fn handle_new_activations( let availability_cores = availability_cores??; let async_backing_params = async_backing_params?.ok(); let n_validators = validators??.len(); - let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?; + let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent) + .await + .map_err(crate::error::Error::UtilRuntime)?; // The loop bellow will fill in cores that the para is allowed to build on. let mut cores_to_build_on = Vec::new(); @@ -655,37 +655,3 @@ fn erasure_root( let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?; Ok(polkadot_erasure_coding::branches(&chunks).root()) } - -// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)` -// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller. 
-async fn fetch_claim_queue( - sender: &mut impl overseer::CollationGenerationSenderTrait, - relay_parent: Hash, -) -> crate::error::Result>>> { - if has_required_runtime( - sender, - relay_parent, - RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, - ) - .await - { - let res = request_claim_queue(relay_parent, sender).await.await??; - Ok(Some(res)) - } else { - gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`"); - Ok(None) - } -} - -// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`. -// This function is supposed to be used in `handle_new_activations` hence the return type. -fn fetch_next_scheduled_on_core( - claim_queue: &BTreeMap>, - core_idx: CoreIndex, -) -> Option { - claim_queue - .get(&core_idx)? - .front() - .cloned() - .map(|para_id| ScheduledCore { para_id, collator: None }) -} diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index 3e807eff5387..206ca07644ef 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -19,14 +19,19 @@ //! This module is intended to contain common boiler plate code handling unreleased runtime API //! calls. +use std::collections::{BTreeMap, VecDeque}; + use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest}; use polkadot_overseer::SubsystemSender; -use polkadot_primitives::{Hash, ValidatorIndex}; +use polkadot_primitives::{CoreIndex, Hash, Id as ParaId, ScheduledCore, ValidatorIndex}; -use crate::{has_required_runtime, request_disabled_validators, runtime}; +use crate::{has_required_runtime, request_claim_queue, request_disabled_validators, runtime}; const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; +/// A snapshot of the runtime claim queue at an arbitrare relay chain block. 
+pub type ClaimQueueSnapshot = BTreeMap>; + // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 /// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and /// returns an empty vec. @@ -54,3 +59,40 @@ pub async fn get_disabled_validators_with_fallback, + relay_parent: Hash, +) -> Result, runtime::Error> { + if has_required_runtime( + sender, + relay_parent, + RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + ) + .await + { + let res = request_claim_queue(relay_parent, sender) + .await + .await + .map_err(runtime::Error::RuntimeRequestCanceled)??; + Ok(Some(res)) + } else { + gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`"); + Ok(None) + } +} + +/// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`. +pub fn fetch_next_scheduled_on_core( + claim_queue: &ClaimQueueSnapshot, + core_idx: CoreIndex, +) -> Option { + claim_queue + .get(&core_idx)? + .front() + .cloned() + .map(|para_id| ScheduledCore { para_id, collator: None }) +} From 01ceb3b902cecab75969db7ce3be5a83a93ca652 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 14:49:46 +0200 Subject: [PATCH 02/16] cache all groups per para and remove para:group 1:1 assumption Signed-off-by: Andrei Sandu --- .../statement-distribution/src/error.rs | 3 + .../statement-distribution/src/v2/mod.rs | 107 ++++++++++++------ 2 files changed, 78 insertions(+), 32 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs index a712ab6da436..d7f52162fe23 100644 --- a/polkadot/node/network/statement-distribution/src/error.rs +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -81,6 +81,9 @@ pub enum Error { #[error("Fetching validator groups failed {0:?}")] FetchValidatorGroups(RuntimeApiError), + #[error("Fetching claim queue failed {0:?}")] + FetchClaimQueue(runtime::Error), + 
#[error("Attempted to share statement when not a validator or not assigned")] InvalidShare, diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index d782e37f10b4..23e6a52b2731 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{ backing_implicit_view::View as ImplicitView, reputation::ReputationAggregator, runtime::{request_min_backing_votes, ProspectiveParachainsMode}, + vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot}, }; use polkadot_primitives::{ AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex, @@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1); struct PerRelayParentState { local_validator: Option, statement_store: StatementStore, - availability_cores: Vec, - group_rotation_info: GroupRotationInfo, seconding_limit: usize, session: SessionIndex, + groups_per_para: HashMap>, } impl PerRelayParentState { @@ -693,15 +693,24 @@ pub(crate) async fn handle_active_leaves_update( } }); + let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent) + .await + .map_err(JfyiError::FetchClaimQueue)?; + + let groups_per_para = determine_groups_per_para( + &availability_cores, + &group_rotation_info, + &maybe_claim_queue, + seconding_limit - 1, + ); state.per_relay_parent.insert( new_relay_parent, PerRelayParentState { local_validator, statement_store: StatementStore::new(&per_session.groups), - availability_cores, - group_rotation_info, seconding_limit, session: session_index, + groups_per_para, }, ); } @@ -2126,17 +2135,54 @@ async fn provide_candidate_to_grid( } } -fn group_for_para( +// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings. 
+fn determine_groups_per_para( availability_cores: &[CoreState], group_rotation_info: &GroupRotationInfo, - para_id: ParaId, -) -> Option { - // Note: this won't work well for on-demand parachains as it assumes that core assignments are - // fixed across blocks. - let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id)); + maybe_claim_queue: &Option, + max_candidate_depth: usize, +) -> HashMap> { + // Determine the core indices occupied by each para at the current relay parent. To support + // on-demand parachains we also consider the core indices at next block if core has a candidate + // pending availability. + let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| { + match core { + CoreState::Scheduled(scheduled_core) => + Some((scheduled_core.para_id, CoreIndex(index as u32))), + CoreState::Occupied(occupied_core) => { + if max_candidate_depth >= 1 { + // Use claim queue if available, or fallback to `next_up_on_available` + let maybe_scheduled_core = match maybe_claim_queue { + Some(claim_queue) => { + // What's up next on this core ? + fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32)) + }, + None => { + // Runtime doesn't support claim queue runtime api. Fallback to + // `next_up_on_available` + occupied_core.next_up_on_available.clone() + }, + }; + + maybe_scheduled_core + .filter(|scheduled_core| scheduled_core.para_id == occupied_core.para_id()) + .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) + } else { + None + } + }, + CoreState::Free => None, + } + }); - core_index - .map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len())) + let mut groups_per_para = HashMap::new(); + // Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`. 
+ for (para, core_index) in para_core_indices { + let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len()); + groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index) + } + + groups_per_para } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] @@ -2192,18 +2238,14 @@ async fn fragment_tree_update_inner( let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash); let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent); if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) { - let group_index = group_for_para( - &prs.availability_cores, - &prs.group_rotation_info, - receipt.descriptor().para_id, - ); - let per_session = state.per_session.get(&prs.session); - if let (Some(per_session), Some(group_index)) = (per_session, group_index) { + // TODO(maybe for sanity): perform an extra check on the candidate backing group + // index all allowed + if let Some(per_session) = per_session { send_backing_fresh_statements( ctx, candidate_hash, - group_index, + confirmed.group_index(), &receipt.descriptor().relay_parent, prs, confirmed, @@ -2311,13 +2353,14 @@ async fn handle_incoming_manifest_common<'a, Context>( Some(x) => x, }; - let expected_group = group_for_para( - &relay_parent_state.availability_cores, - &relay_parent_state.group_rotation_info, - para_id, - ); + let expected_groups = relay_parent_state.groups_per_para.get(¶_id); - if expected_group != Some(manifest_summary.claimed_group_index) { + if expected_groups.is_none() || + !expected_groups + .expect("checked is_some(); qed") + .iter() + .any(|g| g == &manifest_summary.claimed_group_index) + { modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await; return None } @@ -3037,13 +3080,13 @@ pub(crate) async fn handle_response( relay_parent_state.session, |v| per_session.session_info.validators.get(v).map(|x| x.clone()), |para, g_index| { - let expected_group = group_for_para( - 
&relay_parent_state.availability_cores, - &relay_parent_state.group_rotation_info, - para, - ); + let expected_groups = relay_parent_state.groups_per_para.get(¶); - Some(g_index) == expected_group + expected_groups.is_some() && + expected_groups + .expect("checked is_some(); qed") + .iter() + .any(|g| g == &g_index) }, disabled_mask, ); From 5f0872f02aab122b99a665eb01203cab002fb247 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 16:35:42 +0200 Subject: [PATCH 03/16] typo Signed-off-by: Andrei Sandu --- polkadot/node/subsystem-util/src/vstaging.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index 206ca07644ef..096da40222ee 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -29,7 +29,7 @@ use crate::{has_required_runtime, request_claim_queue, request_disabled_validato const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging"; -/// A snapshot of the runtime claim queue at an arbitrare relay chain block. +/// A snapshot of the runtime claim queue at an arbitrary relay chain block. 
pub type ClaimQueueSnapshot = BTreeMap>; // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940 From 034b61a3293cb45891b6941aaad98c7206f23e9a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 16:37:35 +0200 Subject: [PATCH 04/16] fix test build Signed-off-by: Andrei Sandu --- polkadot/node/collation-generation/src/tests.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 3cb3e61a35a1..150df19c1ee2 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -28,7 +28,7 @@ use polkadot_node_subsystem::{ ActivatedLeaf, }; use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle}; -use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt}; use polkadot_primitives::{ async_backing::{BackingState, CandidatePendingAvailability}, AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData, @@ -36,7 +36,10 @@ use polkadot_primitives::{ }; use rstest::rstest; use sp_keyring::sr25519::Keyring as Sr25519Keyring; -use std::pin::Pin; +use std::{ + collections::{BTreeMap, VecDeque}, + pin::Pin, +}; use test_helpers::{ dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate, }; @@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) { _hash, RuntimeApiRequest::ClaimQueue(tx), ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { - let res = BTreeMap::>::new(); + let res = ClaimQueueSnapshot::new(); tx.send(Ok(res)).unwrap(); }, Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run candidate_hash: Default::default(), candidate_descriptor: 
dummy_candidate_descriptor(dummy_hash()), })]; - let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); test_harness(|mut virtual_overseer| async move { helpers::initialize_collator(&mut virtual_overseer, para_id).await; @@ -958,7 +961,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled( candidate_hash: Default::default(), candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), })]; - let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); + let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]); test_harness(|mut virtual_overseer| async move { helpers::initialize_collator(&mut virtual_overseer, para_id).await; @@ -1046,7 +1049,7 @@ mod helpers { async_backing_params: AsyncBackingParams, cores: Vec, runtime_version: u32, - claim_queue: BTreeMap>, + claim_queue: ClaimQueueSnapshot, pending_availability: Vec, ) { assert_matches!( From 9c9119384e5035db2e9544bc47a4394a9d5d6e9c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 18:56:09 +0200 Subject: [PATCH 05/16] testing some local test run weirdness Signed-off-by: Andrei Sandu --- polkadot/node/collation-generation/src/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 150df19c1ee2..695717e85c07 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -883,6 +883,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti virtual_overseer }); + panic!("Just testing ..."); } // There are variable number of cores of cores in `Free` state and async backing is enabled. 
From f87a624e46c198525af51d14132d1e5d34946531 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 18:59:15 +0200 Subject: [PATCH 06/16] fix doctest Signed-off-by: Andrei Sandu --- polkadot/node/subsystem-util/src/vstaging.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs index 096da40222ee..25ea7ce7c9bd 100644 --- a/polkadot/node/subsystem-util/src/vstaging.rs +++ b/polkadot/node/subsystem-util/src/vstaging.rs @@ -62,7 +62,7 @@ pub async fn get_disabled_validators_with_fallback, relay_parent: Hash, From 70213c5ebca754faff9e39c3d8f96dbb75385406 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 28 Mar 2024 19:54:29 +0200 Subject: [PATCH 07/16] remove panic Signed-off-by: Andrei Sandu --- polkadot/node/collation-generation/src/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 695717e85c07..150df19c1ee2 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -883,7 +883,6 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti virtual_overseer }); - panic!("Just testing ..."); } // There are variable number of cores of cores in `Free` state and async backing is enabled. 
From 4c660019b38eddb55aa3c0135552837fe82d66fb Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 29 Mar 2024 13:51:00 +0200 Subject: [PATCH 08/16] add new cluster test Signed-off-by: Andrei Sandu --- .../src/v2/tests/cluster.rs | 60 +++++++++++++++++++ .../src/v2/tests/mod.rs | 29 +++++++-- 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs index a944a9cd6d02..78943e8d2778 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs @@ -312,6 +312,66 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() { }); } +// Both validators in the test are part of backing groups assigned to same parachain +#[test] +fn elastic_scaling_useful_cluster_statement_from_non_cluster_peer_rejected() { + let config = TestConfig { + validator_count: 20, + group_size: 3, + local_validator: LocalRole::Validator, + async_backing_params: None, + }; + + let relay_parent = Hash::repeat_byte(1); + let peer_a = PeerId::random(); + + test_harness(config, |state, mut overseer| async move { + let candidate_hash = CandidateHash(Hash::repeat_byte(42)); + + let test_leaf = state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 3); + + // Peer A is not in our group, but it's group is assigned to same para as we are. 
+ let not_our_group = GroupIndex(1); + + let that_group_validators = state.group_validators(not_our_group, false); + let v_non = that_group_validators[0]; + + connect_peer( + &mut overseer, + peer_a.clone(), + Some(vec![state.discovery_id(v_non)].into_iter().collect()), + ) + .await; + + send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await; + activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await; + + let statement = state + .sign_statement( + v_non, + CompactStatement::Seconded(candidate_hash), + &SigningContext { parent_hash: relay_parent, session_index: 1 }, + ) + .as_unchecked() + .clone(); + + send_peer_message( + &mut overseer, + peer_a.clone(), + protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement), + ) + .await; + + assert_matches!( + overseer.recv().await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r))) + if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { } + ); + + overseer + }); +} + #[test] fn statement_from_non_cluster_originator_unexpected() { let config = TestConfig { diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs index 82986a0330ec..e98b11079312 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs @@ -177,20 +177,39 @@ impl TestState { } fn make_dummy_leaf(&self, relay_parent: Hash) -> TestLeaf { + self.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 1) + } + + fn make_dummy_leaf_with_multiple_cores_per_para( + &self, + relay_parent: Hash, + groups_for_first_para: usize, + ) -> TestLeaf { TestLeaf { number: 1, hash: relay_parent, parent_hash: Hash::repeat_byte(0), session: 1, availability_cores: self.make_availability_cores(|i| { - CoreState::Scheduled(ScheduledCore { - para_id: ParaId::from(i as u32), - 
collator: None, - }) + let para_id = if i < groups_for_first_para { + ParaId::from(0u32) + } else { + ParaId::from(i as u32) + }; + + CoreState::Scheduled(ScheduledCore { para_id, collator: None }) }), disabled_validators: Default::default(), para_data: (0..self.session_info.validator_groups.len()) - .map(|i| (ParaId::from(i as u32), PerParaData::new(1, vec![1, 2, 3].into()))) + .map(|i| { + let para_id = if i < groups_for_first_para { + ParaId::from(0u32) + } else { + ParaId::from(i as u32) + }; + + (para_id, PerParaData::new(1, vec![1, 2, 3].into())) + }) .collect(), minimum_backing_votes: 2, } From 21ae63dbff71b2f9f026b179d86924ba3a42bdab Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 29 Mar 2024 14:46:38 +0200 Subject: [PATCH 09/16] add grid test Signed-off-by: Andrei Sandu --- .../src/v2/tests/grid.rs | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs index 38a12cf32e3b..392bf5574a46 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs @@ -1829,9 +1829,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() { }); } -// Grid statements imported to backing once candidate enters hypothetical frontier. 
-#[test] -fn grid_statements_imported_to_backing() { +fn inner_grid_statements_imported_to_backing(groups_for_first_para: usize) { let validator_count = 6; let group_size = 3; let config = TestConfig { @@ -1851,9 +1849,12 @@ fn grid_statements_imported_to_backing() { let local_group_index = local_validator.group_index.unwrap(); let other_group = next_group_index(local_group_index, validator_count, group_size); - let other_para = ParaId::from(other_group.0); + + // Other para is same para for elastic scaling test (groups_for_first_para > 1) + let other_para = ParaId::from((groups_for_first_para == 1) as u32); - let test_leaf = state.make_dummy_leaf(relay_parent); + let test_leaf = + state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, groups_for_first_para); let (candidate, pvd) = make_candidate( relay_parent, @@ -2018,6 +2019,18 @@ fn grid_statements_imported_to_backing() { overseer }); } +// Grid statements imported to backing once candidate enters hypothetical frontier. +#[test] +fn grid_statements_imported_to_backing() { + inner_grid_statements_imported_to_backing(1); +} + +// Grid statements imported to backing once candidate enters hypothetical frontier. +// All statements are for candidates of the same parachain but from different backing groups. 
+#[test] +fn elastic_scaling_grid_statements_imported_to_backing() { + inner_grid_statements_imported_to_backing(2); +} #[test] fn advertisements_rejected_from_incorrect_peers() { From 27ee14594cd74e2098e82e02a0b7c30e05b1e4b4 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 29 Mar 2024 14:49:57 +0200 Subject: [PATCH 10/16] fmt Signed-off-by: Andrei Sandu --- .../node/network/statement-distribution/src/v2/tests/grid.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs index 392bf5574a46..9d00a92e742b 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs @@ -1849,7 +1849,7 @@ fn inner_grid_statements_imported_to_backing(groups_for_first_para: usize) { let local_group_index = local_validator.group_index.unwrap(); let other_group = next_group_index(local_group_index, validator_count, group_size); - + // Other para is same para for elastic scaling test (groups_for_first_para > 1) let other_para = ParaId::from((groups_for_first_para == 1) as u32); From be1a2c7ed86b8a09e9249b4b018cc12220ffaa95 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 1 Apr 2024 14:35:37 +0300 Subject: [PATCH 11/16] explicit max_candidate_depth Signed-off-by: Andrei Sandu --- .../node/network/statement-distribution/src/v2/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 23e6a52b2731..0aba03dda864 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update( activated: &ActivatedLeaf, leaf_mode: ProspectiveParachainsMode, ) -> JfyiErrorResult<()> 
{ - let seconding_limit = match leaf_mode { + let max_candidate_depth = match leaf_mode { ProspectiveParachainsMode::Disabled => return Ok(()), - ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1, + ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth, }; + let seconding_limit = max_candidate_depth + 1; + state .implicit_view .activate_leaf(ctx.sender(), activated.hash) @@ -701,7 +703,7 @@ pub(crate) async fn handle_active_leaves_update( &availability_cores, &group_rotation_info, &maybe_claim_queue, - seconding_limit - 1, + max_candidate_depth, ); state.per_relay_parent.insert( new_relay_parent, From b9f1afcb121e6692fd9eeb379f11718c9b91d396 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 1 Apr 2024 14:39:14 +0300 Subject: [PATCH 12/16] fix test comment Signed-off-by: Andrei Sandu --- .../node/network/statement-distribution/src/v2/tests/cluster.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs index 78943e8d2778..4fb033e08ce3 100644 --- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs +++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs @@ -330,7 +330,7 @@ fn elastic_scaling_useful_cluster_statement_from_non_cluster_peer_rejected() { let test_leaf = state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 3); - // Peer A is not in our group, but it's group is assigned to same para as we are. + // Peer A is not in our group, but its group is assigned to same para as we are. 
let not_our_group = GroupIndex(1); let that_group_validators = state.group_validators(not_our_group, false); From 17c40257cb0ddc0f2d671e607ac24de8af6ad228 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 1 Apr 2024 16:37:43 +0300 Subject: [PATCH 13/16] remove check for scheduled != occupied Signed-off-by: Andrei Sandu --- polkadot/node/network/statement-distribution/src/v2/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 0aba03dda864..57dc5d93abd0 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -2167,7 +2167,6 @@ fn determine_groups_per_para( }; maybe_scheduled_core - .filter(|scheduled_core| scheduled_core.para_id == occupied_core.para_id()) .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) } else { None From a1853903cdebe892815b158b54a5285276882341 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 2 Apr 2024 14:09:21 +0300 Subject: [PATCH 14/16] review feedback Signed-off-by: Andrei Sandu --- .../node/network/statement-distribution/src/v2/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 57dc5d93abd0..41ca5ff77b44 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -3081,13 +3081,11 @@ pub(crate) async fn handle_response( relay_parent_state.session, |v| per_session.session_info.validators.get(v).map(|x| x.clone()), |para, g_index| { - let expected_groups = relay_parent_state.groups_per_para.get(&para); + let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else { + return false + }; - expected_groups.is_some() && - expected_groups - .expect("checked 
is_some(); qed") - .iter() - .any(|g| g == &g_index) + expected_groups.iter().any(|g| g == &g_index) }, disabled_mask, ); From d14384c57088621f3816c1bda74317b180b2a937 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 2 Apr 2024 14:39:14 +0300 Subject: [PATCH 15/16] review Signed-off-by: Andrei Sandu --- .../statement-distribution/src/v2/mod.rs | 75 ++++++++++--------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 41ca5ff77b44..69fcd61f30b1 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -700,9 +700,9 @@ pub(crate) async fn handle_active_leaves_update( .map_err(JfyiError::FetchClaimQueue)?; let groups_per_para = determine_groups_per_para( - &availability_cores, - &group_rotation_info, - &maybe_claim_queue, + availability_cores, + group_rotation_info, + maybe_claim_queue, max_candidate_depth, ); state.per_relay_parent.insert( @@ -2139,47 +2139,52 @@ async fn provide_candidate_to_grid( // Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings. fn determine_groups_per_para( - availability_cores: &[CoreState], - group_rotation_info: &GroupRotationInfo, - maybe_claim_queue: &Option<ClaimQueueSnapshot>, + availability_cores: Vec<CoreState>, + group_rotation_info: GroupRotationInfo, + maybe_claim_queue: Option<ClaimQueueSnapshot>, max_candidate_depth: usize, ) -> HashMap<ParaId, Vec<GroupIndex>> { + let n_cores = availability_cores.len(); + // Determine the core indices occupied by each para at the current relay parent. To support // on-demand parachains we also consider the core indices at next block if core has a candidate // pending availability. 
- let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| { - match core { - CoreState::Scheduled(scheduled_core) => - Some((scheduled_core.para_id, CoreIndex(index as u32))), - CoreState::Occupied(occupied_core) => { - if max_candidate_depth >= 1 { - // Use claim queue if available, or fallback to `next_up_on_available` - let maybe_scheduled_core = match maybe_claim_queue { - Some(claim_queue) => { - // What's up next on this core ? - fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32)) - }, - None => { - // Runtime doesn't support claim queue runtime api. Fallback to - // `next_up_on_available` - occupied_core.next_up_on_available.clone() - }, - }; - - maybe_scheduled_core - .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) - } else { - None - } - }, - CoreState::Free => None, - } - }); + let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue { + claim_queue + .keys() + .filter_map(|core_index| { + let Some(scheduled_core) = fetch_next_scheduled_on_core(&claim_queue, *core_index) + else { + return None + }; + + Some((scheduled_core.para_id, *core_index)) + }) + .collect() + } else { + availability_cores + .into_iter() + .enumerate() + .filter_map(|(index, core)| match core { + CoreState::Scheduled(scheduled_core) => + Some((scheduled_core.para_id, CoreIndex(index as u32))), + CoreState::Occupied(occupied_core) => + if max_candidate_depth >= 1 { + occupied_core + .next_up_on_available + .map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) + } else { + None + }, + CoreState::Free => None, + }) + .collect() + }; let mut groups_per_para = HashMap::new(); // Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`. 
for (para, core_index) in para_core_indices { - let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len()); + let group_index = group_rotation_info.group_for_core(core_index, n_cores); groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index) } From 9de1a044a8dc04915047a4941e094bc160ace2cf Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 2 Apr 2024 19:46:52 +0300 Subject: [PATCH 16/16] review Signed-off-by: Andrei Sandu --- .../statement-distribution/src/v2/mod.rs | 66 +++++++++++-------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 69fcd61f30b1..b9f6f705ed8f 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -46,7 +46,7 @@ use polkadot_node_subsystem_util::{ backing_implicit_view::View as ImplicitView, reputation::ReputationAggregator, runtime::{request_min_backing_votes, ProspectiveParachainsMode}, - vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot}, + vstaging::fetch_claim_queue, }; use polkadot_primitives::{ AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex, @@ -695,16 +695,15 @@ pub(crate) async fn handle_active_leaves_update( } }); - let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent) - .await - .map_err(JfyiError::FetchClaimQueue)?; - let groups_per_para = determine_groups_per_para( + ctx.sender(), + new_relay_parent, availability_cores, group_rotation_info, - maybe_claim_queue, max_candidate_depth, - ); + ) + .await; + state.per_relay_parent.insert( new_relay_parent, PerRelayParentState { @@ -2138,12 +2137,25 @@ async fn provide_candidate_to_grid( } // Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings. 
-fn determine_groups_per_para( +async fn determine_groups_per_para( + sender: &mut impl overseer::StatementDistributionSenderTrait, + relay_parent: Hash, availability_cores: Vec<CoreState>, group_rotation_info: GroupRotationInfo, - maybe_claim_queue: Option<ClaimQueueSnapshot>, max_candidate_depth: usize, ) -> HashMap<ParaId, Vec<GroupIndex>> { + let maybe_claim_queue = fetch_claim_queue(sender, relay_parent) + .await + .unwrap_or_else(|err| { + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + ?err, + "determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores" + ); + None + }); + let n_cores = availability_cores.len(); // Determine the core indices occupied by each para at the current relay parent. To support @@ -2151,15 +2163,8 @@ fn determine_groups_per_para( // pending availability. let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue { claim_queue - .keys() - .filter_map(|core_index| { - let Some(scheduled_core) = fetch_next_scheduled_on_core(&claim_queue, *core_index) - else { - return None - }; - - Some((scheduled_core.para_id, *core_index)) - }) + .into_iter() + .filter_map(|(core_index, paras)| Some((*paras.front()?, core_index))) .collect() } else { availability_cores @@ -2245,8 +2250,17 @@ async fn fragment_tree_update_inner( let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent); if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) { let per_session = state.per_session.get(&prs.session); - // TODO(maybe for sanity): perform an extra check on the candidate backing group - // index all allowed + let group_index = confirmed.group_index(); + + // Sanity check if group_index is valid for this para at relay parent. 
+ let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor().para_id) + else { + continue + }; + if !expected_groups.iter().any(|g| *g == group_index) { + continue + } + if let Some(per_session) = per_session { send_backing_fresh_statements( ctx, @@ -2359,14 +2373,12 @@ async fn handle_incoming_manifest_common<'a, Context>( Some(x) => x, }; - let expected_groups = relay_parent_state.groups_per_para.get(&para_id); + let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else { + modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await; + return None + }; - if expected_groups.is_none() || - !expected_groups - .expect("checked is_some(); qed") - .iter() - .any(|g| g == &manifest_summary.claimed_group_index) - { + if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) { modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await; return None }