Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

add warp_sync_params #1909

Merged
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
7d1dab1
wait for relay chain to sync then get parachain header
samelamin Dec 2, 2022
e15abde
Spawn new thread to wait for the target block
samelamin Dec 4, 2022
69162d9
second round of comments from the PR on substrate
samelamin Dec 12, 2022
ae91f31
third round of pr comments
samelamin Dec 19, 2022
ceab50b
add zombienet tests
samelamin Dec 19, 2022
4966862
rebase issues
samelamin Dec 19, 2022
ae3823b
refactor tests based on pr comments
samelamin Dec 20, 2022
ddb2281
rebase issues
samelamin Dec 20, 2022
5b9dd05
pr comments
samelamin Jan 4, 2023
b15c844
passing zombienet test
samelamin Jan 5, 2023
667f594
cargo +nightly fmt
samelamin Jan 6, 2023
51b12ae
Update client/network/src/lib.rs
samelamin Jan 9, 2023
05eb1bb
Update client/service/src/lib.rs
samelamin Jan 9, 2023
d929665
Update client/service/src/lib.rs
samelamin Jan 9, 2023
4737de1
Update client/service/src/lib.rs
samelamin Jan 9, 2023
7f66efd
Update client/service/src/lib.rs
samelamin Jan 9, 2023
7f2b6e7
Update client/network/src/lib.rs
samelamin Jan 9, 2023
5a37fb9
Update client/network/src/lib.rs
samelamin Jan 9, 2023
2ac5478
Update client/network/src/lib.rs
samelamin Jan 9, 2023
18dff50
Update client/network/src/lib.rs
samelamin Jan 9, 2023
8e44783
use cargo lock from master
samelamin Jan 9, 2023
4ea44ba
pr comments
samelamin Jan 9, 2023
39732ec
cargo fmt
samelamin Jan 9, 2023
72e0aef
use finalised block instead of best block
samelamin Jan 9, 2023
3a26c23
use import notification stream
samelamin Jan 10, 2023
5a11b71
rebase changes
samelamin Jan 12, 2023
4fc942d
Update client/network/src/lib.rs
samelamin Jan 13, 2023
942a280
Update client/network/src/lib.rs
samelamin Jan 13, 2023
2873567
Update client/network/src/lib.rs
samelamin Jan 13, 2023
3a293b1
Update client/network/src/lib.rs
samelamin Jan 13, 2023
96954a1
Update client/network/src/lib.rs
samelamin Jan 13, 2023
553b61d
Update client/service/src/lib.rs
samelamin Jan 13, 2023
469ea1e
Update client/relay-chain-interface/src/lib.rs
samelamin Jan 13, 2023
4c60006
Update client/relay-chain-interface/src/lib.rs
samelamin Jan 13, 2023
a5eb3f4
pr comments
samelamin Jan 13, 2023
701875f
use new file names
samelamin Jan 19, 2023
d212069
db snaphots moved to google cloud storage
michalkucharczyk Jan 20, 2023
1371d38
Update client/network/src/lib.rs
samelamin Jan 20, 2023
c1acd0b
Update client/service/src/lib.rs
samelamin Jan 20, 2023
c676617
Update client/service/src/lib.rs
samelamin Jan 20, 2023
379ec15
Update client/service/src/lib.rs
samelamin Jan 20, 2023
28c8b7c
Update client/service/src/lib.rs
samelamin Jan 20, 2023
2097a8a
Update client/service/src/lib.rs
samelamin Jan 20, 2023
286388d
Update client/service/src/lib.rs
samelamin Jan 20, 2023
0abef7d
Update client/service/src/lib.rs
samelamin Jan 20, 2023
bf875f9
Update client/service/src/lib.rs
samelamin Jan 20, 2023
8df4983
Update client/service/src/lib.rs
samelamin Jan 23, 2023
78a201c
pr comments
samelamin Jan 23, 2023
218b684
Update zombienet/tests/0007-full_node_warp_sync.toml
samelamin Jan 24, 2023
f76abda
Update zombienet/tests/0007-full_node_warp_sync.toml
samelamin Jan 24, 2023
80edadf
Scenario 1
samelamin Jan 24, 2023
ec03c83
Use test-parachain
samelamin Jan 26, 2023
f71b9a7
use test-parachain chainspecs
samelamin Jan 26, 2023
3a65290
remove relay chain spec as it is no longer required
samelamin Jan 26, 2023
f9eed93
add back relaychain spec file
samelamin Jan 27, 2023
bba1f59
pr comments
samelamin Jan 27, 2023
fdd54d6
Upload snapshots to google cloud
skunert Feb 6, 2023
5f2f888
Update zombienet/tests/0007-prepare-warp-sync-db-snapshot.md
samelamin Feb 6, 2023
37ef032
update documentation
samelamin Feb 6, 2023
fbac1db
Fix snapshot URLs
skunert Feb 7, 2023
ad0f0ec
Merge branch 'master' into add_warp_sync_params
samelamin Feb 7, 2023
427cfae
Merge branch 'master' into add_warp_sync_params
samelamin Feb 7, 2023
91a7978
use master lock file
samelamin Feb 7, 2023
ac04dad
add finalized_block_hash
samelamin Feb 8, 2023
9ece6c9
Patch diener for CI
skunert Feb 8, 2023
be55330
Bump Zombienet
skunert Feb 8, 2023
862f46a
Add 0007 zombienet test
skunert Feb 8, 2023
3d0be71
Bump zombienet
skunert Feb 10, 2023
56d931f
Revert "Patch diener for CI"
skunert Feb 13, 2023
e8fa358
Merge branch 'master' into add_warp_sync_params
samelamin Feb 13, 2023
ffc3179
merge fixes
samelamin Feb 13, 2023
d46590f
use master lock file
samelamin Feb 13, 2023
ba7d4a0
Merge branch 'master' into add_warp_sync_params
samelamin Feb 14, 2023
f6b6d15
Update Substrate & Polkadot
bkchr Feb 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use polkadot_primitives::{

use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, future::FutureExt, Future};

