From e1a51b2becf42caaa0eee00b597380f27d855db5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Wed, 27 Mar 2024 16:44:10 +0200 Subject: [PATCH] collation-generation + collator-protocol: collate on multiple assigned cores (#3795) This works only for collators that implement the `collator_fn` allowing `collation-generation` subsystem to pull collations triggered on new heads. Also enables `request_v2::CollationFetchingResponse::CollationWithParentHeadData` for test adder/undying collators. TODO: - [x] fix tests - [x] new tests - [x] PR doc --------- Signed-off-by: Andrei Sandu --- .../consensus/aura/src/collators/lookahead.rs | 39 ++- .../node/collation-generation/src/error.rs | 2 + polkadot/node/collation-generation/src/lib.rs | 183 +++++++----- .../node/collation-generation/src/tests.rs | 261 +++++++++++++++++- .../src/collator_side/mod.rs | 129 ++++++--- .../src/collator_side/tests/mod.rs | 1 + .../tests/prospective_parachains.rs | 2 + .../src/collator_side/validators_buffer.rs | 18 +- polkadot/node/primitives/src/lib.rs | 6 +- polkadot/node/subsystem-types/src/messages.rs | 2 + polkadot/node/subsystem-util/src/lib.rs | 3 +- .../test-parachains/adder/collator/Cargo.toml | 2 +- .../undying/collator/Cargo.toml | 2 +- polkadot/runtime/test-runtime/src/lib.rs | 34 ++- prdoc/pr_3795.prdoc | 14 + 15 files changed, 557 insertions(+), 141 deletions(-) create mode 100644 prdoc/pr_3795.prdoc diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index 161f10d55a193..580058336174d 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -49,7 +49,7 @@ use polkadot_node_subsystem::messages::{ CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest, }; use polkadot_overseer::Handle as OverseerHandle; -use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption}; +use polkadot_primitives::{CollatorPair, CoreIndex, Id as ParaId, OccupiedCoreAssumption}; use futures::{channel::oneshot, prelude::*}; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; @@ -184,7 +184,15 @@ where while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); - if !is_para_scheduled(relay_parent, params.para_id, &mut params.overseer_handle).await { + // TODO: Currently we use just the first core here, but for elastic scaling + // we iterate and build on all of the cores returned. + let core_index = if let Some(core_index) = + cores_scheduled_for_para(relay_parent, params.para_id, &mut params.overseer_handle) + .await + .get(0) + { + *core_index + } else { tracing::trace!( target: crate::LOG_TARGET, ?relay_parent, @@ -193,7 +201,7 @@ where ); continue - } + }; let max_pov_size = match params .relay_client @@ -396,6 +404,7 @@ where parent_head: parent_header.encode().into(), validation_code_hash, result_sender: None, + core_index, }, ), "SubmitCollation", @@ -480,14 +489,12 @@ async fn max_ancestry_lookback( } } -// Checks if there exists a scheduled core for the para at the provided relay parent. -// -// Falls back to `false` in case of an error. -async fn is_para_scheduled( +// Return all the cores assigned to the para at the provided relay parent. +async fn cores_scheduled_for_para( relay_parent: PHash, para_id: ParaId, overseer_handle: &mut OverseerHandle, -) -> bool { +) -> Vec { let (tx, rx) = oneshot::channel(); let request = RuntimeApiRequest::AvailabilityCores(tx); overseer_handle @@ -503,7 +510,7 @@ async fn is_para_scheduled( ?relay_parent, "Failed to query availability cores runtime API", ); - return false + return Vec::new() }, Err(oneshot::Canceled) => { tracing::error!( @@ -511,9 +518,19 @@ async fn is_para_scheduled( ?relay_parent, "Sender for availability cores runtime request dropped", ); - return false + return Vec::new() }, }; - cores.iter().any(|core| core.para_id() == Some(para_id)) + cores + .iter() + .enumerate() + .filter_map(|(index, core)| { + if core.para_id() == Some(para_id) { + Some(CoreIndex(index as u32)) + } else { + None + } + }) + .collect() } diff --git a/polkadot/node/collation-generation/src/error.rs b/polkadot/node/collation-generation/src/error.rs index ac5db6cd7f285..852c50f306825 100644 --- a/polkadot/node/collation-generation/src/error.rs +++ b/polkadot/node/collation-generation/src/error.rs @@ -28,6 +28,8 @@ pub enum Error { Util(#[from] polkadot_node_subsystem_util::Error), #[error(transparent)] Erasure(#[from] polkadot_erasure_coding::Error), + #[error("Parachain backing state not available in runtime.")] + MissingParaBackingState, } pub type Result = std::result::Result; diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 3b1a8f5ff2305..3164f6078bc07 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -44,8 +44,8 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util::{ has_required_runtime, request_async_backing_params, request_availability_cores, - request_claim_queue, request_persisted_validation_data, request_validation_code, - request_validation_code_hash, request_validators, + request_claim_queue, request_para_backing_state, request_persisted_validation_data, + request_validation_code, request_validation_code_hash, request_validators, }; use polkadot_primitives::{ collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt, @@ -212,6 +212,7 @@ async fn handle_new_activations( if config.collator.is_none() { return Ok(()) } + let para_id = config.para_id; let _overall_timer = metrics.time_new_activations(); @@ -225,25 +226,23 @@ async fn handle_new_activations( ); let availability_cores = availability_cores??; - let n_validators = validators??.len(); let async_backing_params = async_backing_params?.ok(); + let n_validators = validators??.len(); let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?; - for (core_idx, core) in availability_cores.into_iter().enumerate() { - let _availability_core_timer = metrics.time_new_activations_availability_core(); + // The loop bellow will fill in cores that the para is allowed to build on. + let mut cores_to_build_on = Vec::new(); - let (scheduled_core, assumption) = match core { - CoreState::Scheduled(scheduled_core) => - (scheduled_core, OccupiedCoreAssumption::Free), + for (core_idx, core) in availability_cores.into_iter().enumerate() { + let scheduled_core = match core { + CoreState::Scheduled(scheduled_core) => scheduled_core, CoreState::Occupied(occupied_core) => match async_backing_params { Some(params) if params.max_candidate_depth >= 1 => { // maximum candidate depth when building on top of a block // pending availability is necessarily 1 - the depth of the // pending block is 0 so the child has depth 1. - // TODO [now]: this assumes that next up == current. - // in practice we should only set `OccupiedCoreAssumption::Included` - // when the candidate occupying the core is also of the same para. + // Use claim queue if available, or fallback to `next_up_on_available` let res = match maybe_claim_queue { Some(ref claim_queue) => { // read what's in the claim queue for this core @@ -257,8 +256,7 @@ async fn handle_new_activations( // `next_up_on_available` occupied_core.next_up_on_available }, - } - .map(|scheduled| (scheduled, OccupiedCoreAssumption::Included)); + }; match res { Some(res) => res, @@ -279,7 +277,7 @@ async fn handle_new_activations( gum::trace!( target: LOG_TARGET, core_idx = %core_idx, - "core is free. Keep going.", + "core is not assigned to any para. Keep going.", ); continue }, @@ -294,64 +292,90 @@ async fn handle_new_activations( their_para = %scheduled_core.para_id, "core is not assigned to our para. Keep going.", ); - continue + } else { + // Accumulate cores for building collation(s) outside the loop. + cores_to_build_on.push(CoreIndex(core_idx as u32)); } + } - // we get validation data and validation code synchronously for each core instead of - // within the subtask loop, because we have only a single mutable handle to the - // context, so the work can't really be distributed - - let validation_data = match request_persisted_validation_data( - relay_parent, - scheduled_core.para_id, - assumption, - ctx.sender(), - ) - .await - .await?? - { - Some(v) => v, - None => { - gum::trace!( - target: LOG_TARGET, - core_idx = %core_idx, - relay_parent = ?relay_parent, - our_para = %config.para_id, - their_para = %scheduled_core.para_id, - "validation data is not available", - ); - continue - }, - }; + // Skip to next relay parent if there is no core assigned to us. + if cores_to_build_on.is_empty() { + continue + } - let validation_code_hash = match obtain_validation_code_hash_with_assumption( - relay_parent, - scheduled_core.para_id, - assumption, - ctx.sender(), - ) - .await? - { - Some(v) => v, - None => { - gum::trace!( - target: LOG_TARGET, - core_idx = %core_idx, - relay_parent = ?relay_parent, - our_para = %config.para_id, - their_para = %scheduled_core.para_id, - "validation code hash is not found.", - ); - continue - }, - }; + let para_backing_state = + request_para_backing_state(relay_parent, config.para_id, ctx.sender()) + .await + .await?? + .ok_or(crate::error::Error::MissingParaBackingState)?; + + // We are being very optimistic here, but one of the cores could pend availability some more + // block, ore even time out. + // For timeout assumption the collator can't really know because it doesn't receive bitfield + // gossip. + let assumption = if para_backing_state.pending_availability.is_empty() { + OccupiedCoreAssumption::Free + } else { + OccupiedCoreAssumption::Included + }; + + gum::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + our_para = %config.para_id, + ?assumption, + "Occupied core(s) assumption", + ); + + let mut validation_data = match request_persisted_validation_data( + relay_parent, + config.para_id, + assumption, + ctx.sender(), + ) + .await + .await?? + { + Some(v) => v, + None => { + gum::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + our_para = %config.para_id, + "validation data is not available", + ); + continue + }, + }; - let task_config = config.clone(); - let metrics = metrics.clone(); - let mut task_sender = ctx.sender().clone(); - ctx.spawn( - "collation-builder", - Box::pin(async move { + let validation_code_hash = match obtain_validation_code_hash_with_assumption( + relay_parent, + config.para_id, + assumption, + ctx.sender(), + ) + .await? + { + Some(v) => v, + None => { + gum::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + our_para = %config.para_id, + "validation code hash is not found.", + ); + continue + }, + }; + + let task_config = config.clone(); + let metrics = metrics.clone(); + let mut task_sender = ctx.sender().clone(); + + ctx.spawn( + "chained-collation-builder", + Box::pin(async move { + for core_index in cores_to_build_on { let collator_fn = match task_config.collator.as_ref() { Some(x) => x, None => return, @@ -363,21 +387,23 @@ async fn handle_new_activations( None => { gum::debug!( target: LOG_TARGET, - para_id = %scheduled_core.para_id, + ?para_id, "collator returned no collation on collate", ); return }, }; + let parent_head = collation.head_data.clone(); construct_and_distribute_receipt( PreparedCollation { collation, - para_id: scheduled_core.para_id, + para_id, relay_parent, - validation_data, + validation_data: validation_data.clone(), validation_code_hash, n_validators, + core_index, }, task_config.key.clone(), &mut task_sender, @@ -385,9 +411,13 @@ async fn handle_new_activations( &metrics, ) .await; - }), - )?; - } + + // Chain the collations. All else stays the same as we build the chained + // collation on same relay parent. + validation_data.parent_head = parent_head; + } + }), + )?; } Ok(()) @@ -408,6 +438,7 @@ async fn handle_submit_collation( parent_head, validation_code_hash, result_sender, + core_index, } = params; let validators = request_validators(relay_parent, ctx.sender()).await.await??; @@ -444,6 +475,7 @@ async fn handle_submit_collation( validation_data, validation_code_hash, n_validators, + core_index, }; construct_and_distribute_receipt( @@ -465,6 +497,7 @@ struct PreparedCollation { validation_data: PersistedValidationData, validation_code_hash: ValidationCodeHash, n_validators: usize, + core_index: CoreIndex, } /// Takes a prepared collation, along with its context, and produces a candidate receipt @@ -483,6 +516,7 @@ async fn construct_and_distribute_receipt( validation_data, validation_code_hash, n_validators, + core_index, } = collation; let persisted_validation_data_hash = validation_data.hash(); @@ -578,6 +612,7 @@ async fn construct_and_distribute_receipt( pov, parent_head_data, result_sender, + core_index, }) .await; } diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs index 9b16980e6af43..3cb3e61a35a1c 100644 --- a/polkadot/node/collation-generation/src/tests.rs +++ b/polkadot/node/collation-generation/src/tests.rs @@ -30,13 +30,16 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ - AsyncBackingParams, CollatorPair, HeadData, Id as ParaId, Id, PersistedValidationData, + async_backing::{BackingState, CandidatePendingAvailability}, + AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData, ScheduledCore, ValidationCode, }; use rstest::rstest; use sp_keyring::sr25519::Keyring as Sr25519Keyring; use std::pin::Pin; -use test_helpers::{dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator}; +use test_helpers::{ + dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate, +}; type VirtualOverseer = TestSubsystemContextHandle; @@ -105,9 +108,9 @@ impl Future for TestCollator { impl Unpin for TestCollator {} -async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { - const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000); +const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000); +async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { overseer .recv() .timeout(TIMEOUT) @@ -135,6 +138,41 @@ fn scheduled_core_for>(para_id: Id) -> ScheduledCore { ScheduledCore { para_id: para_id.into(), collator: None } } +fn dummy_candidate_pending_availability( + para_id: ParaId, + candidate_relay_parent: Hash, + relay_parent_number: BlockNumber, +) -> CandidatePendingAvailability { + let (candidate, _pvd) = make_candidate( + candidate_relay_parent, + relay_parent_number, + para_id, + dummy_head_data(), + HeadData(vec![1]), + ValidationCode(vec![1, 2, 3]).hash(), + ); + let candidate_hash = candidate.hash(); + + CandidatePendingAvailability { + candidate_hash, + descriptor: candidate.descriptor, + commitments: candidate.commitments, + relay_parent_number, + max_pov_size: 5 * 1024 * 1024, + } +} + +fn dummy_backing_state(pending_availability: Vec) -> BackingState { + let constraints = helpers::dummy_constraints( + 0, + vec![0], + dummy_head_data(), + ValidationCodeHash::from(Hash::repeat_byte(42)), + ); + + BackingState { constraints, pending_availability } +} + #[rstest] #[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)] #[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)] @@ -176,6 +214,12 @@ fn requests_availability_per_relay_parent(#[case] runtime_version: u32) { ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { tx.send(Ok(BTreeMap::new())).unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ParaBackingState(_para_id, tx), + ))) => { + tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap(); + }, Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg), } } @@ -273,6 +317,12 @@ fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32) ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { tx.send(Ok(BTreeMap::new())).unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ParaBackingState(_para_id, tx), + ))) => { + tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap(); + }, Some(msg) => { panic!("didn't expect any other overseer requests; got {:?}", msg) }, @@ -384,6 +434,12 @@ fn sends_distribute_collation_message(#[case] runtime_version: u32) { ))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => { tx.send(Ok(BTreeMap::new())).unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ParaBackingState(_para_id, tx), + ))) => { + tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap(); + }, Some(msg @ AllMessages::CollatorProtocol(_)) => { inner_to_collator_protocol.lock().await.push(msg); }, @@ -564,6 +620,12 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) { let res = BTreeMap::>::new(); tx.send(Ok(res)).unwrap(); }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ParaBackingState(_para_id, tx), + ))) => { + tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap(); + }, Some(msg) => { panic!("didn't expect any other overseer requests; got {:?}", msg) }, @@ -611,6 +673,7 @@ fn submit_collation_is_no_op_before_initialization() { parent_head: vec![1, 2, 3].into(), validation_code_hash: Hash::repeat_byte(1).into(), result_sender: None, + core_index: CoreIndex(0), }), }) .await; @@ -647,6 +710,7 @@ fn submit_collation_leads_to_distribution() { parent_head: vec![1, 2, 3].into(), validation_code_hash, result_sender: None, + core_index: CoreIndex(0), }), }) .await; @@ -721,6 +785,9 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run test_harness(|mut virtual_overseer| async move { helpers::initialize_collator(&mut virtual_overseer, para_id).await; helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + + let pending_availability = + vec![dummy_candidate_pending_availability(para_id, activated_hash, 1)]; helpers::handle_runtime_calls_on_new_head_activation( &mut virtual_overseer, activated_hash, @@ -728,14 +795,140 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run cores, runtime_version, claim_queue, + pending_availability, + ) + .await; + helpers::handle_core_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + // `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included` + OccupiedCoreAssumption::Included, + 1, + ) + .await; + + virtual_overseer + }); +} + +// There are variable number of cores of cores in `Occupied` state and async backing is enabled. +// On new head activation `CollationGeneration` should produce and distribute a new collation +// with proper assumption about the para candidate chain availability at next block. +#[rstest] +#[case(0)] +#[case(1)] +#[case(2)] +fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elastic_scaling( + #[case] candidates_pending_avail: u32, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + + let cores = (0..candidates_pending_avail) + .into_iter() + .map(|idx| { + CoreState::Occupied(polkadot_primitives::OccupiedCore { + next_up_on_available: Some(ScheduledCore { para_id, collator: None }), + occupied_since: 0, + time_out_at: 10, + next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }), + availability: Default::default(), // doesn't matter + group_responsible: polkadot_primitives::GroupIndex(idx as u32), + candidate_hash: Default::default(), + candidate_descriptor: dummy_candidate_descriptor(dummy_hash()), + }) + }) + .collect::>(); + + let pending_availability = (0..candidates_pending_avail) + .into_iter() + .map(|_idx| dummy_candidate_pending_availability(para_id, activated_hash, 0)) + .collect::>(); + + let claim_queue = cores + .iter() + .enumerate() + .map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id]))) + .collect::>(); + let total_cores = cores.len(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 }, + cores, + // Using latest runtime with the fancy claim queue exposed. + RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + claim_queue, + pending_availability, ) .await; + helpers::handle_core_processing_for_a_leaf( &mut virtual_overseer, activated_hash, para_id, // `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included` OccupiedCoreAssumption::Included, + total_cores, + ) + .await; + + virtual_overseer + }); +} + +// There are variable number of cores of cores in `Free` state and async backing is enabled. +// On new head activation `CollationGeneration` should produce and distribute a new collation +// with proper assumption about the para candidate chain availability at next block. +#[rstest] +#[case(0)] +#[case(1)] +#[case(2)] +fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_scaling( + #[case] candidates_pending_avail: u32, +) { + let activated_hash: Hash = [1; 32].into(); + let para_id = ParaId::from(5); + + let cores = (0..candidates_pending_avail) + .into_iter() + .map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None })) + .collect::>(); + + let claim_queue = cores + .iter() + .enumerate() + .map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id]))) + .collect::>(); + let total_cores = cores.len(); + + test_harness(|mut virtual_overseer| async move { + helpers::initialize_collator(&mut virtual_overseer, para_id).await; + helpers::activate_new_head(&mut virtual_overseer, activated_hash).await; + helpers::handle_runtime_calls_on_new_head_activation( + &mut virtual_overseer, + activated_hash, + AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 }, + cores, + // Using latest runtime with the fancy claim queue exposed. + RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT, + claim_queue, + vec![], + ) + .await; + + helpers::handle_core_processing_for_a_leaf( + &mut virtual_overseer, + activated_hash, + para_id, + // `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free` + OccupiedCoreAssumption::Free, + total_cores, ) .await; @@ -777,6 +970,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled( cores, runtime_version, claim_queue, + vec![], ) .await; @@ -785,8 +979,38 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled( } mod helpers { + use polkadot_primitives::{ + async_backing::{Constraints, InboundHrmpLimitations}, + BlockNumber, + }; + use super::*; + // A set for dummy constraints for `ParaBackingState`` + pub(crate) fn dummy_constraints( + min_relay_parent_number: BlockNumber, + valid_watermarks: Vec, + required_parent: HeadData, + validation_code_hash: ValidationCodeHash, + ) -> Constraints { + Constraints { + min_relay_parent_number, + max_pov_size: 5 * 1024 * 1024, + max_code_size: 1_000_000, + ump_remaining: 10, + ump_remaining_bytes: 1_000, + max_ump_num_per_candidate: 10, + dmp_remaining_messages: vec![], + hrmp_inbound: InboundHrmpLimitations { valid_watermarks }, + hrmp_channels_out: vec![], + max_hrmp_num_per_candidate: 0, + required_parent, + validation_code_hash, + upgrade_restriction: None, + future_validation_code: None, + } + } + // Sends `Initialize` with a collator config pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) { virtual_overseer @@ -822,7 +1046,8 @@ mod helpers { async_backing_params: AsyncBackingParams, cores: Vec, runtime_version: u32, - claim_queue: BTreeMap>, + claim_queue: BTreeMap>, + pending_availability: Vec, ) { assert_matches!( overseer_recv(virtual_overseer).await, @@ -857,6 +1082,25 @@ mod helpers { } ); + // Process the `ParaBackingState` message, and return some dummy state. + let message = overseer_recv(virtual_overseer).await; + let para_id = match message { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::ParaBackingState(p_id, _), + )) => p_id, + _ => panic!("received unexpected message {:?}", message), + }; + + assert_matches!( + message, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx)) + ) if parent == activated_hash && p_id == para_id => { + tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap(); + } + ); + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -889,7 +1133,14 @@ mod helpers { activated_hash: Hash, para_id: ParaId, expected_occupied_core_assumption: OccupiedCoreAssumption, + cores_assigned: usize, ) { + // Expect no messages if no cores is assigned to the para + if cores_assigned == 0 { + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + return + } + // Some hardcoded data - if needed, extract to parameters let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42)); let parent_head = HeadData::from(vec![1, 2, 3]); diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 9f306f288a162..e6aa55235b7a8 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -203,20 +203,40 @@ struct PeerData { version: CollationVersion, } +/// A type wrapping a collation and it's designated core index. +struct CollationWithCoreIndex(Collation, CoreIndex); + +impl CollationWithCoreIndex { + /// Returns inner collation ref. + pub fn collation(&self) -> &Collation { + &self.0 + } + + /// Returns inner collation mut ref. + pub fn collation_mut(&mut self) -> &mut Collation { + &mut self.0 + } + + /// Returns inner core index. + pub fn core_index(&self) -> &CoreIndex { + &self.1 + } +} + struct PerRelayParent { prospective_parachains_mode: ProspectiveParachainsMode, - /// Validators group responsible for backing candidates built + /// Per core index validators group responsible for backing candidates built /// on top of this relay parent. - validator_group: ValidatorGroup, + validator_group: HashMap, /// Distributed collations. - collations: HashMap, + collations: HashMap, } impl PerRelayParent { fn new(mode: ProspectiveParachainsMode) -> Self { Self { prospective_parachains_mode: mode, - validator_group: ValidatorGroup::default(), + validator_group: HashMap::default(), collations: HashMap::new(), } } @@ -350,6 +370,7 @@ async fn distribute_collation( pov: PoV, parent_head_data: HeadData, result_sender: Option>, + core_index: CoreIndex, ) -> Result<()> { let candidate_relay_parent = receipt.descriptor.relay_parent; let candidate_hash = receipt.hash(); @@ -422,7 +443,22 @@ async fn distribute_collation( ); } - let our_core = our_cores[0]; + // Double check that the specified `core_index` is among the ones our para has assignments for. + if !our_cores.iter().any(|assigned_core| assigned_core == &core_index) { + gum::warn!( + target: LOG_TARGET, + para_id = %id, + relay_parent = ?candidate_relay_parent, + cores = ?our_cores, + ?core_index, + "Attempting to distribute collation for a core we are not assigned to ", + ); + + return Ok(()) + } + + let our_core = core_index; + // Determine the group on that core. // // When prospective parachains are disabled, candidate relay parent here is @@ -464,10 +500,12 @@ async fn distribute_collation( "Accepted collation, connecting to validators." ); - let validators_at_relay_parent = &mut per_relay_parent.validator_group.validators; - if validators_at_relay_parent.is_empty() { - *validators_at_relay_parent = validators; - } + // Insert validator group for the `core_index` at relay parent. + per_relay_parent.validator_group.entry(core_index).or_insert_with(|| { + let mut group = ValidatorGroup::default(); + group.validators = validators; + group + }); // Update a set of connected validators if necessary. connect_to_validators(ctx, &state.validator_groups_buf).await; @@ -484,7 +522,10 @@ async fn distribute_collation( per_relay_parent.collations.insert( candidate_hash, - Collation { receipt, pov, parent_head_data, status: CollationStatus::Created }, + CollationWithCoreIndex( + Collation { receipt, pov, parent_head_data, status: CollationStatus::Created }, + core_index, + ), ); // If prospective parachains are disabled, a leaf should be known to peer. @@ -690,7 +731,10 @@ async fn advertise_collation( advertisement_timeouts: &mut FuturesUnordered, metrics: &Metrics, ) { - for (candidate_hash, collation) in per_relay_parent.collations.iter_mut() { + for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() { + let core_index = *collation_and_core.core_index(); + let collation = collation_and_core.collation_mut(); + // Check that peer will be able to request the collation. if let CollationVersion::V1 = protocol_version { if per_relay_parent.prospective_parachains_mode.is_enabled() { @@ -704,11 +748,17 @@ async fn advertise_collation( } } - let should_advertise = - per_relay_parent - .validator_group - .should_advertise_to(candidate_hash, peer_ids, &peer); + let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else { + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + ?core_index, + "Skipping advertising to validator, validator group for core not found", + ); + return + }; + let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer); match should_advertise { ShouldAdvertiseTo::Yes => {}, ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => { @@ -756,9 +806,7 @@ async fn advertise_collation( )) .await; - per_relay_parent - .validator_group - .advertised_to_peer(candidate_hash, &peer_ids, peer); + validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer); advertisement_timeouts.push(ResetInterestTimeout::new( *candidate_hash, @@ -790,6 +838,7 @@ async fn process_msg( pov, parent_head_data, result_sender, + core_index, } => { let _span1 = state .span_per_relay_parent @@ -820,6 +869,7 @@ async fn process_msg( pov, parent_head_data, result_sender, + core_index, ) .await?; }, @@ -1053,7 +1103,7 @@ async fn handle_incoming_request( }; let mode = per_relay_parent.prospective_parachains_mode; - let collation = match &req { + let collation_with_core = match &req { VersionedCollationRequest::V1(_) if !mode.is_enabled() => per_relay_parent.collations.values_mut().next(), VersionedCollationRequest::V2(req) => @@ -1070,22 +1120,24 @@ async fn handle_incoming_request( return Ok(()) }, }; - let (receipt, pov, parent_head_data) = if let Some(collation) = collation { - collation.status.advance_to_requested(); - ( - collation.receipt.clone(), - collation.pov.clone(), - collation.parent_head_data.clone(), - ) - } else { - gum::warn!( - target: LOG_TARGET, - relay_parent = %relay_parent, - "received a `RequestCollation` for a relay parent we don't have collation stored.", - ); + let (receipt, pov, parent_head_data) = + if let Some(collation_with_core) = collation_with_core { + let collation = collation_with_core.collation_mut(); + collation.status.advance_to_requested(); + ( + collation.receipt.clone(), + collation.pov.clone(), + collation.parent_head_data.clone(), + ) + } else { + gum::warn!( + target: LOG_TARGET, + relay_parent = %relay_parent, + "received a `RequestCollation` for a relay parent we don't have collation stored.", + ); - return Ok(()) - }; + return Ok(()) + }; state.metrics.on_collation_sent_requested(); @@ -1340,7 +1392,9 @@ where .remove(removed) .map(|per_relay_parent| per_relay_parent.collations) .unwrap_or_default(); - for collation in collations.into_values() { + for collation_with_core in collations.into_values() { + let collation = collation_with_core.collation(); + let candidate_hash = collation.receipt.hash(); state.collation_result_senders.remove(&candidate_hash); state.validator_groups_buf.remove_candidate(&candidate_hash); @@ -1477,7 +1531,7 @@ async fn run_inner( continue }; - let next_collation = { + let next_collation_with_core = { let per_relay_parent = match state.per_relay_parent.get(&relay_parent) { Some(per_relay_parent) => per_relay_parent, None => continue, @@ -1497,7 +1551,8 @@ async fn run_inner( } }; - if let Some(collation) = next_collation { + if let Some(collation_with_core) = next_collation_with_core { + let collation = collation_with_core.collation(); let receipt = collation.receipt.clone(); let pov = collation.pov.clone(); let parent_head_data = collation.parent_head_data.clone(); diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 38e6780eb7d20..bcf0b34e631f9 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -377,6 +377,7 @@ async fn distribute_collation_with_receipt( pov: pov.clone(), parent_head_data: HeadData(vec![1, 2, 3]), result_sender: None, + core_index: CoreIndex(0), }, ) .await; diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index e419cd5444f5a..707053545630a 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -277,6 +277,7 @@ fn distribute_collation_from_implicit_view() { pov: pov.clone(), parent_head_data: HeadData(vec![1, 2, 3]), result_sender: None, + core_index: CoreIndex(0), }, ) .await; @@ -358,6 +359,7 @@ fn distribute_collation_up_to_limit() { pov: pov.clone(), parent_head_data: HeadData(vec![1, 2, 3]), result_sender: None, + core_index: CoreIndex(0), }, ) .await; diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 1533f2eda5a57..fbb3ff4328a51 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -45,14 +45,22 @@ use futures::FutureExt; use polkadot_node_network_protocol::PeerId; use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex}; +/// Elastic scaling: how many candidates per relay chain block the collator supports building. +pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) { + Some(cap) => cap, + None => panic!("max candidates per rcb cannot be zero"), +}; + /// The ring buffer stores at most this many unique validator groups. /// /// This value should be chosen in way that all groups assigned to our para -/// in the view can fit into the buffer. -pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) { - Some(cap) => cap, - None => panic!("buffer capacity must be non-zero"), -}; +/// in the view can fit into the buffer multiplied by amount of candidates we support per relay +/// chain block in the case of elastic scaling. +pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = + match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) { + Some(cap) => cap, + None => panic!("buffer capacity must be non-zero"), + }; /// Unique identifier of a validators group. #[derive(Debug)] diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index b102cf06c38f6..b127d87d4ea4d 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -31,8 +31,8 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use polkadot_primitives::{ BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CollatorPair, - CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId, - PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode, + CommittedCandidateReceipt, CompactStatement, CoreIndex, EncodeAs, Hash, HashT, HeadData, + Id as ParaId, PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode, ValidationCodeHash, ValidatorIndex, MAX_CODE_SIZE, MAX_POV_SIZE, }; pub use sp_consensus_babe::{ @@ -524,6 +524,8 @@ pub struct SubmitCollationParams { /// okay to just drop it. However, if it is called, it should be called with the signed /// statement of a parachain validator seconding the collation. pub result_sender: Option>, + /// The core index on which the resulting candidate should be backed + pub core_index: CoreIndex, } /// This is the data we keep available for each candidate included in the relay chain. diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 5d05d2b56ed08..d84b0b6dd1412 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -228,6 +228,8 @@ pub enum CollatorProtocolMessage { /// The result sender should be informed when at least one parachain validator seconded the /// collation. It is also completely okay to just drop the sender. result_sender: Option>, + /// The core index where the candidate should be backed. + core_index: CoreIndex, }, /// Report a collator as having provided an invalid collation. This should lead to disconnect /// and blacklist of the collator. diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 6ff09ed5f2200..83b046f0bf0ac 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, overseer, SubsystemSender, }; -use polkadot_primitives::{slashing, CoreIndex, ExecutorParams}; +use polkadot_primitives::{async_backing::BackingState, slashing, CoreIndex, ExecutorParams}; pub use overseer::{ gen::{OrchestraError as OverseerError, Timeout}, @@ -308,6 +308,7 @@ specialize_requests! { fn request_disabled_validators() -> Vec; DisabledValidators; fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams; fn request_claim_queue() -> BTreeMap>; ClaimQueue; + fn request_para_backing_state(para_id: ParaId) -> Option; ParaBackingState; } /// Requests executor parameters from the runtime effective at given relay-parent. First obtains diff --git a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml index cb283c271191f..30bce806f9ffa 100644 --- a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml +++ b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml @@ -24,7 +24,7 @@ log = { workspace = true, default-features = true } test-parachain-adder = { path = ".." } polkadot-primitives = { path = "../../../../primitives" } polkadot-cli = { path = "../../../../cli" } -polkadot-service = { path = "../../../../node/service", features = ["rococo-native"] } +polkadot-service = { path = "../../../../node/service", features = ["elastic-scaling-experimental", "rococo-native"] } polkadot-node-primitives = { path = "../../../../node/primitives" } polkadot-node-subsystem = { path = "../../../../node/subsystem" } diff --git a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml index 238b98a66801c..bede10a7673be 100644 --- a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml +++ b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml @@ -24,7 +24,7 @@ log = { workspace = true, default-features = true } test-parachain-undying = { path = ".." } polkadot-primitives = { path = "../../../../primitives" } polkadot-cli = { path = "../../../../cli" } -polkadot-service = { path = "../../../../node/service", features = ["rococo-native"] } +polkadot-service = { path = "../../../../node/service", features = ["elastic-scaling-experimental", "rococo-native"] } polkadot-node-primitives = { path = "../../../../node/primitives" } polkadot-node-subsystem = { path = "../../../../node/subsystem" } diff --git a/polkadot/runtime/test-runtime/src/lib.rs b/polkadot/runtime/test-runtime/src/lib.rs index 8a3cd9309dbd8..62c3741c56d6e 100644 --- a/polkadot/runtime/test-runtime/src/lib.rs +++ b/polkadot/runtime/test-runtime/src/lib.rs @@ -27,10 +27,11 @@ use sp_std::{collections::btree_map::BTreeMap, prelude::*}; use polkadot_runtime_parachains::{ assigner_parachains as parachains_assigner_parachains, configuration as parachains_configuration, disputes as parachains_disputes, - disputes::slashing as parachains_slashing, dmp as parachains_dmp, hrmp as parachains_hrmp, - inclusion as parachains_inclusion, initializer as parachains_initializer, - origin as parachains_origin, paras as parachains_paras, - paras_inherent as parachains_paras_inherent, runtime_api_impl::v7 as runtime_impl, + disputes::slashing as parachains_slashing, + dmp as parachains_dmp, hrmp as parachains_hrmp, inclusion as parachains_inclusion, + initializer as parachains_initializer, origin as parachains_origin, paras as parachains_paras, + paras_inherent as parachains_paras_inherent, + runtime_api_impl::{v7 as runtime_impl, vstaging as staging_runtime_impl}, scheduler as parachains_scheduler, session_info as parachains_session_info, shared as parachains_shared, }; @@ -829,6 +830,7 @@ sp_api::impl_runtime_apis! { } } + #[api_version(10)] impl primitives::runtime_api::ParachainHost for Runtime { fn validators() -> Vec { runtime_impl::validators::() @@ -956,6 +958,30 @@ sp_api::impl_runtime_apis! { key_ownership_proof, ) } + + fn minimum_backing_votes() -> u32 { + runtime_impl::minimum_backing_votes::() + } + + fn para_backing_state(para_id: ParaId) -> Option { + runtime_impl::backing_state::(para_id) + } + + fn async_backing_params() -> primitives::AsyncBackingParams { + runtime_impl::async_backing_params::() + } + + fn approval_voting_params() -> primitives::vstaging::ApprovalVotingParams { + staging_runtime_impl::approval_voting_params::() + } + + fn disabled_validators() -> Vec { + staging_runtime_impl::disabled_validators::() + } + + fn node_features() -> primitives::vstaging::NodeFeatures { + staging_runtime_impl::node_features::() + } } impl beefy_primitives::BeefyApi for Runtime { diff --git a/prdoc/pr_3795.prdoc b/prdoc/pr_3795.prdoc new file mode 100644 index 0000000000000..da01fcbec821c --- /dev/null +++ b/prdoc/pr_3795.prdoc @@ -0,0 +1,14 @@ +title: Enable collators to build on multiple cores + +doc: + - audience: Node Dev + description: | + Introduces a `CoreIndex` parameter in `SubmitCollationParams`. This enables + the collators to make use of potentially multiple cores assigned at some relay + chain block. This extra parameter is used by the collator protocol and collation + generation subsystems to forward the collation to the approapriate backing group. + +crates: +- name: polkadot-node-collation-generation +- name: polkadot-collator-protocol + bump: minor