diff --git a/.gitlab/pipeline/zombienet/cumulus.yml b/.gitlab/pipeline/zombienet/cumulus.yml index a7f321505bac..6e2b53fae619 100644 --- a/.gitlab/pipeline/zombienet/cumulus.yml +++ b/.gitlab/pipeline/zombienet/cumulus.yml @@ -149,3 +149,27 @@ zombienet-cumulus-0007-full_node_warp_sync: --local-dir="${LOCAL_DIR}" --concurrency=1 --test="0007-full_node_warp_sync.zndsl" + +zombienet-cumulus-0008-elastic_authoring: + extends: + - .zombienet-cumulus-common + - .zombienet-refs + - .zombienet-before-script + - .zombienet-after-script + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}" + --concurrency=1 + --test="0008-elastic_authoring.zndsl" + +zombienet-cumulus-0009-elastic_pov_recovery: + extends: + - .zombienet-cumulus-common + - .zombienet-refs + - .zombienet-before-script + - .zombienet-after-script + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}" + --concurrency=1 + --test="0009-elastic_pov_recovery.zndsl" diff --git a/Cargo.lock b/Cargo.lock index 7b08e96f4bbe..f3808b7eaa53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3772,6 +3772,7 @@ dependencies = [ "sc-consensus-babe", "sc-consensus-slots", "sc-telemetry", + "sc-utils", "schnellru", "sp-api", "sp-application-crypto", @@ -3786,6 +3787,7 @@ dependencies = [ "sp-state-machine", "sp-timestamp", "substrate-prometheus-endpoint", + "tokio", "tracing", ] @@ -3906,7 +3908,6 @@ dependencies = [ "cumulus-test-relay-sproof-builder", "parity-scale-codec", "sc-client-api", - "scale-info", "sp-api", "sp-crypto-hashing", "sp-inherents", @@ -4333,15 +4334,8 @@ dependencies = [ "cumulus-relay-chain-interface", "cumulus-relay-chain-rpc-interface", "futures", - "parking_lot 0.12.3", - "polkadot-availability-recovery", - "polkadot-collator-protocol", "polkadot-core-primitives", "polkadot-network-bridge", - "polkadot-node-collation-generation", - "polkadot-node-core-chain-api", - "polkadot-node-core-prospective-parachains", - "polkadot-node-core-runtime-api", "polkadot-node-network-protocol", "polkadot-node-subsystem-util", "polkadot-overseer", @@ -4535,7 +4529,6 @@ dependencies = [ "polkadot-test-service", "portpicker", "rand", - "rococo-parachain-runtime", "sc-basic-authorship", "sc-block-builder", "sc-chain-spec", @@ -4560,7 +4553,6 @@ dependencies = [ "sp-blockchain", "sp-consensus", "sp-consensus-aura", - "sp-consensus-grandpa", "sp-core", "sp-io", "sp-keyring", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 5ab3e6f25129..01e07cb395a9 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -16,6 +16,7 @@ futures = { workspace = true } parking_lot = { workspace = true } tracing = { workspace = true, default-features = true } schnellru = { workspace = true } +tokio = { workspace = true, features = ["macros"] } # Substrate sc-client-api = { workspace = true, default-features = true } @@ -23,6 +24,7 @@ sc-consensus = { workspace = true, default-features = true } sc-consensus-aura = { workspace = true, default-features = true } sc-consensus-babe = { workspace = true, default-features = true } sc-consensus-slots = { workspace = true, default-features = true } +sc-utils = { workspace = true, default-features = true } sc-telemetry = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-application-crypto = { workspace = true, default-features = true } diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 776052215d93..dc830e463a4f 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -156,15 +156,8 @@ where Ok((paras_inherent_data, other_inherent_data)) } - /// Propose, seal, and import a block, packaging it into a collation. - /// - /// Provide the slot to build at as well as any other necessary pre-digest logs, - /// the inherent data, and the proposal duration and PoV size limits. - /// - /// The Aura pre-digest should not be explicitly provided and is set internally. - /// - /// This does not announce the collation to the parachain network or the relay chain. - pub async fn collate( + /// Build and import a parachain block on the given parent header, using the given slot claim. + pub async fn build_block_and_import( &mut self, parent_header: &Block::Header, slot_claim: &SlotClaim, @@ -172,10 +165,7 @@ where inherent_data: (ParachainInherentData, InherentData), proposal_duration: Duration, max_pov_size: usize, - ) -> Result< - Option<(Collation, ParachainBlockData, Block::Hash)>, - Box, - > { + ) -> Result>, Box> { let mut digest = additional_pre_digest.into().unwrap_or_default(); digest.push(slot_claim.pre_digest.clone()); @@ -205,7 +195,6 @@ where ) .map_err(|e| e as Box)?; - let post_hash = sealed_importable.post_hash(); let block = Block::new( sealed_importable.post_header(), sealed_importable @@ -220,11 +209,46 @@ where .map_err(|e| Box::new(e) as Box) .await?; - if let Some((collation, block_data)) = self.collator_service.build_collation( - parent_header, - post_hash, - ParachainCandidate { block, proof: proposal.proof }, - ) { + Ok(Some(ParachainCandidate { block, proof: proposal.proof })) + } + + /// Propose, seal, import a block and packaging it into a collation. + /// + /// Provide the slot to build at as well as any other necessary pre-digest logs, + /// the inherent data, and the proposal duration and PoV size limits. + /// + /// The Aura pre-digest should not be explicitly provided and is set internally. + /// + /// This does not announce the collation to the parachain network or the relay chain. + pub async fn collate( + &mut self, + parent_header: &Block::Header, + slot_claim: &SlotClaim, + additional_pre_digest: impl Into>>, + inherent_data: (ParachainInherentData, InherentData), + proposal_duration: Duration, + max_pov_size: usize, + ) -> Result< + Option<(Collation, ParachainBlockData, Block::Hash)>, + Box, + > { + let maybe_candidate = self + .build_block_and_import( + parent_header, + slot_claim, + additional_pre_digest, + inherent_data, + proposal_duration, + max_pov_size, + ) + .await?; + + let Some(candidate) = maybe_candidate else { return Ok(None) }; + + let hash = candidate.block.header().hash(); + if let Some((collation, block_data)) = + self.collator_service.build_collation(parent_header, hash, candidate) + { tracing::info!( target: crate::LOG_TARGET, "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}", @@ -241,7 +265,7 @@ where ); } - Ok(Some((collation, block_data, post_hash))) + Ok(Some((collation, block_data, hash))) } else { Err(Box::::from("Unable to produce collation") as Box) diff --git a/cumulus/client/consensus/aura/src/collators/basic.rs b/cumulus/client/consensus/aura/src/collators/basic.rs index 1047c6219ad1..4efd50a04ec6 100644 --- a/cumulus/client/consensus/aura/src/collators/basic.rs +++ b/cumulus/client/consensus/aura/src/collators/basic.rs @@ -41,7 +41,6 @@ use sc_consensus::BlockImport; use sp_api::{CallApiAt, ProvideRuntimeApi}; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; use sp_consensus_aura::AuraApi; use sp_core::crypto::Pair; use sp_inherents::CreateInherentDataProviders; @@ -53,7 +52,7 @@ use std::{sync::Arc, time::Duration}; use crate::collator as collator_util; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -64,8 +63,6 @@ pub struct Params { pub para_client: Arc, /// A handle to the relay-chain client. pub relay_client: RClient, - /// A chain synchronization oracle. - pub sync_oracle: SO, /// The underlying keystore, which should contain Aura consensus keys. pub keystore: KeystorePtr, /// The collator key used to sign collations before submitting to validators. @@ -89,8 +86,8 @@ pub struct Params { } /// Run bare Aura consensus as a relay-chain-driven collator. -pub fn run( - params: Params, +pub fn run( + params: Params, ) -> impl Future + Send + 'static where Block: BlockT + Send, @@ -108,7 +105,6 @@ where CIDP: CreateInherentDataProviders + Send + 'static, CIDP::InherentDataProviders: Send, BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, - SO: SyncOracle + Send + Sync + Clone + 'static, Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, P: Pair, diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index b6f7b07f55d3..749b13112394 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -33,46 +33,34 @@ use codec::{Codec, Encode}; use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; -use cumulus_client_consensus_common::{ - self as consensus_common, load_abridged_host_configuration, ParachainBlockImportMarker, - ParentSearchParams, -}; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_primitives_aura::AuraUnincludedSegmentApi; -use cumulus_primitives_core::{ - relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData, -}; +use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::SubmitCollationParams; -use polkadot_node_subsystem::messages::{ - CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest, -}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; -use polkadot_primitives::{ - AsyncBackingParams, CollatorPair, CoreIndex, CoreState, Id as ParaId, OccupiedCoreAssumption, -}; +use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption}; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; use sc_consensus::BlockImport; -use sc_consensus_aura::standalone as aura_internal; use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; use sp_consensus_aura::{AuraApi, Slot}; use sp_core::crypto::Pair; use sp_inherents::CreateInherentDataProviders; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; -use sp_timestamp::Timestamp; use std::{sync::Arc, time::Duration}; -use crate::collator::{self as collator_util, SlotClaim}; +use crate::collator::{self as collator_util}; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -87,8 +75,6 @@ pub struct Params { pub relay_client: RClient, /// A validation code hash provider, used to get the current validation code hash. pub code_hash_provider: CHP, - /// A chain synchronization oracle. - pub sync_oracle: SO, /// The underlying keystore, which should contain Aura consensus keys. pub keystore: KeystorePtr, /// The collator key used to sign collations before submitting to validators. @@ -110,8 +96,8 @@ pub struct Params { } /// Run async-backing-friendly Aura. -pub fn run( - mut params: Params, +pub fn run( + mut params: Params, ) -> impl Future + Send + 'static where Block: BlockT, @@ -130,7 +116,6 @@ where CIDP: CreateInherentDataProviders + 'static, CIDP::InherentDataProviders: Send, BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, - SO: SyncOracle + Send + Sync + Clone + 'static, Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, @@ -138,14 +123,6 @@ where P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, { - // This is an arbitrary value which is likely guaranteed to exceed any reasonable - // limit, as it would correspond to 10 non-included blocks. - // - // Since we only search for parent blocks which have already been imported, - // we can guarantee that all imported blocks respect the unincluded segment - // rules specified by the parachain's runtime and thus will never be too deep. - const PARENT_SEARCH_DEPTH: usize = 10; - async move { cumulus_client_collator::initialize_collator_subsystems( &mut params.overseer_handle, @@ -186,12 +163,9 @@ where while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); - // TODO: Currently we use just the first core here, but for elastic scaling - // we iterate and build on all of the cores returned. - let core_index = if let Some(core_index) = cores_scheduled_for_para( + let core_index = if let Some(core_index) = super::cores_scheduled_for_para( relay_parent, params.para_id, - &mut params.overseer_handle, &mut params.relay_client, ) .await @@ -226,42 +200,16 @@ where }, }; - let parent_search_params = ParentSearchParams { + let (included_block, initial_parent) = match crate::collators::find_parent( relay_parent, - para_id: params.para_id, - ancestry_lookback: async_backing_params(relay_parent, ¶ms.relay_client) - .await - .map(|c| c.allowed_ancestry_len as usize) - .unwrap_or(0), - max_depth: PARENT_SEARCH_DEPTH, - ignore_alternative_branches: true, - }; - - let potential_parents = - cumulus_client_consensus_common::find_potential_parents::( - parent_search_params, - &*params.para_backend, - ¶ms.relay_client, - ) - .await; - - let mut potential_parents = match potential_parents { - Err(e) => { - tracing::error!( - target: crate::LOG_TARGET, - ?relay_parent, - err = ?e, - "Could not fetch potential parents to build upon" - ); - - continue - }, - Ok(x) => x, - }; - - let included_block = match potential_parents.iter().find(|x| x.depth == 0) { - None => continue, // also serves as an `is_empty` check. - Some(b) => b.hash, + params.para_id, + &*params.para_backend, + ¶ms.relay_client, + ) + .await + { + Some(value) => value, + None => continue, }; let para_client = &*params.para_client; @@ -292,7 +240,7 @@ where relay_chain_slot_duration = ?params.relay_chain_slot_duration, "Adjusted relay-chain slot to parachain slot" ); - Some(can_build_upon::<_, _, P>( + Some(super::can_build_upon::<_, _, P>( slot_now, timestamp, block_hash, @@ -302,13 +250,6 @@ where )) }; - // Sort by depth, ascending, to choose the longest chain. - // - // If the longest chain has space, build upon that. Otherwise, don't - // build at all. - potential_parents.sort_by_key(|a| a.depth); - let Some(initial_parent) = potential_parents.pop() else { continue }; - // Build in a loop until not allowed. Note that the authorities can change // at any block, so we need to re-claim our slot every time. let mut parent_hash = initial_parent.hash; @@ -435,124 +376,3 @@ where } } } - -// Checks if we own the slot at the given block and whether there -// is space in the unincluded segment. -async fn can_build_upon( - slot: Slot, - timestamp: Timestamp, - parent_hash: Block::Hash, - included_block: Block::Hash, - client: &Client, - keystore: &KeystorePtr, -) -> Option> -where - Client: ProvideRuntimeApi, - Client::Api: AuraApi + AuraUnincludedSegmentApi, - P: Pair, - P::Public: Codec, - P::Signature: Codec, -{ - let runtime_api = client.runtime_api(); - let authorities = runtime_api.authorities(parent_hash).ok()?; - let author_pub = aura_internal::claim_slot::

(slot, &authorities, keystore).await?; - - // Here we lean on the property that building on an empty unincluded segment must always - // be legal. Skipping the runtime API query here allows us to seamlessly run this - // collator against chains which have not yet upgraded their runtime. - if parent_hash != included_block { - if !runtime_api.can_build_upon(parent_hash, included_block, slot).ok()? { - return None - } - } - - Some(SlotClaim::unchecked::

(author_pub, slot, timestamp)) -} - -/// Reads async backing parameters from the relay chain storage at the given relay parent. -async fn async_backing_params( - relay_parent: PHash, - relay_client: &impl RelayChainInterface, -) -> Option { - match load_abridged_host_configuration(relay_parent, relay_client).await { - Ok(Some(config)) => Some(config.async_backing_params), - Ok(None) => { - tracing::error!( - target: crate::LOG_TARGET, - "Active config is missing in relay chain storage", - ); - None - }, - Err(err) => { - tracing::error!( - target: crate::LOG_TARGET, - ?err, - ?relay_parent, - "Failed to read active config from relay chain client", - ); - None - }, - } -} - -// Return all the cores assigned to the para at the provided relay parent. -async fn cores_scheduled_for_para( - relay_parent: PHash, - para_id: ParaId, - overseer_handle: &mut OverseerHandle, - relay_client: &impl RelayChainInterface, -) -> Vec { - // Get `AvailabilityCores` from runtime - let (tx, rx) = oneshot::channel(); - let request = RuntimeApiRequest::AvailabilityCores(tx); - overseer_handle - .send_msg(RuntimeApiMessage::Request(relay_parent, request), "LookaheadCollator") - .await; - - let cores = match rx.await { - Ok(Ok(cores)) => cores, - Ok(Err(error)) => { - tracing::error!( - target: crate::LOG_TARGET, - ?error, - ?relay_parent, - "Failed to query availability cores runtime API", - ); - return Vec::new() - }, - Err(oneshot::Canceled) => { - tracing::error!( - target: crate::LOG_TARGET, - ?relay_parent, - "Sender for availability cores runtime request dropped", - ); - return Vec::new() - }, - }; - - let max_candidate_depth = async_backing_params(relay_parent, relay_client) - .await - .map(|c| c.max_candidate_depth) - .unwrap_or(0); - - cores - .iter() - .enumerate() - .filter_map(|(index, core)| { - let core_para_id = match core { - CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id), - CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core - .next_up_on_available - .as_ref() - .map(|scheduled_core| scheduled_core.para_id), - CoreState::Free | CoreState::Occupied(_) => None, - }; - - if core_para_id == Some(para_id) { - Some(CoreIndex(index as u32)) - } else { - None - } - }) - .collect() -} diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index 0abc034c1ed6..7d430ecdc727 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -20,13 +20,35 @@ //! included parachain block, as well as the [`lookahead`] collator, which prospectively //! builds on parachain blocks which have not yet been included in the relay chain. +use crate::collator::SlotClaim; +use codec::Codec; +use cumulus_client_consensus_common::{ + self as consensus_common, load_abridged_host_configuration, ParentSearchParams, +}; +use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot}; +use cumulus_primitives_core::{relay_chain::Hash as ParaHash, BlockT}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_primitives::{ - Hash as RHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash, + AsyncBackingParams, CoreIndex, CoreState, Hash as RelayHash, Id as ParaId, + OccupiedCoreAssumption, ValidationCodeHash, }; +use sc_consensus_aura::{standalone as aura_internal, AuraApi}; +use sp_api::ProvideRuntimeApi; +use sp_core::Pair; +use sp_keystore::KeystorePtr; +use sp_timestamp::Timestamp; pub mod basic; pub mod lookahead; +pub mod slot_based; + +// This is an arbitrary value which is likely guaranteed to exceed any reasonable +// limit, as it would correspond to 10 non-included blocks. +// +// Since we only search for parent blocks which have already been imported, +// we can guarantee that all imported blocks respect the unincluded segment +// rules specified by the parachain's runtime and thus will never be too deep. +const PARENT_SEARCH_DEPTH: usize = 10; /// Check the `local_validation_code_hash` against the validation code hash in the relay chain /// state. @@ -36,7 +58,7 @@ async fn check_validation_code_or_log( local_validation_code_hash: &ValidationCodeHash, para_id: ParaId, relay_client: &impl RelayChainInterface, - relay_parent: RHash, + relay_parent: RelayHash, ) { let state_validation_code_hash = match relay_client .validation_code_hash(relay_parent, para_id, OccupiedCoreAssumption::Included) @@ -77,3 +99,159 @@ async fn check_validation_code_or_log( }, } } + +/// Reads async backing parameters from the relay chain storage at the given relay parent. +async fn async_backing_params( + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> Option { + match load_abridged_host_configuration(relay_parent, relay_client).await { + Ok(Some(config)) => Some(config.async_backing_params), + Ok(None) => { + tracing::error!( + target: crate::LOG_TARGET, + "Active config is missing in relay chain storage", + ); + None + }, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?relay_parent, + "Failed to read active config from relay chain client", + ); + None + }, + } +} + +// Return all the cores assigned to the para at the provided relay parent. +async fn cores_scheduled_for_para( + relay_parent: RelayHash, + para_id: ParaId, + relay_client: &impl RelayChainInterface, +) -> Vec { + // Get `AvailabilityCores` from runtime + let cores = match relay_client.availability_cores(relay_parent).await { + Ok(cores) => cores, + Err(error) => { + tracing::error!( + target: crate::LOG_TARGET, + ?error, + ?relay_parent, + "Failed to query availability cores runtime API", + ); + return Vec::new() + }, + }; + + let max_candidate_depth = async_backing_params(relay_parent, relay_client) + .await + .map(|c| c.max_candidate_depth) + .unwrap_or(0); + + cores + .iter() + .enumerate() + .filter_map(|(index, core)| { + let core_para_id = match core { + CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id), + CoreState::Occupied(occupied_core) if max_candidate_depth > 0 => occupied_core + .next_up_on_available + .as_ref() + .map(|scheduled_core| scheduled_core.para_id), + CoreState::Free | CoreState::Occupied(_) => None, + }; + + if core_para_id == Some(para_id) { + Some(CoreIndex(index as u32)) + } else { + None + } + }) + .collect() +} + +// Checks if we own the slot at the given block and whether there +// is space in the unincluded segment. +async fn can_build_upon( + slot: Slot, + timestamp: Timestamp, + parent_hash: Block::Hash, + included_block: Block::Hash, + client: &Client, + keystore: &KeystorePtr, +) -> Option> +where + Client: ProvideRuntimeApi, + Client::Api: AuraApi + AuraUnincludedSegmentApi, + P: Pair, + P::Public: Codec, + P::Signature: Codec, +{ + let runtime_api = client.runtime_api(); + let authorities = runtime_api.authorities(parent_hash).ok()?; + let author_pub = aura_internal::claim_slot::

(slot, &authorities, keystore).await?; + + // Here we lean on the property that building on an empty unincluded segment must always + // be legal. Skipping the runtime API query here allows us to seamlessly run this + // collator against chains which have not yet upgraded their runtime. + if parent_hash != included_block && + !runtime_api.can_build_upon(parent_hash, included_block, slot).ok()? + { + return None + } + + Some(SlotClaim::unchecked::

(author_pub, slot, timestamp)) +} + +/// Use [`cumulus_client_consensus_common::find_potential_parents`] to find parachain blocks that +/// we can build on. Once a list of potential parents is retrieved, return the last one of the +/// longest chain. +async fn find_parent( + relay_parent: ParaHash, + para_id: ParaId, + para_backend: &impl sc_client_api::Backend, + relay_client: &impl RelayChainInterface, +) -> Option<(::Hash, consensus_common::PotentialParent)> +where + Block: BlockT, +{ + let parent_search_params = ParentSearchParams { + relay_parent, + para_id, + ancestry_lookback: crate::collators::async_backing_params(relay_parent, relay_client) + .await + .map_or(0, |params| params.allowed_ancestry_len as usize), + max_depth: PARENT_SEARCH_DEPTH, + ignore_alternative_branches: true, + }; + + let potential_parents = cumulus_client_consensus_common::find_potential_parents::( + parent_search_params, + para_backend, + relay_client, + ) + .await; + + let potential_parents = match potential_parents { + Err(e) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_parent, + err = ?e, + "Could not fetch potential parents to build upon" + ); + + return None + }, + Ok(x) => x, + }; + + let included_block = potential_parents.iter().find(|x| x.depth == 0)?.hash; + potential_parents + .into_iter() + .max_by_key(|a| a.depth) + .map(|parent| (included_block, parent)) +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs new file mode 100644 index 000000000000..1fbc0689da86 --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -0,0 +1,491 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::{Codec, Encode}; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_aura::AuraUnincludedSegmentApi; +use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData}; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_primitives::{ + BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, + OccupiedCoreAssumption, +}; + +use futures::prelude::*; +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_consensus_aura::{AuraApi, Slot, SlotDuration}; +use sp_core::crypto::Pair; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; +use sp_timestamp::Timestamp; +use std::{sync::Arc, time::Duration}; + +use super::CollatorMessage; +use crate::{ + collator::{self as collator_util}, + collators::{check_validation_code_or_log, cores_scheduled_for_para}, + LOG_TARGET, +}; + +/// Parameters for [`run_block_builder`]. +pub struct BuilderTaskParams< + Block: BlockT, + BI, + CIDP, + Client, + Backend, + RelayClient, + CHP, + Proposer, + CS, +> { + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RelayClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The para's ID. + pub para_id: ParaId, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Channel to send built blocks to the collation task. + pub collator_sender: sc_utils::mpsc::TracingUnboundedSender>, + /// Slot duration of the relay chain + pub relay_chain_slot_duration: Duration, + /// Drift every slot by this duration. + /// This is a time quantity that is subtracted from the actual timestamp when computing + /// the time left to enter a new slot. In practice, this *left-shifts* the clock time with the + /// intent to keep our "clock" slightly behind the relay chain one and thus reducing the + /// likelihood of encountering unfavorable notification arrival timings (i.e. we don't want to + /// wait for relay chain notifications because we woke up too early). + pub slot_drift: Duration, +} + +#[derive(Debug)] +struct SlotInfo { + pub timestamp: Timestamp, + pub slot: Slot, + pub slot_duration: SlotDuration, +} + +#[derive(Debug)] +struct SlotTimer { + client: Arc, + drift: Duration, + _marker: std::marker::PhantomData<(Block, Box)>, +} + +/// Returns current duration since Unix epoch. +fn duration_now() -> Duration { + use std::time::SystemTime; + let now = SystemTime::now(); + now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| { + panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e) + }) +} + +/// Returns the duration until the next slot from now. +fn time_until_next_slot(slot_duration: Duration, drift: Duration) -> Duration { + let now = duration_now().as_millis() - drift.as_millis(); + + let next_slot = (now + slot_duration.as_millis()) / slot_duration.as_millis(); + let remaining_millis = next_slot * slot_duration.as_millis() - now; + Duration::from_millis(remaining_millis as u64) +} + +impl SlotTimer +where + Block: BlockT, + Client: ProvideRuntimeApi + Send + Sync + 'static + UsageProvider, + Client::Api: AuraApi, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + pub fn new_with_drift(client: Arc, drift: Duration) -> Self { + Self { client, drift, _marker: Default::default() } + } + + /// Returns a future that resolves when the next slot arrives. + pub async fn wait_until_next_slot(&self) -> Result { + let Ok(slot_duration) = crate::slot_duration(&*self.client) else { + tracing::error!(target: crate::LOG_TARGET, "Failed to fetch slot duration from runtime."); + return Err(()) + }; + + let time_until_next_slot = time_until_next_slot(slot_duration.as_duration(), self.drift); + tokio::time::sleep(time_until_next_slot).await; + let timestamp = sp_timestamp::Timestamp::current(); + Ok(SlotInfo { + slot: Slot::from_timestamp(timestamp, slot_duration), + timestamp, + slot_duration, + }) + } +} + +/// Run block-builder. +pub fn run_block_builder( + params: BuilderTaskParams, +) -> impl Future + Send + 'static +where + Block: BlockT, + Client: ProvideRuntimeApi + + UsageProvider + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: + AuraApi + CollectCollationInfo + AuraUnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RelayClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + async move { + tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task."); + let BuilderTaskParams { + relay_client, + create_inherent_data_providers, + para_client, + keystore, + block_import, + para_id, + proposer, + collator_service, + collator_sender, + code_hash_provider, + authoring_duration, + para_backend, + relay_chain_slot_duration, + slot_drift, + } = params; + + let slot_timer = SlotTimer::<_, _, P>::new_with_drift(para_client.clone(), slot_drift); + + let mut collator = { + let params = collator_util::Params { + create_inherent_data_providers, + block_import, + relay_client: relay_client.clone(), + keystore: keystore.clone(), + para_id, + proposer, + collator_service, + }; + + collator_util::Collator::::new(params) + }; + + let mut relay_chain_fetcher = RelayChainCachingFetcher::new(relay_client.clone(), para_id); + + loop { + // We wait here until the next slot arrives. + let Ok(para_slot) = slot_timer.wait_until_next_slot().await else { + return; + }; + + let Some(expected_cores) = + expected_core_count(relay_chain_slot_duration, para_slot.slot_duration) + else { + return + }; + + let Ok(RelayChainData { + relay_parent_header, + max_pov_size, + relay_parent_hash: relay_parent, + scheduled_cores, + }) = relay_chain_fetcher.get_relay_chain_data().await + else { + continue; + }; + + if scheduled_cores.is_empty() { + tracing::debug!(target: LOG_TARGET, "Parachain not scheduled, skipping slot."); + continue; + } + + let core_index_in_scheduled: u64 = *para_slot.slot % expected_cores; + let Some(core_index) = scheduled_cores.get(core_index_in_scheduled as usize) else { + tracing::debug!(target: LOG_TARGET, core_index_in_scheduled, core_len = scheduled_cores.len(), "Para is scheduled, but not enough cores available."); + continue; + }; + + let Some((included_block, parent)) = + crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client) + .await + else { + continue + }; + + let parent_header = parent.header; + let parent_hash = parent.hash; + + // We mainly call this to inform users at genesis if there is a mismatch with the + // on-chain data. + collator.collator_service().check_block_status(parent_hash, &parent_header); + + let slot_claim = match crate::collators::can_build_upon::<_, _, P>( + para_slot.slot, + para_slot.timestamp, + parent_hash, + included_block, + &*para_client, + &keystore, + ) + .await + { + Some(slot) => slot, + None => { + tracing::debug!( + target: crate::LOG_TARGET, + ?core_index, + slot_info = ?para_slot, + unincluded_segment_len = parent.depth, + relay_parent = %relay_parent, + included = %included_block, + parent = %parent_hash, + "Not building block." + ); + continue + }, + }; + + tracing::debug!( + target: crate::LOG_TARGET, + ?core_index, + slot_info = ?para_slot, + unincluded_segment_len = parent.depth, + relay_parent = %relay_parent, + included = %included_block, + parent = %parent_hash, + "Building block." + ); + + let validation_data = PersistedValidationData { + parent_head: parent_header.encode().into(), + relay_parent_number: *relay_parent_header.number(), + relay_parent_storage_root: *relay_parent_header.state_root(), + max_pov_size, + }; + + let (parachain_inherent_data, other_inherent_data) = match collator + .create_inherent_data( + relay_parent, + &validation_data, + parent_hash, + slot_claim.timestamp(), + ) + .await + { + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break + }, + Ok(x) => x, + }; + + let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) { + None => { + tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash"); + break + }, + Some(v) => v, + }; + + check_validation_code_or_log( + &validation_code_hash, + para_id, + &relay_client, + relay_parent, + ) + .await; + + let Ok(Some(candidate)) = collator + .build_block_and_import( + &parent_header, + &slot_claim, + None, + (parachain_inherent_data, other_inherent_data), + authoring_duration, + // Set the block limit to 50% of the maximum PoV size. + // + // TODO: If we got benchmarking that includes the proof size, + // we should be able to use the maximum pov size. + (validation_data.max_pov_size / 2) as usize, + ) + .await + else { + tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot."); + continue; + }; + + let new_block_hash = candidate.block.header().hash(); + + // Announce the newly built block to our peers. + collator.collator_service().announce_block(new_block_hash, None); + + if let Err(err) = collator_sender.unbounded_send(CollatorMessage { + relay_parent, + parent_header, + parachain_candidate: candidate, + validation_code_hash, + core_index: *core_index, + }) { + tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task."); + return + } + } + } +} + +/// Calculate the expected core count based on the slot duration of the relay and parachain. +/// +/// If `slot_duration` is smaller than `relay_chain_slot_duration` that means that we produce more +/// than one parachain block per relay chain block. In order to get these backed, we need multiple +/// cores. This method calculates how many cores we should expect to have scheduled under the +/// assumption that we have a fixed number of cores assigned to our parachain. +fn expected_core_count( + relay_chain_slot_duration: Duration, + slot_duration: SlotDuration, +) -> Option { + let slot_duration_millis = slot_duration.as_millis(); + u64::try_from(relay_chain_slot_duration.as_millis()) + .map_err(|e| tracing::error!("Unable to calculate expected parachain core count: {e}")) + .map(|relay_slot_duration| (relay_slot_duration / slot_duration_millis).max(1)) + .ok() +} + +/// Contains relay chain data necessary for parachain block building. +#[derive(Clone)] +struct RelayChainData { + /// Current relay chain parent header. + pub relay_parent_header: RelayHeader, + /// The cores this para is scheduled on in the context of the relay parent. + pub scheduled_cores: Vec, + /// Maximum configured PoV size on the relay chain. + pub max_pov_size: u32, + /// Current relay chain parent header. + pub relay_parent_hash: RelayHash, +} + +/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block +/// hash. +struct RelayChainCachingFetcher { + relay_client: RI, + para_id: ParaId, + last_data: Option<(RelayHash, RelayChainData)>, +} + +impl RelayChainCachingFetcher +where + RI: RelayChainInterface + Clone + 'static, +{ + pub fn new(relay_client: RI, para_id: ParaId) -> Self { + Self { relay_client, para_id, last_data: None } + } + + /// Fetch required [`RelayChainData`] from the relay chain. + /// If this data has been fetched in the past for the incoming hash, it will reuse + /// cached data. + pub async fn get_relay_chain_data(&mut self) -> Result { + let Ok(relay_parent) = self.relay_client.best_block_hash().await else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash."); + return Err(()) + }; + + match &self.last_data { + Some((last_seen_hash, data)) if *last_seen_hash == relay_parent => { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent."); + Ok(data.clone()) + }, + _ => { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain."); + let data = self.update_for_relay_parent(relay_parent).await?; + self.last_data = Some((relay_parent, data.clone())); + Ok(data) + }, + } + } + + /// Fetch fresh data from the relay chain for the given relay parent hash. + async fn update_for_relay_parent(&self, relay_parent: RelayHash) -> Result { + let scheduled_cores = + cores_scheduled_for_para(relay_parent, self.para_id, &self.relay_client).await; + let Ok(Some(relay_parent_header)) = + self.relay_client.header(BlockId::Hash(relay_parent)).await + else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header."); + return Err(()) + }; + + let max_pov_size = match self + .relay_client + .persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included) + .await + { + Ok(None) => return Err(()), + Ok(Some(pvd)) => pvd.max_pov_size, + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client"); + return Err(()) + }, + }; + + Ok(RelayChainData { + relay_parent_hash: relay_parent, + relay_parent_header, + scheduled_cores, + max_pov_size, + }) + } +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs new file mode 100644 index 000000000000..5b8151f6302c --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -0,0 +1,140 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::Encode; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; +use polkadot_overseer::Handle as OverseerHandle; +use polkadot_primitives::{CollatorPair, Id as ParaId}; + +use futures::prelude::*; + +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_runtime::traits::{Block as BlockT, Header}; + +use super::CollatorMessage; + +const LOG_TARGET: &str = "aura::cumulus::collation_task"; + +/// Parameters for the collation task. +pub struct Params { + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The para's ID. + pub para_id: ParaId, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Collator service interface + pub collator_service: CS, + /// Receiver channel for communication with the block builder task. + pub collator_receiver: TracingUnboundedReceiver>, +} + +/// Asynchronously executes the collation task for a parachain. +/// +/// This function initializes the collator subsystems necessary for producing and submitting +/// collations to the relay chain. It listens for new best relay chain block notifications and +/// handles collator messages. If our parachain is scheduled on a core and we have a candidate, +/// the task will build a collation and send it to the relay chain. +pub async fn run_collation_task(mut params: Params) +where + Block: BlockT, + CS: CollatorServiceInterface + Send + Sync + 'static, + RClient: RelayChainInterface + Clone + 'static, +{ + let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else { + tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); + return + }; + + cumulus_client_collator::initialize_collator_subsystems( + &mut overseer_handle, + params.collator_key, + params.para_id, + params.reinitialize, + ) + .await; + + let collator_service = params.collator_service; + while let Some(collator_message) = params.collator_receiver.next().await { + handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await; + } +} + +/// Handle an incoming collation message from the block builder task. +/// This builds the collation from the [`CollatorMessage`] and submits it to +/// the collation-generation subsystem of the relay chain. +async fn handle_collation_message( + message: CollatorMessage, + collator_service: &impl CollatorServiceInterface, + overseer_handle: &mut OverseerHandle, +) { + let CollatorMessage { + parent_header, + parachain_candidate, + validation_code_hash, + relay_parent, + core_index, + } = message; + + let hash = parachain_candidate.block.header().hash(); + let number = *parachain_candidate.block.header().number(); + let (collation, block_data) = + match collator_service.build_collation(&parent_header, hash, parachain_candidate) { + Some(collation) => collation, + None => { + tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation."); + return; + }, + }; + + tracing::info!( + target: LOG_TARGET, + "PoV size {{ header: {:.2}kB, extrinsics: {:.2}kB, storage_proof: {:.2}kB }}", + block_data.header().encoded_size() as f64 / 1024f64, + block_data.extrinsics().encoded_size() as f64 / 1024f64, + block_data.storage_proof().encoded_size() as f64 / 1024f64, + ); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + tracing::debug!(target: LOG_TARGET, ?core_index, %hash, %number, "Submitting collation for core."); + overseer_handle + .send_msg( + CollationGenerationMessage::SubmitCollation(SubmitCollationParams { + relay_parent, + collation, + parent_head: parent_header.encode().into(), + validation_code_hash, + core_index, + result_sender: None, + }), + "SubmitCollation", + ) + .await; +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs new file mode 100644 index 000000000000..0fe49d58d25b --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -0,0 +1,178 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! A collator for Aura that looks ahead of the most recently included parachain block +//! when determining what to build upon. +//! +//! The block building mechanism consists of two parts: +//! 1. A block-builder task that builds parachain blocks at each of our slots. +//! 2. A collator task that transforms the blocks into a collation and submits them to the relay +//! chain. +//! +//! Blocks are built on every parachain slot if there is a core scheduled on the relay chain. At the +//! beginning of each block building loop, we determine how many blocks we expect to build per relay +//! chain block. The collator implementation then expects that we have that many cores scheduled +//! during the relay chain block. After the block is built, the block builder task sends it to +//! the collation task which compresses it and submits it to the collation-generation subsystem. + +use codec::Codec; +use consensus_common::ParachainCandidate; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_aura::AuraUnincludedSegmentApi; +use cumulus_primitives_core::CollectCollationInfo; +use cumulus_relay_chain_interface::RelayChainInterface; +use polkadot_primitives::{ + CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash, +}; + +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sc_utils::mpsc::tracing_unbounded; + +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_consensus_aura::AuraApi; +use sp_core::crypto::Pair; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Member}; + +use std::{sync::Arc, time::Duration}; + +use self::{block_builder_task::run_block_builder, collation_task::run_collation_task}; + +mod block_builder_task; +mod collation_task; + +/// Parameters for [`run`]. +pub struct Params { + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The para's ID. + pub para_id: ParaId, + /// The length of slots in the relay chain. + pub relay_chain_slot_duration: Duration, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Drift slots by a fixed duration. This can be used to create more preferrable authoring + /// timings. + pub slot_drift: Duration, +} + +/// Run aura-based block building and collation task. +pub fn run( + params: Params, +) -> (impl futures::Future, impl futures::Future) +where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + UsageProvider + + Send + + Sync + + 'static, + Client::Api: + AuraApi + CollectCollationInfo + AuraUnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + Clone + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair + 'static, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100); + let collator_task_params = collation_task::Params { + relay_client: params.relay_client.clone(), + collator_key: params.collator_key, + para_id: params.para_id, + reinitialize: params.reinitialize, + collator_service: params.collator_service.clone(), + collator_receiver: rx, + }; + + let collation_task_fut = run_collation_task::(collator_task_params); + + let block_builder_params = block_builder_task::BuilderTaskParams { + create_inherent_data_providers: params.create_inherent_data_providers, + block_import: params.block_import, + para_client: params.para_client, + para_backend: params.para_backend, + relay_client: params.relay_client, + code_hash_provider: params.code_hash_provider, + keystore: params.keystore, + para_id: params.para_id, + proposer: params.proposer, + collator_service: params.collator_service, + authoring_duration: params.authoring_duration, + collator_sender: tx, + relay_chain_slot_duration: params.relay_chain_slot_duration, + slot_drift: params.slot_drift, + }; + + let block_builder_fut = + run_block_builder::(block_builder_params); + + (collation_task_fut, block_builder_fut) +} + +/// Message to be sent from the block builder to the collation task. +/// +/// Contains all data necessary to submit a collation to the relay chain. +struct CollatorMessage { + /// The hash of the relay chain block that provides the context for the parachain block. + pub relay_parent: RelayHash, + /// The header of the parent block. + pub parent_header: Block::Header, + /// The parachain block candidate. + pub parachain_candidate: ParachainCandidate, + /// The validation code hash at the parent block. + pub validation_code_hash: ValidationCodeHash, + /// Core index that this block should be submitted on + pub core_index: CoreIndex, +} diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 2b0d8290182a..e12750dcc553 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -19,16 +19,13 @@ use polkadot_primitives::{ Block as PBlock, Hash as PHash, Header as PHeader, PersistedValidationData, ValidationCodeHash, }; -use cumulus_primitives_core::{ - relay_chain::{self, BlockId as RBlockId, OccupiedCoreAssumption}, - AbridgedHostConfiguration, ParaId, -}; +use cumulus_primitives_core::{relay_chain, AbridgedHostConfiguration}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface}; -use sc_client_api::{Backend, HeaderBackend}; +use sc_client_api::Backend; use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult}; -use sp_blockchain::Backend as BlockchainBackend; use sp_consensus_slots::Slot; + use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_timestamp::Timestamp; @@ -36,9 +33,12 @@ use std::{sync::Arc, time::Duration}; mod level_monitor; mod parachain_consensus; +mod parent_search; #[cfg(test)] mod tests; +pub use parent_search::*; + pub use parachain_consensus::run_parachain_consensus; use level_monitor::LevelMonitor; @@ -229,196 +229,6 @@ pub trait ParachainBlockImportMarker {} impl ParachainBlockImportMarker for ParachainBlockImport {} -/// Parameters when searching for suitable parents to build on top of. -#[derive(Debug)] -pub struct ParentSearchParams { - /// The relay-parent that is intended to be used. - pub relay_parent: PHash, - /// The ID of the parachain. - pub para_id: ParaId, - /// A limitation on the age of relay parents for parachain blocks that are being - /// considered. This is relative to the `relay_parent` number. - pub ancestry_lookback: usize, - /// How "deep" parents can be relative to the included parachain block at the relay-parent. - /// The included block has depth 0. - pub max_depth: usize, - /// Whether to only ignore "alternative" branches, i.e. branches of the chain - /// which do not contain the block pending availability. - pub ignore_alternative_branches: bool, -} - -/// A potential parent block returned from [`find_potential_parents`] -#[derive(Debug, PartialEq)] -pub struct PotentialParent { - /// The hash of the block. - pub hash: B::Hash, - /// The header of the block. - pub header: B::Header, - /// The depth of the block. - pub depth: usize, - /// Whether the block is the included block, is itself pending on-chain, or descends - /// from the block pending availability. - pub aligned_with_pending: bool, -} - -/// Perform a recursive search through blocks to find potential -/// parent blocks for a new block. -/// -/// This accepts a relay-chain block to be used as an anchor and a maximum search depth, -/// along with some arguments for filtering parachain blocks and performs a recursive search -/// for parachain blocks. The search begins at the last included parachain block and returns -/// a set of [`PotentialParent`]s which could be potential parents of a new block with this -/// relay-parent according to the search parameters. -/// -/// A parachain block is a potential parent if it is either the last included parachain block, the -/// pending parachain block (when `max_depth` >= 1), or all of the following hold: -/// * its parent is a potential parent -/// * its relay-parent is within `ancestry_lookback` of the targeted relay-parent. -/// * its relay-parent is within the same session as the targeted relay-parent. -/// * the block number is within `max_depth` blocks of the included block -pub async fn find_potential_parents( - params: ParentSearchParams, - client: &impl Backend, - relay_client: &impl RelayChainInterface, -) -> Result>, RelayChainError> { - // 1. Build up the ancestry record of the relay chain to compare against. - let rp_ancestry = { - let mut ancestry = Vec::with_capacity(params.ancestry_lookback + 1); - let mut current_rp = params.relay_parent; - let mut required_session = None; - - while ancestry.len() <= params.ancestry_lookback { - let header = match relay_client.header(RBlockId::hash(current_rp)).await? { - None => break, - Some(h) => h, - }; - - let session = relay_client.session_index_for_child(current_rp).await?; - if let Some(required_session) = required_session { - // Respect the relay-chain rule not to cross session boundaries. - if session != required_session { - break - } - } else { - required_session = Some(session); - } - - ancestry.push((current_rp, *header.state_root())); - current_rp = *header.parent_hash(); - - // don't iterate back into the genesis block. - if header.number == 1 { - break - } - } - - ancestry - }; - - let is_hash_in_ancestry = |hash| rp_ancestry.iter().any(|x| x.0 == hash); - let is_root_in_ancestry = |root| rp_ancestry.iter().any(|x| x.1 == root); - - // 2. Get the included and pending availability blocks. - let included_header = relay_client - .persisted_validation_data( - params.relay_parent, - params.para_id, - OccupiedCoreAssumption::TimedOut, - ) - .await?; - - let included_header = match included_header { - Some(pvd) => pvd.parent_head, - None => return Ok(Vec::new()), // this implies the para doesn't exist. - }; - - let pending_header = relay_client - .persisted_validation_data( - params.relay_parent, - params.para_id, - OccupiedCoreAssumption::Included, - ) - .await? - .and_then(|x| if x.parent_head != included_header { Some(x.parent_head) } else { None }); - - let included_header = match B::Header::decode(&mut &included_header.0[..]).ok() { - None => return Ok(Vec::new()), - Some(x) => x, - }; - // Silently swallow if pending block can't decode. - let pending_header = pending_header.and_then(|p| B::Header::decode(&mut &p.0[..]).ok()); - let included_hash = included_header.hash(); - let pending_hash = pending_header.as_ref().map(|hdr| hdr.hash()); - - let mut frontier = vec![PotentialParent:: { - hash: included_hash, - header: included_header, - depth: 0, - aligned_with_pending: true, - }]; - - // Recursive search through descendants of the included block which have acceptable - // relay parents. - let mut potential_parents = Vec::new(); - while let Some(entry) = frontier.pop() { - let is_pending = - entry.depth == 1 && pending_hash.as_ref().map_or(false, |h| &entry.hash == h); - let is_included = entry.depth == 0; - - // note: even if the pending block or included block have a relay parent - // outside of the expected part of the relay chain, they are always allowed - // because they have already been posted on chain. - let is_potential = is_pending || is_included || { - let digest = entry.header.digest(); - cumulus_primitives_core::extract_relay_parent(digest).map_or(false, is_hash_in_ancestry) || - cumulus_primitives_core::rpsr_digest::extract_relay_parent_storage_root(digest) - .map(|(r, _n)| r) - .map_or(false, is_root_in_ancestry) - }; - - let parent_aligned_with_pending = entry.aligned_with_pending; - let child_depth = entry.depth + 1; - let hash = entry.hash; - - if is_potential { - potential_parents.push(entry); - } - - if !is_potential || child_depth > params.max_depth { - continue - } - - // push children onto search frontier. - for child in client.blockchain().children(hash).ok().into_iter().flatten() { - let aligned_with_pending = parent_aligned_with_pending && - if child_depth == 1 { - pending_hash.as_ref().map_or(true, |h| &child == h) - } else { - true - }; - - if params.ignore_alternative_branches && !aligned_with_pending { - continue - } - - let header = match client.blockchain().header(child) { - Ok(Some(h)) => h, - Ok(None) => continue, - Err(_) => continue, - }; - - frontier.push(PotentialParent { - hash: child, - header, - depth: child_depth, - aligned_with_pending, - }); - } - } - - Ok(potential_parents) -} - /// Get the relay-parent slot and timestamp from a header. pub fn relay_slot_and_timestamp( relay_parent_header: &PHeader, diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs index b4b315bb32be..944917673b11 100644 --- a/cumulus/client/consensus/common/src/parachain_consensus.rs +++ b/cumulus/client/consensus/common/src/parachain_consensus.rs @@ -375,60 +375,61 @@ async fn handle_new_best_parachain_head( target: LOG_TARGET, block_hash = ?hash, "Skipping set new best block, because block is already the best.", - ) - } else { - // Make sure the block is already known or otherwise we skip setting new best. - match parachain.block_status(hash) { - Ok(BlockStatus::InChainWithState) => { - unset_best_header.take(); - tracing::debug!( - target: LOG_TARGET, - ?hash, - "Importing block as new best for parachain.", - ); - import_block_as_new_best(hash, parachain_head, parachain).await; - }, - Ok(BlockStatus::InChainPruned) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - "Trying to set pruned block as new best!", - ); - }, - Ok(BlockStatus::Unknown) => { - *unset_best_header = Some(parachain_head); + ); + return; + } - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Parachain block not yet imported, waiting for import to enact as best block.", - ); - - if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { - // Best effort channel to actively encourage block recovery. - // An error here is not fatal; the relay chain continuously re-announces - // the best block, thus we will have other opportunities to retry. - let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; - if let Err(err) = recovery_chan_tx.try_send(req) { - tracing::warn!( - target: LOG_TARGET, - block_hash = ?hash, - error = ?err, - "Unable to notify block recovery subsystem" - ) - } + // Make sure the block is already known or otherwise we skip setting new best. + match parachain.block_status(hash) { + Ok(BlockStatus::InChainWithState) => { + unset_best_header.take(); + tracing::debug!( + target: LOG_TARGET, + included = ?hash, + "Importing block as new best for parachain.", + ); + import_block_as_new_best(hash, parachain_head, parachain).await; + }, + Ok(BlockStatus::InChainPruned) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + "Trying to set pruned block as new best!", + ); + }, + Ok(BlockStatus::Unknown) => { + *unset_best_header = Some(parachain_head); + + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Parachain block not yet imported, waiting for import to enact as best block.", + ); + + if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { + // Best effort channel to actively encourage block recovery. + // An error here is not fatal; the relay chain continuously re-announces + // the best block, thus we will have other opportunities to retry. + let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; + if let Err(err) = recovery_chan_tx.try_send(req) { + tracing::warn!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?err, + "Unable to notify block recovery subsystem" + ) } - }, - Err(e) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - error = ?e, - "Failed to get block status of block.", - ); - }, - _ => {}, - } + } + }, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?e, + "Failed to get block status of block.", + ); + }, + _ => {}, } } diff --git a/cumulus/client/consensus/common/src/parent_search.rs b/cumulus/client/consensus/common/src/parent_search.rs new file mode 100644 index 000000000000..c371ec62f845 --- /dev/null +++ b/cumulus/client/consensus/common/src/parent_search.rs @@ -0,0 +1,418 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::Decode; +use polkadot_primitives::Hash as RelayHash; + +use cumulus_primitives_core::{ + relay_chain::{BlockId as RBlockId, OccupiedCoreAssumption}, + ParaId, +}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface}; + +use sc_client_api::{Backend, HeaderBackend}; + +use sp_blockchain::{Backend as BlockchainBackend, TreeRoute}; + +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +const PARENT_SEARCH_LOG_TARGET: &str = "consensus::common::find_potential_parents"; + +/// Parameters when searching for suitable parents to build on top of. +#[derive(Debug)] +pub struct ParentSearchParams { + /// The relay-parent that is intended to be used. + pub relay_parent: RelayHash, + /// The ID of the parachain. + pub para_id: ParaId, + /// A limitation on the age of relay parents for parachain blocks that are being + /// considered. This is relative to the `relay_parent` number. + pub ancestry_lookback: usize, + /// How "deep" parents can be relative to the included parachain block at the relay-parent. + /// The included block has depth 0. + pub max_depth: usize, + /// Whether to only ignore "alternative" branches, i.e. branches of the chain + /// which do not contain the block pending availability. + pub ignore_alternative_branches: bool, +} + +/// A potential parent block returned from [`find_potential_parents`] +#[derive(PartialEq)] +pub struct PotentialParent { + /// The hash of the block. + pub hash: B::Hash, + /// The header of the block. + pub header: B::Header, + /// The depth of the block with respect to the included block. + pub depth: usize, + /// Whether the block is the included block, is itself pending on-chain, or descends + /// from the block pending availability. + pub aligned_with_pending: bool, +} + +impl std::fmt::Debug for PotentialParent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PotentialParent") + .field("hash", &self.hash) + .field("depth", &self.depth) + .field("aligned_with_pending", &self.aligned_with_pending) + .field("number", &self.header.number()) + .finish() + } +} + +/// Perform a recursive search through blocks to find potential +/// parent blocks for a new block. +/// +/// This accepts a relay-chain block to be used as an anchor and a maximum search depth, +/// along with some arguments for filtering parachain blocks and performs a recursive search +/// for parachain blocks. The search begins at the last included parachain block and returns +/// a set of [`PotentialParent`]s which could be potential parents of a new block with this +/// relay-parent according to the search parameters. +/// +/// A parachain block is a potential parent if it is either the last included parachain block, the +/// pending parachain block (when `max_depth` >= 1), or all of the following hold: +/// * its parent is a potential parent +/// * its relay-parent is within `ancestry_lookback` of the targeted relay-parent. +/// * its relay-parent is within the same session as the targeted relay-parent. +/// * the block number is within `max_depth` blocks of the included block +pub async fn find_potential_parents( + params: ParentSearchParams, + backend: &impl Backend, + relay_client: &impl RelayChainInterface, +) -> Result>, RelayChainError> { + tracing::trace!("Parent search parameters: {params:?}"); + // Get the included block. + let Some((included_header, included_hash)) = + fetch_included_from_relay_chain(relay_client, backend, params.para_id, params.relay_parent) + .await? + else { + return Ok(Default::default()) + }; + + let only_included = vec![PotentialParent { + hash: included_hash, + header: included_header.clone(), + depth: 0, + aligned_with_pending: true, + }]; + + if params.max_depth == 0 { + return Ok(only_included) + }; + + // Pending header and hash. + let maybe_pending = { + // Fetch the most recent pending header from the relay chain. We use + // `OccupiedCoreAssumption::Included` so the candidate pending availability gets enacted + // before being returned to us. + let pending_header = relay_client + .persisted_validation_data( + params.relay_parent, + params.para_id, + OccupiedCoreAssumption::Included, + ) + .await? + .and_then(|p| B::Header::decode(&mut &p.parent_head.0[..]).ok()) + .filter(|x| x.hash() != included_hash); + + // If the pending block is not locally known, we can't do anything. + if let Some(header) = pending_header { + let pending_hash = header.hash(); + match backend.blockchain().header(pending_hash) { + // We are supposed to ignore branches that don't contain the pending block, but we + // do not know the pending block locally. + Ok(None) | Err(_) if params.ignore_alternative_branches => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %pending_hash, + "Failed to get header for pending block.", + ); + return Ok(Default::default()) + }, + Ok(Some(_)) => Some((header, pending_hash)), + _ => None, + } + } else { + None + } + }; + + let maybe_route_to_last_pending = maybe_pending + .as_ref() + .map(|(_, pending)| { + sp_blockchain::tree_route(backend.blockchain(), included_hash, *pending) + }) + .transpose()?; + + // If we want to ignore alternative branches there is no reason to start + // the parent search at the included block. We can add the included block and + // the path to the pending block to the potential parents directly (limited by max_depth). + let (frontier, potential_parents) = match ( + &maybe_pending, + params.ignore_alternative_branches, + &maybe_route_to_last_pending, + ) { + (Some((pending_header, pending_hash)), true, Some(ref route_to_pending)) => { + let mut potential_parents = only_included; + + // This is a defensive check, should never happen. + if !route_to_pending.retracted().is_empty() { + tracing::warn!(target: PARENT_SEARCH_LOG_TARGET, "Included block not an ancestor of pending block. This should not happen."); + return Ok(Default::default()) + } + + // Add all items on the path included -> pending - 1 to the potential parents, but + // not more than `max_depth`. + let num_parents_on_path = + route_to_pending.enacted().len().saturating_sub(1).min(params.max_depth); + for (num, block) in + route_to_pending.enacted().iter().take(num_parents_on_path).enumerate() + { + let Ok(Some(header)) = backend.blockchain().header(block.hash) else { continue }; + + potential_parents.push(PotentialParent { + hash: block.hash, + header, + depth: 1 + num, + aligned_with_pending: true, + }); + } + + // The search for additional potential parents should now start at the children of + // the pending block. + ( + vec![PotentialParent { + hash: *pending_hash, + header: pending_header.clone(), + depth: route_to_pending.enacted().len(), + aligned_with_pending: true, + }], + potential_parents, + ) + }, + _ => (only_included, Default::default()), + }; + + if potential_parents.len() > params.max_depth { + return Ok(potential_parents); + } + + // Build up the ancestry record of the relay chain to compare against. + let rp_ancestry = + build_relay_parent_ancestry(params.ancestry_lookback, params.relay_parent, relay_client) + .await?; + + Ok(search_child_branches_for_parents( + frontier, + maybe_route_to_last_pending, + included_header, + maybe_pending.map(|(_, hash)| hash), + backend, + params.max_depth, + params.ignore_alternative_branches, + rp_ancestry, + potential_parents, + )) +} + +/// Fetch the included block from the relay chain. +async fn fetch_included_from_relay_chain( + relay_client: &impl RelayChainInterface, + backend: &impl Backend, + para_id: ParaId, + relay_parent: RelayHash, +) -> Result, RelayChainError> { + // Fetch the pending header from the relay chain. We use `OccupiedCoreAssumption::TimedOut` + // so that even if there is a pending candidate, we assume it is timed out and we get the + // included head. + let included_header = relay_client + .persisted_validation_data(relay_parent, para_id, OccupiedCoreAssumption::TimedOut) + .await?; + let included_header = match included_header { + Some(pvd) => pvd.parent_head, + None => return Ok(None), // this implies the para doesn't exist. + }; + + let included_header = match B::Header::decode(&mut &included_header.0[..]).ok() { + None => return Ok(None), + Some(x) => x, + }; + + let included_hash = included_header.hash(); + // If the included block is not locally known, we can't do anything. + match backend.blockchain().header(included_hash) { + Ok(None) => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %included_hash, + "Failed to get header for included block.", + ); + return Ok(None) + }, + Err(e) => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %included_hash, + %e, + "Failed to get header for included block.", + ); + return Ok(None) + }, + _ => {}, + }; + + Ok(Some((included_header, included_hash))) +} + +/// Build an ancestry of relay parents that are acceptable. +/// +/// An acceptable relay parent is one that is no more than `ancestry_lookback` + 1 blocks below the +/// relay parent we want to build on. Parachain blocks anchored on relay parents older than that can +/// not be considered potential parents for block building. They have no chance of still getting +/// included, so our newly build parachain block would also not get included. +/// +/// On success, returns a vector of `(header_hash, state_root)` of the relevant relay chain +/// ancestry blocks. +async fn build_relay_parent_ancestry( + ancestry_lookback: usize, + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> Result, RelayChainError> { + let mut ancestry = Vec::with_capacity(ancestry_lookback + 1); + let mut current_rp = relay_parent; + let mut required_session = None; + while ancestry.len() <= ancestry_lookback { + let Some(header) = relay_client.header(RBlockId::hash(current_rp)).await? else { break }; + + let session = relay_client.session_index_for_child(current_rp).await?; + if required_session.get_or_insert(session) != &session { + // Respect the relay-chain rule not to cross session boundaries. + break; + } + + ancestry.push((current_rp, *header.state_root())); + current_rp = *header.parent_hash(); + + // don't iterate back into the genesis block. + if header.number == 1 { + break + } + } + Ok(ancestry) +} + +/// Start search for child blocks that can be used as parents. +pub fn search_child_branches_for_parents( + mut frontier: Vec>, + maybe_route_to_last_pending: Option>, + included_header: Block::Header, + pending_hash: Option, + backend: &impl Backend, + max_depth: usize, + ignore_alternative_branches: bool, + rp_ancestry: Vec<(RelayHash, RelayHash)>, + mut potential_parents: Vec>, +) -> Vec> { + let included_hash = included_header.hash(); + let is_hash_in_ancestry = |hash| rp_ancestry.iter().any(|x| x.0 == hash); + let is_root_in_ancestry = |root| rp_ancestry.iter().any(|x| x.1 == root); + + // The distance between pending and included block. Is later used to check if a child + // is aligned with pending when it is between pending and included block. + let pending_distance = maybe_route_to_last_pending.as_ref().map(|route| route.enacted().len()); + + // If a block is on the path included -> pending, we consider it `aligned_with_pending`. + let is_child_pending = |hash| { + maybe_route_to_last_pending + .as_ref() + .map_or(true, |route| route.enacted().iter().any(|x| x.hash == hash)) + }; + + tracing::trace!( + target: PARENT_SEARCH_LOG_TARGET, + ?included_hash, + included_num = ?included_header.number(), + ?pending_hash , + ?rp_ancestry, + "Searching relay chain ancestry." + ); + while let Some(entry) = frontier.pop() { + let is_pending = pending_hash.as_ref().map_or(false, |h| &entry.hash == h); + let is_included = included_hash == entry.hash; + + // note: even if the pending block or included block have a relay parent + // outside of the expected part of the relay chain, they are always allowed + // because they have already been posted on chain. + let is_potential = is_pending || is_included || { + let digest = entry.header.digest(); + let is_hash_in_ancestry_check = cumulus_primitives_core::extract_relay_parent(digest) + .map_or(false, is_hash_in_ancestry); + let is_root_in_ancestry_check = + cumulus_primitives_core::rpsr_digest::extract_relay_parent_storage_root(digest) + .map(|(r, _n)| r) + .map_or(false, is_root_in_ancestry); + + is_hash_in_ancestry_check || is_root_in_ancestry_check + }; + + let parent_aligned_with_pending = entry.aligned_with_pending; + let child_depth = entry.depth + 1; + let hash = entry.hash; + + tracing::trace!( + target: PARENT_SEARCH_LOG_TARGET, + ?hash, + is_potential, + is_pending, + is_included, + "Checking potential parent." + ); + + if is_potential { + potential_parents.push(entry); + } + + if !is_potential || child_depth > max_depth { + continue + } + + // push children onto search frontier. + for child in backend.blockchain().children(hash).ok().into_iter().flatten() { + tracing::trace!(target: PARENT_SEARCH_LOG_TARGET, ?child, child_depth, ?pending_distance, "Looking at child."); + + let aligned_with_pending = parent_aligned_with_pending && + (pending_distance.map_or(true, |dist| child_depth > dist) || + is_child_pending(child)); + + if ignore_alternative_branches && !aligned_with_pending { + tracing::trace!(target: PARENT_SEARCH_LOG_TARGET, ?child, "Child is not aligned with pending block."); + continue + } + + let Ok(Some(header)) = backend.blockchain().header(child) else { continue }; + + frontier.push(PotentialParent { + hash: child, + header, + depth: child_depth, + aligned_with_pending, + }); + } + } + + potential_parents +} diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index 2a944bc7f9fa..284fa39ed1e7 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use codec::Encode; use cumulus_client_pov_recovery::RecoveryKind; use cumulus_primitives_core::{ - relay_chain::{self, BlockId}, + relay_chain::{BlockId, BlockNumber, CoreState}, CumulusDigestItem, InboundDownwardMessage, InboundHrmpMessage, }; use cumulus_relay_chain_interface::{ @@ -37,6 +37,7 @@ use futures_timer::Delay; use polkadot_primitives::HeadData; use sc_client_api::{Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use sp_blockchain::Backend as BlockchainBackend; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_version::RuntimeVersion; use std::{ @@ -46,11 +47,11 @@ use std::{ time::Duration, }; -fn relay_block_num_from_hash(hash: &PHash) -> relay_chain::BlockNumber { +fn relay_block_num_from_hash(hash: &PHash) -> BlockNumber { hash.to_low_u64_be() as u32 } -fn relay_hash_from_block_num(block_number: relay_chain::BlockNumber) -> PHash { +fn relay_hash_from_block_num(block_number: BlockNumber) -> PHash { PHash::from_low_u64_be(block_number as u64) } @@ -257,6 +258,13 @@ impl RelayChainInterface for Relaychain { })) } + async fn availability_cores( + &self, + _relay_parent: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test"); + } + async fn version(&self, _: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } @@ -1138,6 +1146,357 @@ fn find_potential_parents_with_max_depth() { } } +#[test] +fn find_potential_parents_unknown_included() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + + let sproof = sproof_with_best_parent(&client); + let included_but_unknown = build_block(&*client, sproof, None, None, Some(relay_parent)); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert_eq!(potential_parents.len(), 0); +} + +#[test] +fn find_potential_parents_unknown_pending() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let sproof = sproof_with_parent_by_hash(&client, included_block.header().hash()); + let pending_but_unknown = build_block( + &*client, + sproof, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert!(potential_parents.is_empty()); +} + +#[test] +fn find_potential_parents_unknown_pending_include_alternative_branches() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let alt_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(included_block.header().hash()), + None, + Some(search_relay_parent), + ); + + tracing::info!(hash = %alt_block.header().hash(), "Alt block."); + let sproof = sproof_with_parent_by_hash(&client, included_block.header().hash()); + let pending_but_unknown = build_block( + &*client, + sproof, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: false, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + let expected_parents: Vec<_> = vec![&included_block, &alt_block]; + assert_eq!(potential_parents.len(), 2); + assert_eq!(expected_parents[0].hash(), potential_parents[0].hash); + assert_eq!(expected_parents[1].hash(), potential_parents[1].hash); +} + +/// Test where there are multiple pending blocks. +#[test] +fn find_potential_parents_aligned_with_late_pending() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let in_between_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let pending_block = build_and_import_block_ext( + &client, + BlockOrigin::Own, + true, + &mut para_import, + Some(in_between_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, in_between_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_block.header().clone()); + } + + // Build some blocks on the pending block and on the included block. + // We end up with two sibling chains, one is aligned with the pending block, + // the other is not. + let mut aligned_blocks = Vec::new(); + let mut parent = pending_block.header().hash(); + for _ in 2..NON_INCLUDED_CHAIN_LEN { + let block = build_and_import_block_ext( + &client, + BlockOrigin::Own, + true, + &mut para_import, + Some(parent), + None, + Some(relay_parent), + ); + parent = block.header().hash(); + aligned_blocks.push(block); + } + + let mut alt_blocks = Vec::new(); + let mut parent = included_block.header().hash(); + for _ in 0..NON_INCLUDED_CHAIN_LEN { + let block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(parent), + None, + Some(search_relay_parent), + ); + parent = block.header().hash(); + alt_blocks.push(block); + } + + // Ignore alternative branch: + for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert_eq!(potential_parents.len(), max_depth + 1); + let expected_parents: Vec<_> = [&included_block, &in_between_block, &pending_block] + .into_iter() + .chain(aligned_blocks.iter()) + .take(max_depth + 1) + .collect(); + + for i in 0..(max_depth + 1) { + let parent = &potential_parents[i]; + let expected = &expected_parents[i]; + + assert_eq!(parent.hash, expected.hash()); + assert_eq!(&parent.header, expected.header()); + assert_eq!(parent.depth, i); + assert!(parent.aligned_with_pending); + } + } + + // Do not ignore: + for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth, + ignore_alternative_branches: false, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + let expected_len = 2 * max_depth + 1; + assert_eq!(potential_parents.len(), expected_len); + let expected_aligned: Vec<_> = [&included_block, &in_between_block, &pending_block] + .into_iter() + .chain(aligned_blocks.iter()) + .take(max_depth + 1) + .collect(); + let expected_alt = alt_blocks.iter().take(max_depth); + + let expected_parents: Vec<_> = + expected_aligned.clone().into_iter().chain(expected_alt).collect(); + // Check correctness. + assert_eq!(expected_parents.len(), expected_len); + + for i in 0..expected_len { + let parent = &potential_parents[i]; + let expected = expected_parents + .iter() + .find(|block| block.header().hash() == parent.hash) + .expect("missing parent"); + + let is_aligned = expected_aligned.contains(&expected); + + assert_eq!(parent.hash, expected.hash()); + assert_eq!(&parent.header, expected.header()); + + assert_eq!(parent.aligned_with_pending, is_aligned); + } + } +} + #[test] fn find_potential_parents_aligned_with_pending() { sp_tracing::try_init_simple(); @@ -1249,6 +1608,7 @@ fn find_potential_parents_aligned_with_pending() { // Do not ignore: for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + log::info!("Ran with max_depth = {max_depth}"); let potential_parents = block_on(find_potential_parents( ParentSearchParams { relay_parent: search_relay_parent, @@ -1276,6 +1636,7 @@ fn find_potential_parents_aligned_with_pending() { // Check correctness. assert_eq!(expected_parents.len(), expected_len); + potential_parents.iter().for_each(|p| log::info!("result: {:?}", p)); for i in 0..expected_len { let parent = &potential_parents[i]; let expected = expected_parents @@ -1288,6 +1649,12 @@ fn find_potential_parents_aligned_with_pending() { assert_eq!(parent.hash, expected.hash()); assert_eq!(&parent.header, expected.header()); + log::info!( + "Check hash: {:?} expected: {} is: {}", + parent.hash, + is_aligned, + parent.aligned_with_pending, + ); assert_eq!(parent.aligned_with_pending, is_aligned); } } diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index eb0d7f0e01b3..18d121c41d16 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -26,9 +26,10 @@ use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt use parking_lot::Mutex; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::{ - CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, - Hash as PHash, HeadData, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, SessionIndex, SigningContext, ValidationCodeHash, ValidatorId, + BlockNumber, CandidateCommitments, CandidateDescriptor, CollatorPair, + CommittedCandidateReceipt, CoreState, Hash as PHash, HeadData, InboundDownwardMessage, + InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, + SigningContext, ValidationCodeHash, ValidatorId, }; use polkadot_test_client::{ Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend, @@ -297,6 +298,13 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(header) } + async fn availability_cores( + &self, + _relay_parent: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test"); + } + async fn version(&self, _: PHash) -> RelayChainResult { let version = self.data.lock().runtime_version; diff --git a/cumulus/client/parachain-inherent/Cargo.toml b/cumulus/client/parachain-inherent/Cargo.toml index 9d346ce17f56..d81f727b41b9 100644 --- a/cumulus/client/parachain-inherent/Cargo.toml +++ b/cumulus/client/parachain-inherent/Cargo.toml @@ -9,7 +9,6 @@ license = "Apache-2.0" [dependencies] async-trait = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } -scale-info = { features = ["derive"], workspace = true, default-features = true } tracing = { workspace = true, default-features = true } # Substrate diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs index 75bf308ef27a..6f274ed18b6b 100644 --- a/cumulus/client/pov-recovery/src/tests.rs +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -17,7 +17,9 @@ use super::*; use assert_matches::assert_matches; use codec::{Decode, Encode}; -use cumulus_primitives_core::relay_chain::{BlockId, CandidateCommitments, CandidateDescriptor}; +use cumulus_primitives_core::relay_chain::{ + BlockId, CandidateCommitments, CandidateDescriptor, CoreState, +}; use cumulus_relay_chain_interface::{ InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader, PersistedValidationData, StorageValue, ValidationCodeHash, ValidatorId, @@ -478,6 +480,13 @@ impl RelayChainInterface for Relaychain { async fn header(&self, _: BlockId) -> RelayChainResult> { unimplemented!("Not needed for test"); } + + async fn availability_cores( + &self, + _: PHash, + ) -> RelayChainResult>>> { + unimplemented!("Not needed for test"); + } } fn make_candidate_chain(candidate_number_range: Range) -> Vec { diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 7871623e8447..8f8d666bd143 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -19,9 +19,9 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use async_trait::async_trait; use cumulus_primitives_core::{ relay_chain::{ - runtime_api::ParachainHost, Block as PBlock, BlockId, CommittedCandidateReceipt, - Hash as PHash, Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, - ValidationCodeHash, ValidatorId, + runtime_api::ParachainHost, Block as PBlock, BlockId, BlockNumber, + CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage, + OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -256,6 +256,13 @@ impl RelayChainInterface for RelayChainInProcessInterface { Ok(Box::pin(notifications_stream)) } + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>> { + Ok(self.full_client.runtime_api().availability_cores(relay_parent)?) + } + async fn candidates_pending_availability( &self, hash: PHash, diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index 46e19b40f010..d02035e84e92 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -29,8 +29,8 @@ use sp_api::ApiError; use cumulus_primitives_core::relay_chain::BlockId; pub use cumulus_primitives_core::{ relay_chain::{ - CommittedCandidateReceipt, Hash as PHash, Header as PHeader, InboundHrmpMessage, - OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, + BlockNumber, CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader, + InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -217,6 +217,14 @@ pub trait RelayChainInterface: Send + Sync { /// Get the runtime version of the relay chain. async fn version(&self, relay_parent: PHash) -> RelayChainResult; + + /// Yields information on all availability cores as relevant to the child block. + /// + /// Cores are either free, scheduled or occupied. Free cores can have paras assigned to them. + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>>; } #[async_trait] @@ -337,6 +345,13 @@ where .await } + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>> { + (**self).availability_cores(relay_parent).await + } + async fn candidates_pending_availability( &self, block_id: PHash, diff --git a/cumulus/client/relay-chain-minimal-node/Cargo.toml b/cumulus/client/relay-chain-minimal-node/Cargo.toml index 1d89316d400b..95ecadc8bd06 100644 --- a/cumulus/client/relay-chain-minimal-node/Cargo.toml +++ b/cumulus/client/relay-chain-minimal-node/Cargo.toml @@ -17,13 +17,7 @@ polkadot-overseer = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } polkadot-node-network-protocol = { workspace = true, default-features = true } -polkadot-availability-recovery = { workspace = true, default-features = true } -polkadot-collator-protocol = { workspace = true, default-features = true } polkadot-network-bridge = { workspace = true, default-features = true } -polkadot-node-collation-generation = { workspace = true, default-features = true } -polkadot-node-core-runtime-api = { workspace = true, default-features = true } -polkadot-node-core-chain-api = { workspace = true, default-features = true } -polkadot-node-core-prospective-parachains = { workspace = true, default-features = true } polkadot-service = { workspace = true, default-features = true } # substrate deps @@ -51,4 +45,3 @@ array-bytes = { workspace = true, default-features = true } tracing = { workspace = true, default-features = true } async-trait = { workspace = true } futures = { workspace = true } -parking_lot = { workspace = true, default-features = true } diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index bb7bfa5dc322..692a1fb537a8 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -24,7 +24,7 @@ use cumulus_primitives_core::{ InboundDownwardMessage, ParaId, PersistedValidationData, }; use cumulus_relay_chain_interface::{ - PHeader, RelayChainError, RelayChainInterface, RelayChainResult, + BlockNumber, CoreState, PHeader, RelayChainError, RelayChainInterface, RelayChainResult, }; use futures::{FutureExt, Stream, StreamExt}; use polkadot_overseer::Handle; @@ -252,4 +252,11 @@ impl RelayChainInterface for RelayChainRpcInterface { async fn version(&self, relay_parent: RelayHash) -> RelayChainResult { self.rpc_client.runtime_version(relay_parent).await } + + async fn availability_cores( + &self, + relay_parent: RelayHash, + ) -> RelayChainResult>> { + self.rpc_client.parachain_host_availability_cores(relay_parent).await + } } diff --git a/cumulus/pallets/aura-ext/src/consensus_hook.rs b/cumulus/pallets/aura-ext/src/consensus_hook.rs index 592029803391..560d477b2a85 100644 --- a/cumulus/pallets/aura-ext/src/consensus_hook.rs +++ b/cumulus/pallets/aura-ext/src/consensus_hook.rs @@ -65,9 +65,19 @@ where let para_slot_from_relay = Slot::from_timestamp(relay_chain_timestamp.into(), para_slot_duration); - // Perform checks. - assert_eq!(slot, para_slot_from_relay, "slot number mismatch"); - if authored > velocity + 1 { + // Check that we are not too far in the future. Since we expect `V` parachain blocks + // during the relay chain slot, we can allow for `V` parachain slots into the future. + if *slot > *para_slot_from_relay + u64::from(velocity) { + panic!( + "Parachain slot is too far in the future: parachain_slot: {:?}, derived_from_relay_slot: {:?} velocity: {:?}", + slot, + para_slot_from_relay, + velocity + ); + } + + // We need to allow authoring multiple blocks in the same slot. + if slot != para_slot_from_relay && authored > velocity { panic!("authored blocks limit is reached for the slot") } let weight = T::DbWeight::get().reads(1); @@ -113,6 +123,11 @@ impl< return false } + // TODO: This logic needs to be adjusted. + // It checks that we have not authored more than `V + 1` blocks in the slot. + // As a slot however, we take the parachain slot here. Velocity should + // be measured in relation to the relay chain slot. + // https://github.com/paritytech/polkadot-sdk/issues/3967 if last_slot == new_slot { authored_so_far < velocity + 1 } else { diff --git a/cumulus/pallets/aura-ext/src/lib.rs b/cumulus/pallets/aura-ext/src/lib.rs index 7ca84dff7c51..4605dd325bee 100644 --- a/cumulus/pallets/aura-ext/src/lib.rs +++ b/cumulus/pallets/aura-ext/src/lib.rs @@ -83,7 +83,7 @@ pub mod pallet { SlotInfo::::put((new_slot, authored)); - T::DbWeight::get().reads_writes(2, 1) + T::DbWeight::get().reads_writes(4, 2) } } diff --git a/cumulus/polkadot-parachain/Cargo.toml b/cumulus/polkadot-parachain/Cargo.toml index ae5abdcfab6a..7085211dad26 100644 --- a/cumulus/polkadot-parachain/Cargo.toml +++ b/cumulus/polkadot-parachain/Cargo.toml @@ -120,7 +120,7 @@ substrate-build-script-utils = { workspace = true, default-features = true } assert_cmd = { workspace = true } nix = { features = ["signal"], workspace = true } tempfile = { workspace = true } -tokio = { features = ["macros", "parking_lot", "time"], workspace = true, default-features = true } +tokio = { version = "1.32.0", features = ["macros", "parking_lot", "time"] } wait-timeout = { workspace = true } [features] diff --git a/cumulus/polkadot-parachain/src/cli.rs b/cumulus/polkadot-parachain/src/cli.rs index 3f8a2ec0d118..7c01e34f9a03 100644 --- a/cumulus/polkadot-parachain/src/cli.rs +++ b/cumulus/polkadot-parachain/src/cli.rs @@ -73,6 +73,12 @@ pub struct Cli { #[command(flatten)] pub run: cumulus_client_cli::RunCmd, + /// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling. + /// + /// Use with care, this flag is unstable and subject to change. + #[arg(long)] + pub experimental_use_slot_based: bool, + /// Disable automatic hardware benchmarks. /// /// By default these benchmarks are automatically ran at startup and measure diff --git a/cumulus/polkadot-parachain/src/command.rs b/cumulus/polkadot-parachain/src/command.rs index 2a1f20d5c176..323216f300d8 100644 --- a/cumulus/polkadot-parachain/src/command.rs +++ b/cumulus/polkadot-parachain/src/command.rs @@ -690,6 +690,7 @@ pub fn run() -> Result<()> { polkadot_config, collator_options, id, + cli.experimental_use_slot_based, hwbench, ) .await, @@ -699,6 +700,7 @@ pub fn run() -> Result<()> { polkadot_config, collator_options, id, + cli.experimental_use_slot_based, hwbench, ) .await, @@ -713,24 +715,27 @@ async fn start_node>( polkadot_config: sc_service::Configuration, collator_options: cumulus_client_cli::CollatorOptions, id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> Result { match config.chain_spec.runtime()? { - Runtime::AssetHubPolkadot => crate::service::start_asset_hub_lookahead_node::< - AssetHubPolkadotRuntimeApi, - AssetHubPolkadotAuraId, - Network, - >(config, polkadot_config, collator_options, id, hwbench) - .await - .map(|r| r.0) - .map_err(Into::into), + Runtime::AssetHubPolkadot => + crate::service::start_asset_hub_async_backing_node::< + AssetHubPolkadotRuntimeApi, + AssetHubPolkadotAuraId, + Network, + >(config, polkadot_config, collator_options, id, use_experimental_slot_based, hwbench) + .await + .map(|r| r.0) + .map_err(Into::into), Runtime::AssetHub | Runtime::Collectives => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -753,6 +758,7 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -770,11 +776,12 @@ async fn start_node>( chain_spec::bridge_hubs::BridgeHubRuntimeType::Rococo | chain_spec::bridge_hubs::BridgeHubRuntimeType::RococoLocal | chain_spec::bridge_hubs::BridgeHubRuntimeType::RococoDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -793,11 +800,12 @@ async fn start_node>( chain_spec::coretime::CoretimeRuntimeType::Westend | chain_spec::coretime::CoretimeRuntimeType::WestendLocal | chain_spec::coretime::CoretimeRuntimeType::WestendDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -810,17 +818,19 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await .map(|r| r.0) .map_err(Into::into), - Runtime::Glutton => crate::service::start_basic_lookahead_node::( + Runtime::Glutton => crate::service::start_basic_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -838,11 +848,12 @@ async fn start_node>( chain_spec::people::PeopleRuntimeType::Westend | chain_spec::people::PeopleRuntimeType::WestendLocal | chain_spec::people::PeopleRuntimeType::WestendDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -857,6 +868,7 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 42efe8098b26..0f2aed8ee4d8 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -16,7 +16,10 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_collator::service::CollatorService; -use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams}; +use cumulus_client_consensus_aura::collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{self as slot_based, Params as SlotBasedParams}, +}; use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport; use cumulus_client_consensus_proposer::Proposer; #[allow(deprecated)] @@ -51,7 +54,6 @@ use sc_consensus::{ }; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::{config::FullNetworkConfiguration, service::traits::NetworkBackend, NetworkBlock}; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::{ApiExt, ConstructRuntimeApi, ProvideRuntimeApi}; @@ -214,7 +216,6 @@ where &TaskManager, Arc, Arc>>, - Arc>, KeystorePtr, Duration, ParaId, @@ -348,7 +349,6 @@ where &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service.clone(), params.keystore_container.keystore(), relay_chain_slot_duration, para_id, @@ -408,8 +408,14 @@ pub async fn start_rococo_parachain_node>( polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -418,7 +424,7 @@ pub async fn start_rococo_parachain_node>( para_id, build_parachain_rpc_extensions::, build_aura_import_queue, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -580,13 +586,19 @@ where /// Uses the lookahead collator to support async backing. /// /// Start an aura powered parachain node. Some system chains use this. -pub async fn start_generic_aura_lookahead_node>( +pub async fn start_generic_aura_async_backing_node>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -595,7 +607,7 @@ pub async fn start_generic_aura_lookahead_node> para_id, build_parachain_rpc_extensions::, build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -607,11 +619,12 @@ pub async fn start_generic_aura_lookahead_node> /// /// Uses the lookahead collator to support async backing. #[sc_tracing::logging::prefix_logs_with("Parachain")] -pub async fn start_asset_hub_lookahead_node( +pub async fn start_asset_hub_async_backing_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> where @@ -622,15 +635,21 @@ where AuraId: AuraIdT + Sync, Net: NetworkBackend, { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; + start_node_impl::( parachain_config, polkadot_config, collator_options, CollatorSybilResistance::Resistant, // Aura para_id, - build_parachain_rpc_extensions::, + build_parachain_rpc_extensions, build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::, + consensus_starter, hwbench, ) .await @@ -676,7 +695,6 @@ fn start_relay_chain_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>>, - _sync_oracle: Arc>, _keystore: KeystorePtr, _relay_chain_slot_duration: Duration, para_id: ParaId, @@ -747,7 +765,6 @@ fn start_lookahead_aura_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>>, - sync_oracle: Arc>, keystore: KeystorePtr, relay_chain_slot_duration: Duration, para_id: ParaId, @@ -788,7 +805,6 @@ where client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) } }, - sync_oracle, keystore, collator_key, para_id, @@ -802,23 +818,104 @@ where let fut = async move { wait_for_aura(client).await; - aura::run::::Pair, _, _, _, _, _, _, _, _, _>(params).await; + aura::run::::Pair, _, _, _, _, _, _, _, _>(params).await; }; task_manager.spawn_essential_handle().spawn("aura", None, fut); Ok(()) } +/// Start consensus using the lookahead aura collator. +fn start_slot_based_aura_consensus( + client: Arc>, + block_import: ParachainBlockImport, + prometheus_registry: Option<&Registry>, + telemetry: Option, + task_manager: &TaskManager, + relay_chain_interface: Arc, + transaction_pool: Arc>>, + keystore: KeystorePtr, + relay_chain_slot_duration: Duration, + para_id: ParaId, + collator_key: CollatorPair, + _overseer_handle: OverseerHandle, + announce_block: Arc>) + Send + Sync>, + backend: Arc, +) -> Result<(), sc_service::Error> +where + RuntimeApi: ConstructNodeRuntimeApi>, + RuntimeApi::RuntimeApi: AuraRuntimeApi, + AuraId: AuraIdT + Sync, +{ + let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + let proposer = Proposer::new(proposer_factory); + let collator_service = CollatorService::new( + client.clone(), + Arc::new(task_manager.spawn_handle()), + announce_block, + client.clone(), + ); + + let client_for_aura = client.clone(); + let params = SlotBasedParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + slot_drift: Duration::from_secs(1), + }; + + let (collation_future, block_builder_future) = + slot_based::run::::Pair, _, _, _, _, _, _, _, _>(params); + + task_manager.spawn_essential_handle().spawn( + "collation-task", + Some("parachain-block-authoring"), + collation_future, + ); + task_manager.spawn_essential_handle().spawn( + "block-builder-task", + Some("parachain-block-authoring"), + block_builder_future, + ); + Ok(()) +} + /// Start an aura powered parachain node which uses the lookahead collator to support async backing. /// This node is basic in the sense that its runtime api doesn't include common contents such as /// transaction payment. Used for aura glutton. -pub async fn start_basic_lookahead_node>( +pub async fn start_basic_async_backing_node>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -827,7 +924,7 @@ pub async fn start_basic_lookahead_node>( para_id, |_, _, _, _| Ok(RpcModule::new(())), build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -839,8 +936,14 @@ pub async fn start_contracts_rococo_node>( polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -849,7 +952,7 @@ pub async fn start_contracts_rococo_node>( para_id, build_contracts_rpc_extensions, build_aura_import_queue, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await diff --git a/cumulus/test/client/src/lib.rs b/cumulus/test/client/src/lib.rs index d233ad269176..f26413e441e7 100644 --- a/cumulus/test/client/src/lib.rs +++ b/cumulus/test/client/src/lib.rs @@ -79,6 +79,7 @@ impl substrate_test_client::GenesisInit for GenesisParameters { cumulus_test_service::chain_spec::get_chain_spec_with_extra_endowed( None, self.endowed_accounts.clone(), + cumulus_test_runtime::WASM_BINARY.expect("WASM binary not compiled!"), ) .build_storage() .expect("Builds test runtime genesis storage") diff --git a/cumulus/test/runtime/Cargo.toml b/cumulus/test/runtime/Cargo.toml index fc0eb3ce742f..d5582f2d2a23 100644 --- a/cumulus/test/runtime/Cargo.toml +++ b/cumulus/test/runtime/Cargo.toml @@ -93,3 +93,4 @@ std = [ "substrate-wasm-builder", ] increment-spec-version = [] +elastic-scaling = [] diff --git a/cumulus/test/runtime/build.rs b/cumulus/test/runtime/build.rs index ebd5c178cba0..bf579f4121e5 100644 --- a/cumulus/test/runtime/build.rs +++ b/cumulus/test/runtime/build.rs @@ -24,6 +24,13 @@ fn main() { .enable_feature("increment-spec-version") .set_file_name("wasm_binary_spec_version_incremented.rs") .build(); + + WasmBuilder::new() + .with_current_project() + .enable_feature("elastic-scaling") + .import_memory() + .set_file_name("wasm_binary_elastic_scaling.rs") + .build(); } #[cfg(not(feature = "std"))] diff --git a/cumulus/test/runtime/src/lib.rs b/cumulus/test/runtime/src/lib.rs index 26c6635e1ad3..97cb02ab779e 100644 --- a/cumulus/test/runtime/src/lib.rs +++ b/cumulus/test/runtime/src/lib.rs @@ -27,6 +27,11 @@ pub mod wasm_spec_version_incremented { include!(concat!(env!("OUT_DIR"), "/wasm_binary_spec_version_incremented.rs")); } +pub mod elastic_scaling { + #[cfg(feature = "std")] + include!(concat!(env!("OUT_DIR"), "/wasm_binary_elastic_scaling.rs")); +} + mod test_pallet; use frame_support::{derive_impl, traits::OnRuntimeUpgrade, PalletId}; use sp_api::{decl_runtime_apis, impl_runtime_apis}; @@ -83,8 +88,23 @@ impl_opaque_keys! { /// The para-id used in this runtime. pub const PARACHAIN_ID: u32 = 100; -const UNINCLUDED_SEGMENT_CAPACITY: u32 = 3; +#[cfg(not(feature = "elastic-scaling"))] +const UNINCLUDED_SEGMENT_CAPACITY: u32 = 4; +#[cfg(not(feature = "elastic-scaling"))] const BLOCK_PROCESSING_VELOCITY: u32 = 1; + +#[cfg(feature = "elastic-scaling")] +const UNINCLUDED_SEGMENT_CAPACITY: u32 = 7; +#[cfg(feature = "elastic-scaling")] +const BLOCK_PROCESSING_VELOCITY: u32 = 4; + +#[cfg(not(feature = "elastic-scaling"))] +pub const MILLISECS_PER_BLOCK: u64 = 6000; +#[cfg(feature = "elastic-scaling")] +pub const MILLISECS_PER_BLOCK: u64 = 2000; + +pub const SLOT_DURATION: u64 = MILLISECS_PER_BLOCK; + const RELAY_CHAIN_SLOT_DURATION_MILLIS: u32 = 6000; // The only difference between the two declarations below is the `spec_version`. With the @@ -126,10 +146,6 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { state_version: 1, }; -pub const MILLISECS_PER_BLOCK: u64 = 6000; - -pub const SLOT_DURATION: u64 = MILLISECS_PER_BLOCK; - pub const EPOCH_DURATION_IN_BLOCKS: u32 = 10 * MINUTES; // These time units are defined in number of blocks. diff --git a/cumulus/test/service/Cargo.toml b/cumulus/test/service/Cargo.toml index c40863b90b54..f766d1236320 100644 --- a/cumulus/test/service/Cargo.toml +++ b/cumulus/test/service/Cargo.toml @@ -92,8 +92,6 @@ pallet-timestamp = { workspace = true, default-features = true } [dev-dependencies] futures = { workspace = true } portpicker = { workspace = true } -rococo-parachain-runtime = { workspace = true } -sp-consensus-grandpa = { workspace = true, default-features = true } sp-authority-discovery = { workspace = true, default-features = true } cumulus-test-client = { workspace = true } @@ -116,7 +114,6 @@ runtime-benchmarks = [ "polkadot-primitives/runtime-benchmarks", "polkadot-service/runtime-benchmarks", "polkadot-test-service/runtime-benchmarks", - "rococo-parachain-runtime/runtime-benchmarks", "sc-service/runtime-benchmarks", "sp-runtime/runtime-benchmarks", ] diff --git a/cumulus/test/service/src/chain_spec.rs b/cumulus/test/service/src/chain_spec.rs index 174d478f2575..ae71028ad486 100644 --- a/cumulus/test/service/src/chain_spec.rs +++ b/cumulus/test/service/src/chain_spec.rs @@ -66,9 +66,10 @@ where pub fn get_chain_spec_with_extra_endowed( id: Option, extra_endowed_accounts: Vec, + code: &[u8], ) -> ChainSpec { ChainSpec::builder( - cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + code, Extensions { para_id: id.unwrap_or(cumulus_test_runtime::PARACHAIN_ID.into()).into() }, ) .with_name("Local Testnet") @@ -83,7 +84,21 @@ pub fn get_chain_spec_with_extra_endowed( /// Get the chain spec for a specific parachain ID. pub fn get_chain_spec(id: Option) -> ChainSpec { - get_chain_spec_with_extra_endowed(id, Default::default()) + get_chain_spec_with_extra_endowed( + id, + Default::default(), + cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + ) +} + +/// Get the chain spec for a specific parachain ID. +pub fn get_elastic_scaling_chain_spec(id: Option) -> ChainSpec { + get_chain_spec_with_extra_endowed( + id, + Default::default(), + cumulus_test_runtime::elastic_scaling::WASM_BINARY + .expect("WASM binary was not built, please build it!"), + ) } /// Local testnet genesis for testing. diff --git a/cumulus/test/service/src/cli.rs b/cumulus/test/service/src/cli.rs index 87d1d4af8a95..37ca27542cbf 100644 --- a/cumulus/test/service/src/cli.rs +++ b/cumulus/test/service/src/cli.rs @@ -50,6 +50,12 @@ pub struct TestCollatorCli { #[arg(long)] pub fail_pov_recovery: bool, + + /// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling. + /// + /// Use with care, this flag is unstable and subject to change. + #[arg(long)] + pub experimental_use_slot_based: bool, } #[derive(Debug, clap::Subcommand)] @@ -253,8 +259,16 @@ impl SubstrateCli for TestCollatorCli { fn load_spec(&self, id: &str) -> std::result::Result, String> { Ok(match id { - "" => - Box::new(cumulus_test_service::get_chain_spec(Some(ParaId::from(2000)))) as Box<_>, + "" => { + tracing::info!("Using default test service chain spec."); + Box::new(cumulus_test_service::get_chain_spec(Some(ParaId::from(2000)))) as Box<_> + }, + "elastic-scaling" => { + tracing::info!("Using elastic-scaling chain spec."); + Box::new(cumulus_test_service::get_elastic_scaling_chain_spec(Some(ParaId::from( + 2100, + )))) as Box<_> + }, path => { let chain_spec = cumulus_test_service::chain_spec::ChainSpec::from_json_file(path.into())?; diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 6f8b9d19bb29..51cdebbaf54e 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -25,7 +25,10 @@ pub mod chain_spec; use cumulus_client_collator::service::CollatorService; use cumulus_client_consensus_aura::{ - collators::lookahead::{self as aura, Params as AuraParams}, + collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{self as slot_based, Params as SlotBasedParams}, + }, ImportQueueParams, }; use cumulus_client_consensus_proposer::Proposer; @@ -45,7 +48,7 @@ use cumulus_client_cli::{CollatorOptions, RelayChainMode}; use cumulus_client_consensus_common::{ ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, }; -use cumulus_client_pov_recovery::RecoveryHandle; +use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryHandle}; #[allow(deprecated)] use cumulus_client_service::old_consensus; use cumulus_client_service::{ @@ -304,7 +307,7 @@ async fn build_relay_chain_interface( /// Start a node with the given parachain `Configuration` and relay chain `Configuration`. /// /// This is the actual implementation that is abstract over the executor and the runtime api. -#[sc_tracing::logging::prefix_logs_with(parachain_config.network.node_name.as_str())] +#[sc_tracing::logging::prefix_logs_with("Parachain")] pub async fn start_node_impl>( parachain_config: Configuration, collator_key: Option, @@ -316,6 +319,7 @@ pub async fn start_node_impl>( consensus: Consensus, collator_options: CollatorOptions, proof_recording_during_import: bool, + use_slot_based_collator: bool, ) -> sc_service::error::Result<( TaskManager, Arc, @@ -409,7 +413,6 @@ where } else { Box::new(overseer_handle.clone()) }; - let is_collator = collator_key.is_some(); let relay_chain_slot_duration = Duration::from_secs(6); start_relay_chain_tasks(StartRelayChainTasksParams { @@ -418,11 +421,11 @@ where para_id, relay_chain_interface: relay_chain_interface.clone(), task_manager: &mut task_manager, - da_recovery_profile: if is_collator { - DARecoveryProfile::Collator - } else { - DARecoveryProfile::FullNode - }, + // Increase speed of recovery for testing purposes. + da_recovery_profile: DARecoveryProfile::Other(RecoveryDelayRange { + min: Duration::from_secs(1), + max: Duration::from_secs(5), + }), import_queue: import_queue_service, relay_chain_slot_duration, recovery_handle, @@ -461,29 +464,72 @@ where ); let client_for_aura = client.clone(); - let params = AuraParams { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import, - para_client: client.clone(), - para_backend: backend.clone(), - relay_client: relay_chain_interface, - code_hash_provider: move |block_hash| { - client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) - }, - sync_oracle: sync_service, - keystore, - collator_key, - para_id, - overseer_handle, - relay_chain_slot_duration, - proposer, - collator_service, - authoring_duration: Duration::from_millis(2000), - reinitialize: false, - }; - let fut = aura::run::(params); - task_manager.spawn_essential_handle().spawn("aura", None, fut); + if use_slot_based_collator { + tracing::info!(target: LOG_TARGET, "Starting block authoring with slot based authoring."); + let params = SlotBasedParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura + .code_at(block_hash) + .ok() + .map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + slot_drift: Duration::from_secs(1), + }; + + let (collation_future, block_builer_future) = + slot_based::run::(params); + task_manager.spawn_essential_handle().spawn( + "collation-task", + None, + collation_future, + ); + task_manager.spawn_essential_handle().spawn( + "block-builder-task", + None, + block_builer_future, + ); + } else { + tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator."); + let params = AuraParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura + .code_at(block_hash) + .ok() + .map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + overseer_handle, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + }; + + let fut = aura::run::(params); + task_manager.spawn_essential_handle().spawn("aura", None, fut); + } } } @@ -720,6 +766,7 @@ impl TestNodeBuilder { self.consensus, collator_options, self.record_proof_during_import, + false, ) .await .expect("could not create Cumulus test service"), @@ -735,6 +782,7 @@ impl TestNodeBuilder { self.consensus, collator_options, self.record_proof_during_import, + false, ) .await .expect("could not create Cumulus test service"), @@ -766,8 +814,11 @@ pub fn node_config( let root = base_path.path().join(format!("cumulus_test_service_{}", key)); let role = if is_collator { Role::Authority } else { Role::Full }; let key_seed = key.to_seed(); - let mut spec = - Box::new(chain_spec::get_chain_spec_with_extra_endowed(Some(para_id), endowed_accounts)); + let mut spec = Box::new(chain_spec::get_chain_spec_with_extra_endowed( + Some(para_id), + endowed_accounts, + cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + )); let mut storage = spec.as_storage_builder().build_storage().expect("could not build storage"); diff --git a/cumulus/test/service/src/main.rs b/cumulus/test/service/src/main.rs index 90d37173dd59..9357978b769a 100644 --- a/cumulus/test/service/src/main.rs +++ b/cumulus/test/service/src/main.rs @@ -118,6 +118,7 @@ fn main() -> Result<(), sc_cli::Error> { consensus, collator_options, true, + cli.experimental_use_slot_based, ) .await, sc_network::config::NetworkBackendType::Litep2p => @@ -135,6 +136,7 @@ fn main() -> Result<(), sc_cli::Error> { consensus, collator_options, true, + cli.experimental_use_slot_based, ) .await, } diff --git a/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl b/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl index 49b6d9e94fd1..e1e8442f3050 100644 --- a/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl +++ b/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl @@ -6,3 +6,6 @@ alice: parachain 2000 is registered within 225 seconds dave: reports block height is at least 7 within 250 seconds eve: reports block height is at least 7 within 250 seconds ferdie: reports block height is at least 7 within 250 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +charlie: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl b/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl index 7da8416d0161..b14c15ed5e5b 100644 --- a/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl +++ b/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl @@ -13,3 +13,7 @@ two: restart after 1 seconds three: restart after 20 seconds dave: is up dave: reports block height is at least 30 within 200 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +dave: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds +eve: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0008-elastic_authoring.toml b/cumulus/zombienet/tests/0008-elastic_authoring.toml new file mode 100644 index 000000000000..f2e2010a9e45 --- /dev/null +++ b/cumulus/zombienet/tests/0008-elastic_authoring.toml @@ -0,0 +1,50 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params] + max_candidate_depth = 6 + allowed_ancestry_len = 3 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] + max_validators_per_core = 1 + num_cores = 4 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] + max_approval_coalesce_count = 5 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +command = "polkadot" + + [[relaychain.nodes]] + name = "alice" + args = ["" ] + + [[relaychain.node_groups]] + name = "validator" + args = ["-lruntime=debug,parachain=trace" ] + count = 8 + +# Slot based authoring with 3 cores and 2s slot duration +[[parachains]] +id = 2100 +chain = "elastic-scaling" +add_to_genesis = true + + [[parachains.collators]] + name = "collator-elastic" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] + +# Slot based authoring with 1 core and 6s slot duration +[[parachains]] +id = 2000 +add_to_genesis = true + + [[parachains.collators]] + name = "collator-single-core" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] diff --git a/cumulus/zombienet/tests/0008-elastic_authoring.zndsl b/cumulus/zombienet/tests/0008-elastic_authoring.zndsl new file mode 100644 index 000000000000..a06ffd24fefd --- /dev/null +++ b/cumulus/zombienet/tests/0008-elastic_authoring.zndsl @@ -0,0 +1,19 @@ +Description: Slot based authoring for elastic scaling +Network: ./0008-elastic_authoring.toml +Creds: config + +alice: is up +collator-elastic: is up +collator-single-core: is up + + +# configure relay chain +alice: js-script ./assign-core.js with "2100,0" return is 0 within 600 seconds +alice: js-script ./assign-core.js with "2100,1" return is 0 within 600 seconds + +collator-single-core: reports block height is at least 20 within 225 seconds +collator-elastic: reports block height is at least 40 within 225 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +collator-elastic: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds +collator-single-core: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml new file mode 100644 index 000000000000..9b296e8a8b36 --- /dev/null +++ b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml @@ -0,0 +1,48 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params] + max_candidate_depth = 6 + allowed_ancestry_len = 3 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] + max_validators_per_core = 1 + num_cores = 4 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] + max_approval_coalesce_count = 5 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +command = "polkadot" + + [[relaychain.nodes]] + name = "alice" + args = ["" ] + + [[relaychain.node_groups]] + name = "validator" + args = ["-lruntime=debug,parachain=trace", "--reserved-only", "--reserved-nodes {{'alice'|zombie('multiAddress')}}"] + count = 8 + +# Slot based authoring with 3 cores and 2s slot duration +[[parachains]] +id = 2100 +chain = "elastic-scaling" +add_to_genesis = true + + # Slot based authoring with 3 cores and 2s slot duration + [[parachains.collators]] + name = "collator-elastic" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["--disable-block-announcements", "-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] + + # run 'recovery-target' as a parachain full node + [[parachains.collators]] + name = "recovery-target" + validator = false # full node + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'collator-elastic'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0", "--", "--reserved-only", "--reserved-nodes {{'alice'|zombie('multiAddress')}}"] diff --git a/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl b/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl new file mode 100644 index 000000000000..3a805078112c --- /dev/null +++ b/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl @@ -0,0 +1,19 @@ +Description: Elastic scaling PoV recovery test +Network: ./0009-elastic_pov_recovery.toml +Creds: config + +alice: is up +collator-elastic: is up + +# wait 20 blocks and register parachain +alice: reports block height is at least 20 within 250 seconds + +# configure relay chain +alice: js-script ./assign-core.js with "2100,0" return is 0 within 600 seconds +alice: js-script ./assign-core.js with "2100,1" return is 0 within 600 seconds + +# check block production +collator-elastic: reports block height is at least 40 within 225 seconds +collator-elastic: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds + +recovery-target: count of log lines containing "Importing block retrieved using pov_recovery" is greater than 35 within 10 seconds diff --git a/cumulus/zombienet/tests/assign-core.js b/cumulus/zombienet/tests/assign-core.js new file mode 100644 index 000000000000..4179b68b2e3c --- /dev/null +++ b/cumulus/zombienet/tests/assign-core.js @@ -0,0 +1,46 @@ +// Assign a parachain to a core. +// +// First argument should be the parachain id. +// Second argument should be the core. +async function run(nodeName, networkInfo, args) { + const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName]; + const api = await zombie.connect(wsUri, userDefinedTypes); + + let para = Number(args[0]); + let core = Number(args[1]); + console.log(`Assigning para ${para} to core ${core}`); + + await zombie.util.cryptoWaitReady(); + + // Submit transaction with Alice accoung + const keyring = new zombie.Keyring({ type: "sr25519" }); + const alice = keyring.addFromUri("//Alice"); + + // Wait for this transaction to be finalized in a block. + await new Promise(async (resolve, reject) => { + const unsub = await api.tx.sudo + .sudo(api.tx.coretime.assignCore(core, 0, [[{ task: para }, 57600]], null)) + .signAndSend(alice, ({ status, isError }) => { + if (status.isInBlock) { + console.log( + `Transaction included at blockhash ${status.asInBlock}`, + ); + } else if (status.isFinalized) { + console.log( + `Transaction finalized at blockHash ${status.asFinalized}`, + ); + unsub(); + return resolve(); + } else if (isError) { + console.log(`Transaction error`); + reject(`Transaction error`); + } + }); + }); + + + + return 0; +} + +module.exports = { run }; diff --git a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml index 83f5434edddb..611978a33a5f 100644 --- a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml +++ b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml @@ -7,11 +7,9 @@ timeout = 1000 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] max_validators_per_core = 1 - scheduling_lookahead = 2 num_cores = 3 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] - needed_approvals = 3 max_approval_coalesce_count = 5 [relaychain] @@ -48,4 +46,4 @@ addToGenesis = true [types.Header] number = "u64" parent_hash = "Hash" -post_state = "Hash" \ No newline at end of file +post_state = "Hash" diff --git a/prdoc/pr_4097.prdoc b/prdoc/pr_4097.prdoc new file mode 100644 index 000000000000..2804a9571c79 --- /dev/null +++ b/prdoc/pr_4097.prdoc @@ -0,0 +1,45 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Introduce experimental slot-based collator + +doc: + - audience: Node Operator + description: | + Introduces an experimental collator that is fit fot elastic-scaling. + It can be activated on `test-parachain` and `polkadot-parachain` binaries via + `--experimental-use-slot-based` flag. The current implementation is MVP status and purely + for testing. Behaviour can change any time and should not be relied upon in environments with + any stability requirements. + +crates: + - name: cumulus-client-consensus-aura + bump: major + - name: cumulus-client-consensus-common + bump: minor + - name: cumulus-client-pov-recovery + bump: none + validate: false + - name: cumulus-pallet-aura-ext + bump: patch + - name: cumulus-relay-chain-interface + bump: major + validate: false + - name: sc-consensus-slots + bump: minor + - name: sc-basic-authorship + bump: patch + - name: cumulus-client-network + bump: none + validate: false + - name: cumulus-relay-chain-inprocess-interface + bump: minor + - name: sc-consensus-aura + bump: patch + - name: cumulus-relay-chain-rpc-interface + bump: minor + - name: polkadot-parachain-bin + bump: patch + - name: polkadot + bump: none + validate: false diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs index 1519c76c42c0..74805488792a 100644 --- a/substrate/client/basic-authorship/src/basic_authorship.rs +++ b/substrate/client/basic-authorship/src/basic_authorship.rs @@ -205,7 +205,11 @@ where ) -> Proposer { let parent_hash = parent_header.hash(); - info!("🙌 Starting consensus session on top of parent {:?}", parent_hash); + info!( + "🙌 Starting consensus session on top of parent {:?} (#{})", + parent_hash, + parent_header.number() + ); let proposer = Proposer::<_, _, _, PR> { spawn_handle: self.spawn_handle.clone(), diff --git a/substrate/client/consensus/aura/src/standalone.rs b/substrate/client/consensus/aura/src/standalone.rs index 0f9b8668d447..c1536d9ef73f 100644 --- a/substrate/client/consensus/aura/src/standalone.rs +++ b/substrate/client/consensus/aura/src/standalone.rs @@ -24,7 +24,7 @@ use log::trace; use codec::Codec; -use sc_client_api::{backend::AuxStore, UsageProvider}; +use sc_client_api::UsageProvider; use sp_api::{Core, ProvideRuntimeApi}; use sp_application_crypto::{AppCrypto, AppPublic}; use sp_blockchain::Result as CResult; @@ -48,7 +48,7 @@ pub fn slot_duration(client: &C) -> CResult where A: Codec, B: BlockT, - C: AuxStore + ProvideRuntimeApi + UsageProvider, + C: ProvideRuntimeApi + UsageProvider, C::Api: AuraApi, { slot_duration_at(client, client.usage_info().chain.best_hash) @@ -59,7 +59,7 @@ pub fn slot_duration_at(client: &C, block_hash: B::Hash) -> CResult, + C: ProvideRuntimeApi, C::Api: AuraApi, { client.runtime_api().slot_duration(block_hash).map_err(|err| err.into()) diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index d9d792005312..7cdf90877dff 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -29,8 +29,8 @@ mod aux_schema; mod slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; -pub use slots::SlotInfo; use slots::Slots; +pub use slots::{time_until_next_slot, SlotInfo}; use futures::{future::Either, Future, TryFutureExt}; use futures_timer::Delay; diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index bf44207acc9c..3e7d4de10553 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -35,7 +35,6 @@ use sc_client_api::Backend; use sc_consensus::ImportQueue; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::NetworkBlock; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; @@ -172,7 +171,6 @@ fn start_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>, - sync_oracle: Arc>, keystore: KeystorePtr, relay_chain_slot_duration: Duration, para_id: ParaId, @@ -206,7 +204,6 @@ fn start_consensus( code_hash_provider: move |block_hash| { client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) }, - sync_oracle, keystore, collator_key, para_id, @@ -217,11 +214,9 @@ fn start_consensus( authoring_duration: Duration::from_millis(2000), reinitialize: false, }; - - let fut = - aura::run::( - params, - ); + let fut = aura::run::( + params, + ); task_manager.spawn_essential_handle().spawn("aura", None, fut); Ok(()) @@ -398,7 +393,6 @@ pub async fn start_parachain_node( &task_manager, relay_chain_interface, transaction_pool, - sync_service, params.keystore_container.keystore(), relay_chain_slot_duration, para_id,