use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};

#[cfg(test)]
samelamin marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
4 changes: 4 additions & 0 deletions client/relay-chain-inprocess-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ where
Ok(self.backend.blockchain().info().best_hash)
}

async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
Ok(self.backend.blockchain().info().finalized_hash)
}

async fn is_major_syncing(&self) -> RelayChainResult<bool> {
Ok(self.sync_oracle.is_major_syncing())
}
Expand Down
7 changes: 7 additions & 0 deletions client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ pub trait RelayChainInterface: Send + Sync {
/// Get the hash of the current best block.
async fn best_block_hash(&self) -> RelayChainResult<PHash>;

/// Get the hash of the finalized block.
async fn finalized_block_hash(&self) -> RelayChainResult<PHash>;

/// Returns the whole contents of the downward message queue for the parachain we are collating
/// for.
///
Expand Down Expand Up @@ -248,6 +251,10 @@ where
(**self).best_block_hash().await
}

async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
(**self).finalized_block_hash().await
}

async fn is_major_syncing(&self) -> RelayChainResult<bool> {
(**self).is_major_syncing().await
}
Expand Down
4 changes: 4 additions & 0 deletions client/relay-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ impl RelayChainInterface for RelayChainRpcInterface {
self.rpc_client.chain_get_head(None).await
}

async fn finalized_block_hash(&self) -> RelayChainResult<RelayHash> {
self.rpc_client.chain_get_finalized_head().await
}

async fn is_major_syncing(&self) -> RelayChainResult<bool> {
self.rpc_client.system_health().await.map(|h| h.is_syncing)
}
Expand Down
9 changes: 8 additions & 1 deletion client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ futures = "0.3.24"
# Substrate
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-rpc = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-sysinfo = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network-transactions = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }

# Polkadot
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
Expand All @@ -28,7 +34,8 @@ cumulus-client-cli = { path = "../cli" }
cumulus-client-collator = { path = "../collator" }
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-client-pov-recovery = { path = "../pov-recovery" }
cumulus-client-network = { path = "../network" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
cumulus-relay-chain-inprocess-interface = { path = "../relay-chain-inprocess-interface" }
cumulus-relay-chain-minimal-node = { path = "../relay-chain-minimal-node" }
cumulus-relay-chain-minimal-node = { path = "../relay-chain-minimal-node" }
203 changes: 192 additions & 11 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,38 @@

use cumulus_client_cli::CollatorOptions;
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay};
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node;
use polkadot_primitives::CollatorPair;

use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption};
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
};
use sc_consensus::{import_queue::ImportQueueService, BlockImport};
use sc_service::{Configuration, TaskManager};
use sc_telemetry::TelemetryWorkerHandle;
use sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue};
use sc_network::{config::SyncMode, NetworkService};
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncParams};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::traits::SpawnNamed;
use sp_runtime::traits::Block as BlockT;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_core::{traits::SpawnNamed, Decode};
use sp_runtime::traits::{Block as BlockT, BlockIdTo};
use std::{fmt, sync::Arc, time::Duration};

use futures::channel::mpsc;
use std::{sync::Arc, time::Duration};

// Given the sporadic nature of the explicit recovery operation and the
// possibility to retry infinite times this value is more than enough.
// In practice here we expect no more than one queued messages.
const RECOVERY_CHAN_SIZE: usize = 8;
const LOG_TARGET_SYNC: &str = "sync::cumulus";

/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
Expand Down Expand Up @@ -264,3 +271,177 @@ pub async fn build_relay_chain_interface(
)
}
}

/// Parameters given to [`build_network`].
pub struct BuildNetworkParams<
'a,
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ HeaderBackend<Block>
+ BlockIdTo<Block>
+ 'static,
RCInterface,
IQ,
> where
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
pub parachain_config: &'a Configuration,
pub client: Arc<Client>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, Client>>,
pub para_id: ParaId,
pub relay_chain_interface: RCInterface,
pub spawn_handle: SpawnTaskHandle,
pub import_queue: IQ,
}

