From 6d819a61f0bd337383b9e0f355753802737e710c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 23 Aug 2024 10:38:16 +0300 Subject: [PATCH] Remove the need to wait for target block header in warp sync implementation (#5431) I'm not sure if this is exactly what https://github.com/paritytech/polkadot-sdk/issues/3537 meant, but I think it should be fine to wait for relay chain before initializing parachain node fully, which removed the need for background task and extra hacks throughout the stack just to know where warp sync should start. Previously there were both `WarpSyncParams` and `WarpSyncConfig`, but there was no longer any point in having two data structures, so I simplified it to just `WarpSyncConfig`. Fixes https://github.com/paritytech/polkadot-sdk/issues/3537 --- cumulus/client/service/src/lib.rs | 91 +++++++----------- polkadot/node/service/src/lib.rs | 4 +- prdoc/pr_5431.prdoc | 20 ++++ substrate/bin/node/cli/src/service.rs | 4 +- substrate/client/informant/src/display.rs | 10 +- substrate/client/network/sync/src/engine.rs | 55 +---------- substrate/client/network/sync/src/lib.rs | 2 +- substrate/client/network/sync/src/strategy.rs | 95 +------------------ .../client/network/sync/src/strategy/warp.rs | 86 +++++------------ substrate/client/network/test/src/lib.rs | 18 ++-- substrate/client/network/test/src/sync.rs | 2 +- substrate/client/service/src/builder.rs | 16 ++-- substrate/client/service/src/lib.rs | 2 +- templates/minimal/node/src/service.rs | 2 +- templates/solochain/node/src/service.rs | 4 +- 15 files changed, 107 insertions(+), 304 deletions(-) create mode 100644 prdoc/pr_5431.prdoc diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 9b5f0bec5387..7f656aabca7a 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -28,10 +28,7 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::{ build_minimal_relay_chain_node_light_client, build_minimal_relay_chain_node_with_rpc, }; -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, StreamExt, -}; +use futures::{channel::mpsc, StreamExt}; use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption}; use sc_client_api::{ Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider, @@ -43,7 +40,7 @@ use sc_consensus::{ use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend}; use sc_network_sync::SyncingService; use sc_network_transactions::TransactionsHandlerController; -use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncParams}; +use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig}; use sc_telemetry::{log, TelemetryWorkerHandle}; use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; @@ -467,12 +464,19 @@ where { let warp_sync_params = match parachain_config.network.sync_mode { SyncMode::Warp => { - let target_block = warp_sync_get::( - para_id, - relay_chain_interface.clone(), - spawn_handle.clone(), - ); - Some(WarpSyncParams::WaitForTarget(target_block)) + log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block..."); + + let target_block = + wait_for_finalized_para_head::(para_id, relay_chain_interface.clone()) + .await + .inspect_err(|e| { + log::error!( + target: LOG_TARGET_SYNC, + "Unable to determine parachain target block {:?}", + e + ); + })?; + Some(WarpSyncConfig::WithTarget(target_block)) }, _ => None, }; @@ -500,67 +504,37 @@ where spawn_handle, import_queue, block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)), - warp_sync_params, + warp_sync_config: warp_sync_params, block_relay: None, metrics, }) } -/// Creates a new background task to wait for the relay chain to sync up and retrieve the parachain -/// header -fn warp_sync_get( - para_id: ParaId, - relay_chain_interface: RCInterface, - spawner: SpawnTaskHandle, -) -> oneshot::Receiver<::Header> -where - B: BlockT + 'static, - RCInterface: RelayChainInterface + 'static, -{ - let (sender, receiver) = oneshot::channel::(); - spawner.spawn( - "cumulus-parachain-wait-for-target-block", - None, - async move { - log::debug!( - target: LOG_TARGET_SYNC, - "waiting for announce block in a background task...", - ); - - let _ = wait_for_finalized_para_head::(sender, para_id, relay_chain_interface) - .await - .map_err(|e| { - log::error!( - target: LOG_TARGET_SYNC, - "Unable to determine parachain target block {:?}", - e - ) - }); - } - .boxed(), - ); - - receiver -} - /// Waits for the relay chain to have finished syncing and then gets the parachain header that /// corresponds to the last finalized relay chain block. async fn wait_for_finalized_para_head( - sender: oneshot::Sender<::Header>, para_id: ParaId, relay_chain_interface: RCInterface, -) -> Result<(), Box> +) -> sc_service::error::Result<::Header> where B: BlockT + 'static, RCInterface: RelayChainInterface + Send + 'static, { - let mut imported_blocks = relay_chain_interface.import_notification_stream().await?.fuse(); - while imported_blocks.next().await.is_some() { - let is_syncing = relay_chain_interface.is_major_syncing().await.map_err(|e| { - Box::::from(format!( - "Unable to determine sync status. {e}" + let mut imported_blocks = relay_chain_interface + .import_notification_stream() + .await + .map_err(|error| { + sc_service::Error::Other(format!( + "Relay chain import notification stream error when waiting for parachain head: \ + {error}" )) - })?; + })? + .fuse(); + while imported_blocks.next().await.is_some() { + let is_syncing = relay_chain_interface + .is_major_syncing() + .await + .map_err(|e| format!("Unable to determine sync status: {e}"))?; if !is_syncing { let relay_chain_best_hash = relay_chain_interface @@ -586,8 +560,7 @@ where finalized_header.number(), finalized_header.hash() ); - let _ = sender.send(finalized_header); - return Ok(()) + return Ok(finalized_header) } } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 5ca566d2c962..e1f42e1ca86a 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -764,7 +764,7 @@ pub fn new_full< ) -> Result { use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD; use polkadot_node_network_protocol::request_response::IncomingRequest; - use sc_network_sync::WarpSyncParams; + use sc_network_sync::WarpSyncConfig; let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled; let role = config.role.clone(); @@ -1037,7 +1037,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), block_relay: None, metrics, })?; diff --git a/prdoc/pr_5431.prdoc b/prdoc/pr_5431.prdoc new file mode 100644 index 000000000000..9f6db7136a58 --- /dev/null +++ b/prdoc/pr_5431.prdoc @@ -0,0 +1,20 @@ +title: Remove the need to wait for target block header in warp sync implementation + +doc: + - audience: Node Dev + description: | + Previously warp sync needed to wait for target block header of the relay chain to become available before warp + sync can start, which resulted in cumbersome APIs. Parachain initialization was refactored to initialize warp sync + with target block header from the very beginning, improving and simplifying sync API. + +crates: + - name: sc-service + bump: major + - name: sc-network-sync + bump: major + - name: polkadot-service + bump: major + - name: cumulus-client-service + bump: major + - name: sc-informant + bump: major diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index d58ca888d419..d45713db5222 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -37,7 +37,7 @@ use sc_consensus_babe::{self, SlotProportion}; use sc_network::{ event::Event, service::traits::NetworkService, NetworkBackend, NetworkEventStream, }; -use sc_network_sync::{strategy::warp::WarpSyncParams, SyncingService}; +use sc_network_sync::{strategy::warp::WarpSyncConfig, SyncingService}; use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager}; use sc_statement_store::Store as StatementStore; use sc_telemetry::{Telemetry, TelemetryWorker}; @@ -517,7 +517,7 @@ pub fn new_full_base::Hash>>( spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), block_relay: None, metrics, })?; diff --git a/substrate/client/informant/src/display.rs b/substrate/client/informant/src/display.rs index 655bf21c7115..f5e042103ea7 100644 --- a/substrate/client/informant/src/display.rs +++ b/substrate/client/informant/src/display.rs @@ -101,17 +101,9 @@ impl InformantDisplay { _, Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }), ) if !sync_status.is_major_syncing() => ("⏩", "Block history".into(), format!(", #{}", n)), - ( - _, - _, - Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingTargetBlock, .. }), - ) => ("⏩", "Waiting for pending target block".into(), "".into()), // Handle all phases besides the two phases we already handle above. (_, _, Some(warp)) - if !matches!( - warp.phase, - WarpSyncPhase::AwaitingTargetBlock | WarpSyncPhase::DownloadingBlocks(_) - ) => + if !matches!(warp.phase, WarpSyncPhase::DownloadingBlocks(_)) => ( "⏩", "Warping".into(), diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index ee7576c22f16..a25db4f789dd 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -32,7 +32,7 @@ use crate::{ syncing_service::{SyncingService, ToServiceCommand}, }, strategy::{ - warp::{EncodedProof, WarpProofRequest, WarpSyncParams}, + warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, }, types::{ @@ -42,11 +42,7 @@ use crate::{ }; use codec::{Decode, DecodeAll, Encode}; -use futures::{ - channel::oneshot, - future::{BoxFuture, Fuse}, - FutureExt, StreamExt, -}; +use futures::{channel::oneshot, FutureExt, StreamExt}; use libp2p::request_response::OutboundFailure; use log::{debug, error, trace, warn}; use prometheus_endpoint::{ @@ -257,10 +253,6 @@ pub struct SyncingEngine { /// The `PeerId`'s of all boot nodes. boot_node_ids: HashSet, - /// A channel to get target block header if we skip over proofs downloading during warp sync. - warp_sync_target_block_header_rx_fused: - Fuse>>, - /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, @@ -309,7 +301,7 @@ where protocol_id: ProtocolId, fork_id: &Option, block_announce_validator: Box + Send>, - warp_sync_params: Option>, + warp_sync_config: Option>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, block_downloader: Arc>, @@ -404,19 +396,6 @@ where Arc::clone(&peer_store_handle), ); - // Split warp sync params into warp sync config and a channel to retrieve target block - // header. - let (warp_sync_config, warp_sync_target_block_header_rx) = - warp_sync_params.map_or((None, None), |params| { - let (config, target_block_rx) = params.split(); - (Some(config), target_block_rx) - }); - - // Make sure polling of the target block channel is a no-op if there is no block to - // retrieve. - let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx - .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); - // Initialize syncing strategy. let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; @@ -460,7 +439,6 @@ where genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), - warp_sync_target_block_header_rx_fused, boot_node_ids, default_peers_set_no_slot_peers, default_peers_set_num_full, @@ -635,17 +613,6 @@ where Some(event) => self.process_notification_event(event), None => return, }, - // TODO: setting of warp sync target block should be moved to the initialization of - // `SyncingEngine`, see https://github.com/paritytech/polkadot-sdk/issues/3537. - warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => { - if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) { - error!( - target: LOG_TARGET, - "Failed to set warp sync target block header, terminating `SyncingEngine`.", - ); - return - } - }, response_event = self.pending_responses.select_next_some() => self.process_response_event(response_event), validation_result = self.block_announce_validator.select_next_some() => @@ -898,22 +865,6 @@ where } } - fn pass_warp_sync_target_block_header( - &mut self, - header: Result, - ) -> Result<(), ()> { - match header { - Ok(header) => self.strategy.set_warp_sync_target_block_header(header), - Err(err) => { - error!( - target: LOG_TARGET, - "Failed to get target block for warp sync. Error: {err:?}", - ); - Err(()) - }, - } - } - /// Called by peer when it is disconnecting. /// /// Returns a result if the handshake of this peer was indeed accepted. diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 9f6c0f45d089..ca7280edba5f 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -19,7 +19,7 @@ //! Blockchain syncing implementation in Substrate. pub use service::syncing_service::SyncingService; -pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress}; +pub use strategy::warp::{WarpSyncConfig, WarpSyncPhase, WarpSyncProgress}; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index 58befe94e84a..2c9799a9d836 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -30,7 +30,7 @@ use crate::{ LOG_TARGET, }; use chain_sync::{ChainSync, ChainSyncAction, ChainSyncMode}; -use log::{debug, error, info, warn}; +use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; @@ -462,39 +462,6 @@ where } } - /// Let `WarpSync` know about target block header - pub fn set_warp_sync_target_block_header( - &mut self, - target_header: B::Header, - ) -> Result<(), ()> { - match self.config.mode { - SyncMode::Warp => match self.warp { - Some(ref mut warp) => { - warp.set_target_block(target_header); - Ok(()) - }, - None => { - // As mode is set to warp sync, but no warp sync strategy is active, this means - // that warp sync has already finished / was skipped. - warn!( - target: LOG_TARGET, - "Discarding warp sync target, as warp sync was seemingly skipped due \ - to node being (partially) synced.", - ); - Ok(()) - }, - }, - _ => { - error!( - target: LOG_TARGET, - "Cannot set warp sync target block: not in warp sync mode." - ); - debug_assert!(false); - Err(()) - }, - } - } - /// Get actions that should be performed by the owner on the strategy's behalf #[must_use] pub fn actions(&mut self) -> Result>, ClientError> { @@ -600,63 +567,3 @@ where } } } - -#[cfg(test)] -mod test { - use super::*; - use futures::executor::block_on; - use sc_block_builder::BlockBuilderBuilder; - use substrate_test_runtime_client::{ - ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClientBuilder, - TestClientBuilderExt, - }; - - /// Regression test for crash when starting already synced parachain node with `--sync=warp`. - /// We must remove this after setting of warp sync target block is moved to initialization of - /// `SyncingEngine` (issue https://github.com/paritytech/polkadot-sdk/issues/3537). - #[test] - fn set_target_block_finished_warp_sync() { - // Populate database with finalized state. - let client = Arc::new(TestClientBuilder::new().build()); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().best_hash) - .with_parent_block_number(client.chain_info().best_number) - .build() - .unwrap() - .build() - .unwrap() - .block; - block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let just = (*b"TEST", Vec::new()); - client.finalize_block(block.hash(), Some(just)).unwrap(); - let target_block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().best_hash) - .with_parent_block_number(client.chain_info().best_number) - .build() - .unwrap() - .build() - .unwrap() - .block; - - // Initialize syncing strategy. - let config = SyncingConfig { - mode: SyncMode::Warp, - max_parallel_downloads: 3, - max_blocks_per_request: 64, - metrics_registry: None, - }; - let mut strategy = - SyncingStrategy::new(config, client, Some(WarpSyncConfig::WaitForTarget)).unwrap(); - - // Warp sync instantly finishes as we have finalized state in DB. - let actions = strategy.actions().unwrap(); - assert_eq!(actions.len(), 1); - assert!(matches!(actions[0], SyncingAction::Finished)); - assert!(strategy.warp.is_none()); - - // Try setting the target block. We mustn't crash. - strategy - .set_warp_sync_target_block_header(target_block.header().clone()) - .unwrap(); - } -} diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 00855578695d..4cc3e9f3a68b 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -26,7 +26,6 @@ use crate::{ LOG_TARGET, }; use codec::{Decode, Encode}; -use futures::channel::oneshot; use log::{debug, error, trace}; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock, @@ -104,8 +103,6 @@ mod rep { pub enum WarpSyncPhase { /// Waiting for peers to connect. AwaitingPeers { required_peers: usize }, - /// Waiting for target block to be received. - AwaitingTargetBlock, /// Downloading and verifying grandpa warp proofs. DownloadingWarpProofs, /// Downloading target block. @@ -125,7 +122,6 @@ impl fmt::Display for WarpSyncPhase { match self { Self::AwaitingPeers { required_peers } => write!(f, "Waiting for {required_peers} peers to be connected"), - Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be received"), Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), Self::DownloadingTargetBlock => write!(f, "Downloading target block"), Self::DownloadingState => write!(f, "Downloading state"), @@ -145,37 +141,14 @@ pub struct WarpSyncProgress { pub total_bytes: u64, } -/// The different types of warp syncing, passed to `build_network`. -pub enum WarpSyncParams { - /// Standard warp sync for the chain. - WithProvider(Arc>), - /// Skip downloading proofs and wait for a header of the state that should be downloaded. - /// - /// It is expected that the header provider ensures that the header is trusted. - WaitForTarget(oneshot::Receiver<::Header>), -} - /// Warp sync configuration as accepted by [`WarpSync`]. pub enum WarpSyncConfig { /// Standard warp sync for the chain. WithProvider(Arc>), - /// Skip downloading proofs and wait for a header of the state that should be downloaded. + /// Skip downloading proofs and use provided header of the state that should be downloaded. /// /// It is expected that the header provider ensures that the header is trusted. - WaitForTarget, -} - -impl WarpSyncParams { - /// Split `WarpSyncParams` into `WarpSyncConfig` and warp sync target block header receiver. - pub fn split( - self, - ) -> (WarpSyncConfig, Option::Header>>) { - match self { - WarpSyncParams::WithProvider(provider) => - (WarpSyncConfig::WithProvider(provider), None), - WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)), - } - } + WithTarget(::Header), } /// Warp sync phase used by warp sync state machine. @@ -189,9 +162,6 @@ enum Phase { last_hash: B::Hash, warp_sync_provider: Arc>, }, - /// Waiting for target block to be set externally if we skip warp proofs downloading, - /// and start straight from the target block (used by parachains warp sync). - PendingTargetBlock, /// Downloading target block. TargetBlock(B::Header), /// Warp sync is complete. @@ -274,7 +244,7 @@ where let phase = match warp_sync_config { WarpSyncConfig::WithProvider(warp_sync_provider) => Phase::WaitingForPeers { warp_sync_provider }, - WarpSyncConfig::WaitForTarget => Phase::PendingTargetBlock, + WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header), }; Self { @@ -289,20 +259,6 @@ where } } - /// Set target block externally in case we skip warp proof downloading. - pub fn set_target_block(&mut self, header: B::Header) { - let Phase::PendingTargetBlock = self.phase else { - error!( - target: LOG_TARGET, - "Attempt to set warp sync target block in invalid phase.", - ); - debug_assert!(false); - return - }; - - self.phase = Phase::TargetBlock(header); - } - /// Notify that a new peer has connected. pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor) { self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available }); @@ -592,10 +548,6 @@ where phase: WarpSyncPhase::DownloadingTargetBlock, total_bytes: self.total_proof_bytes, }, - Phase::PendingTargetBlock { .. } => WarpSyncProgress { - phase: WarpSyncPhase::AwaitingTargetBlock, - total_bytes: self.total_proof_bytes, - }, Phase::Complete => WarpSyncProgress { phase: WarpSyncPhase::Complete, total_bytes: self.total_proof_bytes + self.total_state_bytes, @@ -614,14 +566,12 @@ where state: match &self.phase { Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() }, Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() }, - Phase::PendingTargetBlock => SyncState::Downloading { target: Zero::zero() }, Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() }, Phase::Complete => SyncState::Idle, }, best_seen_block: match &self.phase { Phase::WaitingForPeers { .. } => None, Phase::WarpProof { .. } => None, - Phase::PendingTargetBlock => None, Phase::TargetBlock(header) => Some(*header.number()), Phase::Complete => None, }, @@ -759,7 +709,13 @@ mod test { #[test] fn warp_sync_to_target_for_db_with_finalized_state_is_noop() { let client = mock_client_with_state(); - let config = WarpSyncConfig::WaitForTarget; + let config = WarpSyncConfig::WithTarget(::Header::new( + 1, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + )); let mut warp_sync = WarpSync::new(Arc::new(client), config); // Warp sync instantly finishes @@ -785,7 +741,13 @@ mod test { #[test] fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() { let client = mock_client_without_state(); - let config = WarpSyncConfig::WaitForTarget; + let config = WarpSyncConfig::WithTarget(::Header::new( + 1, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + )); let mut warp_sync = WarpSync::new(Arc::new(client), config); // No actions are emitted. @@ -936,7 +898,13 @@ mod test { } // Manually set to another phase. - warp_sync.phase = Phase::PendingTargetBlock; + warp_sync.phase = Phase::TargetBlock(::Header::new( + 1, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + )); // No request is made. assert!(warp_sync.warp_proof_request().is_none()); @@ -1193,7 +1161,7 @@ mod test { .unwrap() .block; let target_header = target_block.header().clone(); - let config = WarpSyncConfig::WaitForTarget; + let config = WarpSyncConfig::WithTarget(target_header); let mut warp_sync = WarpSync::new(client, config); // Make sure we have enough peers to make a request. @@ -1201,10 +1169,6 @@ mod test { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); } - // No actions generated so far. - assert_eq!(warp_sync.actions().count(), 0); - - warp_sync.set_target_block(target_header); assert!(matches!(warp_sync.phase, Phase::TargetBlock(_))); let (_peer_id, request) = warp_sync.target_block_request().unwrap(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 9285306948a6..b5aeb162e9fa 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -34,7 +34,7 @@ use std::{ time::Duration, }; -use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*}; +use futures::{future::BoxFuture, pin_mut, prelude::*}; use libp2p::PeerId; use log::trace; use parking_lot::Mutex; @@ -67,7 +67,7 @@ use sc_network_sync::{ service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, strategy::warp::{ - AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider, + AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig, WarpSyncProvider, }, warp_request_handler, }; @@ -701,7 +701,7 @@ pub struct FullPeerConfig { /// Enable transaction indexing. pub storage_chain: bool, /// Optional target block header to sync to - pub target_block: Option<::Header>, + pub target_header: Option<::Header>, /// Force genesis even in case of warp & light state sync. pub force_genesis: bool, } @@ -865,13 +865,9 @@ pub trait TestNetFactory: Default + Sized + Send { let warp_sync = Arc::new(TestWarpSyncProvider(client.clone())); - let warp_sync_params = match config.target_block { - Some(target_block) => { - let (sender, receiver) = oneshot::channel::<::Header>(); - let _ = sender.send(target_block); - WarpSyncParams::WaitForTarget(receiver) - }, - _ => WarpSyncParams::WithProvider(warp_sync.clone()), + let warp_sync_config = match config.target_header { + Some(target_header) => WarpSyncConfig::WithTarget(target_header), + _ => WarpSyncConfig::WithProvider(warp_sync.clone()), }; let warp_protocol_config = { @@ -919,7 +915,7 @@ pub trait TestNetFactory: Default + Sized + Send { protocol_id.clone(), &fork_id, block_announce_validator, - Some(warp_sync_params), + Some(warp_sync_config), chain_sync_network_handle, import_queue.service(), block_relay_params.downloader, diff --git a/substrate/client/network/test/src/sync.rs b/substrate/client/network/test/src/sync.rs index f1c1b7414303..9edd68c2ed9f 100644 --- a/substrate/client/network/test/src/sync.rs +++ b/substrate/client/network/test/src/sync.rs @@ -1298,7 +1298,7 @@ async fn warp_sync_to_target_block() { net.add_full_peer_with_config(FullPeerConfig { sync_mode: SyncMode::Warp, - target_block: Some(target_block), + target_header: Some(target_block), ..Default::default() }); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 90bfd9ec27f7..d9a91e715fd5 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -55,7 +55,7 @@ use sc_network_sync::{ block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler, engine::SyncingEngine, service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, - warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, WarpSyncParams, + warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, WarpSyncConfig, }; use sc_rpc::{ author::AuthorApiServer, @@ -756,8 +756,8 @@ pub struct BuildNetworkParams< /// A block announce validator builder. pub block_announce_validator_builder: Option) -> Box + Send> + Send>>, - /// Optional warp sync params. - pub warp_sync_params: Option>, + /// Optional warp sync config. + pub warp_sync_config: Option>, /// User specified block relay params. If not specified, the default /// block request handler will be used. pub block_relay: Option>, @@ -801,12 +801,12 @@ where spawn_handle, import_queue, block_announce_validator_builder, - warp_sync_params, + warp_sync_config, block_relay, metrics, } = params; - if warp_sync_params.is_none() && config.network.sync_mode.is_warp() { + if warp_sync_config.is_none() && config.network.sync_mode.is_warp() { return Err("Warp sync enabled, but no warp sync provider configured.".into()) } @@ -869,8 +869,8 @@ where (protocol_config, config_name) }; - let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_params.as_ref() { - Some(WarpSyncParams::WithProvider(warp_with_provider)) => { + let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_config.as_ref() { + Some(WarpSyncConfig::WithProvider(warp_with_provider)) => { // Allow both outgoing and incoming requests. let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, TNet>( protocol_id.clone(), @@ -939,7 +939,7 @@ where protocol_id.clone(), &config.chain_spec.fork_id().map(ToOwned::to_owned), block_announce_validator, - warp_sync_params, + warp_sync_config, chain_sync_network_handle, import_queue.service(), block_downloader, diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 89d563001cd6..ed108a3102bc 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -82,7 +82,7 @@ pub use sc_chain_spec::{ pub use sc_consensus::ImportQueue; pub use sc_executor::NativeExecutionDispatch; -pub use sc_network_sync::WarpSyncParams; +pub use sc_network_sync::WarpSyncConfig; #[doc(hidden)] pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture}; pub use sc_rpc::{ diff --git a/templates/minimal/node/src/service.rs b/templates/minimal/node/src/service.rs index 44b374fcc0a4..decb9d6c636e 100644 --- a/templates/minimal/node/src/service.rs +++ b/templates/minimal/node/src/service.rs @@ -138,7 +138,7 @@ pub fn new_full::Ha import_queue, net_config, block_announce_validator_builder: None, - warp_sync_params: None, + warp_sync_config: None, block_relay: None, metrics, })?; diff --git a/templates/solochain/node/src/service.rs b/templates/solochain/node/src/service.rs index 5e84552f4ccd..7eef9766b107 100644 --- a/templates/solochain/node/src/service.rs +++ b/templates/solochain/node/src/service.rs @@ -4,7 +4,7 @@ use futures::FutureExt; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_grandpa::SharedVoterState; -use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncParams}; +use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncConfig}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use solochain_template_runtime::{self, apis::RuntimeApi, opaque::Block}; @@ -175,7 +175,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), block_relay: None, metrics, })?;