/// Build the network service, the network status sinks and an RPC sender.
pub async fn build_network<'a, Block, Client, RCInterface, IQ>(
BuildNetworkParams {
parachain_config,
client,
transaction_pool,
para_id,
spawn_handle,
relay_chain_interface,
import_queue,
}: BuildNetworkParams<'a, Block, Client, RCInterface, IQ>,
) -> sc_service::error::Result<(
Arc<NetworkService<Block, Block::Hash>>,
TracingUnboundedSender<sc_rpc::system::Request<Block>>,
TransactionsHandlerController<Block::Hash>,
NetworkStarter,
)>
where
Block: BlockT,
Client: UsageProvider<Block>
+ HeaderBackend<Block>
+ sp_consensus::block_validation::Chain<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ ProvideRuntimeApi<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ BlockIdTo<Block, Error = sp_blockchain::Error>
+ ProofProvider<Block>
+ 'static,
Client::Api: CollectCollationInfo<Block>
+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
for<'b> &'b Client: BlockImport<Block>,
RCInterface: RelayChainInterface + Clone + 'static,
IQ: ImportQueue<Block> + 'static,
{
let warp_sync_params = match parachain_config.network.sync_mode {
SyncMode::Warp => {
let target_block = warp_sync_get::<Block, RCInterface>(
para_id,
relay_chain_interface.clone(),
spawn_handle.clone(),
)
.await
.map_err(|e| format!("Error: {:?}", e))?;
Some(WarpSyncParams::WaitForTarget(target_block))
},
_ => None,
};

let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface, para_id);
let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
client,
transaction_pool,
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)),
warp_sync_params,
})
}

/// Creates a new background task to wait for the relay chain to sync up and retrieve the parachain header
async fn warp_sync_get<B, RCInterface>(
samelamin marked this conversation as resolved.
Show resolved Hide resolved
para_id: ParaId,
relay_chain_interface: RCInterface,
spawner: SpawnTaskHandle,
) -> Result<oneshot::Receiver<<B as BlockT>::Header>, WarpSyncError>
samelamin marked this conversation as resolved.
Show resolved Hide resolved
where
B: BlockT + 'static,
RCInterface: RelayChainInterface + 'static,
{
let (sender, receiver) = oneshot::channel::<B::Header>();
spawner.spawn(
"cumulus-parachain-wait-for-target-block",
None,
async move {
log::debug!(
target: "cumulus-network",
"waiting for announce block in a background task...",
);

let _ = wait_for_target_block::<B, _>(sender, para_id, relay_chain_interface)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET_SYNC,
"Unable to determine parachain target block {:?}",
e
)
});
}
.boxed(),
);

Ok(receiver)
}

/// Waits for the relay chain to have finished syncing and then gets the parachain header that corresponds to the last finalized relay chain block.
async fn wait_for_target_block<B, RCInterface>(
sender: oneshot::Sender<<B as BlockT>::Header>,
para_id: ParaId,
relay_chain_interface: RCInterface,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
B: BlockT + 'static,
RCInterface: RelayChainInterface + Send + 'static,
{
let mut imported_blocks = relay_chain_interface.import_notification_stream().await?.fuse();
while imported_blocks.next().await.is_some() {
let is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| log::error!(target: LOG_TARGET_SYNC, "Unable to determine sync status. {e}"))
.unwrap_or(false);
samelamin marked this conversation as resolved.
Show resolved Hide resolved

if !is_syncing {
let relay_chain_best_hash = relay_chain_interface
.finalized_block_hash()
.await
.map_err(|e| Box::new(e) as Box<_>)?;

let validation_data = relay_chain_interface
.persisted_validation_data(
relay_chain_best_hash,
para_id,
OccupiedCoreAssumption::TimedOut,
)
.await
.map_err(|e| Box::new(format!("{e:?}", e)) as Box<_>)?
.ok_or_else(|| {
Box::new("Could not find parachain head in relay chain")
as Box<_>
})?;

let target_block =
B::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| format!("Failed to decode parachain head: {e}"))?;

log::debug!(target: LOG_TARGET_SYNC, "Target block reached {:?}", target_block);
let _ = sender.send(target_block);
return Ok(())
}
}

Err("Stopping following imported blocks. Could not determine parachain target block".into())
}
21 changes: 8 additions & 13 deletions parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, Slo
use cumulus_client_consensus_common::{
ParachainBlockImport as TParachainBlockImport, ParachainConsensus,
};
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_client_service::{
build_relay_chain_interface, prepare_node_config, start_collator, start_full_node,
StartCollatorParams, StartFullNodeParams,
build_network, build_relay_chain_interface, prepare_node_config, start_collator,
start_full_node, BuildNetworkParams, StartCollatorParams, StartFullNodeParams,
};
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface};
Expand Down Expand Up @@ -167,27 +166,23 @@ async fn start_node_impl(
s => s.to_string().into(),
})?;

let block_announce_validator =
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);

let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue_service = params.import_queue.service();

let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
build_network(BuildNetworkParams {
parachain_config: &parachain_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
para_id,
spawn_handle: task_manager.spawn_handle(),
relay_chain_interface: relay_chain_interface.clone(),
import_queue: params.import_queue,
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;
})
.await?;

if parachain_config.offchain_worker.enabled {
sc_service::build_offchain_workers(
Expand Down
Loading