From c16b8b7edbba0934bc4b361d970288c8acfa062e Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 20 Sep 2022 09:30:58 +0400 Subject: [PATCH] feat(core/sync): adds `connecting` sync status --- .../tari_app_grpc/proto/base_node.proto | 2 +- .../src/conversions/base_node_state.rs | 6 ++--- .../src/grpc/base_node_grpc_server.rs | 2 +- .../states/block_sync.rs | 17 +++++++------ .../states/events_and_states.rs | 24 ++++++++++++------- .../states/header_sync.rs | 4 ++-- .../states/horizon_state_sync.rs | 8 +++---- .../base_node/sync/block_sync/synchronizer.rs | 9 ++++++- .../sync/header_sync/synchronizer.rs | 8 +++++-- base_layer/core/src/base_node/sync/hooks.rs | 8 +++---- .../sync/horizon_state_sync/synchronizer.rs | 5 ++-- 11 files changed, 57 insertions(+), 36 deletions(-) diff --git a/applications/tari_app_grpc/proto/base_node.proto b/applications/tari_app_grpc/proto/base_node.proto index 0df29e5b98..1499cfc73b 100644 --- a/applications/tari_app_grpc/proto/base_node.proto +++ b/applications/tari_app_grpc/proto/base_node.proto @@ -156,7 +156,7 @@ enum BaseNodeState{ START_UP = 0; HEADER_SYNC = 1; HORIZON_SYNC = 2; - BLOCK_SYNC_STARTING= 3; + CONNECTING = 3; BLOCK_SYNC = 4; LISTENING = 5; } diff --git a/applications/tari_app_grpc/src/conversions/base_node_state.rs b/applications/tari_app_grpc/src/conversions/base_node_state.rs index 81c6ed1199..68f29ba3de 100644 --- a/applications/tari_app_grpc/src/conversions/base_node_state.rs +++ b/applications/tari_app_grpc/src/conversions/base_node_state.rs @@ -22,7 +22,7 @@ use tari_core::base_node::state_machine_service::states::{ StateInfo, - StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp}, + StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}, }; use crate::tari_rpc as grpc; @@ -33,7 +33,7 @@ impl From for grpc::BaseNodeState { StartUp => grpc::BaseNodeState::HeaderSync, HeaderSync(_) => grpc::BaseNodeState::HeaderSync, HorizonSync(_) => grpc::BaseNodeState::HorizonSync, - BlockSyncStarting => grpc::BaseNodeState::BlockSyncStarting, + Connecting(_) => grpc::BaseNodeState::Connecting, BlockSync(_) => grpc::BaseNodeState::BlockSync, Listening(_) => grpc::BaseNodeState::Listening, } @@ -46,7 +46,7 @@ impl From<&StateInfo> for grpc::BaseNodeState { StartUp => grpc::BaseNodeState::HeaderSync, HeaderSync(_) => grpc::BaseNodeState::HeaderSync, HorizonSync(_) => grpc::BaseNodeState::HorizonSync, - BlockSyncStarting => grpc::BaseNodeState::BlockSyncStarting, + Connecting(_) => grpc::BaseNodeState::Connecting, BlockSync(_) => grpc::BaseNodeState::BlockSync, Listening(_) => grpc::BaseNodeState::Listening, } diff --git a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs index 3fbe992ea6..a5f03e51b7 100644 --- a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs +++ b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs @@ -1394,7 +1394,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { local_height: info.local_height, state: tari_rpc::SyncState::Header.into(), }, - StateInfo::BlockSyncStarting => tari_rpc::SyncProgressResponse { + StateInfo::Connecting(_) => tari_rpc::SyncProgressResponse { tip_height: 0, local_height: 0, state: tari_rpc::SyncState::BlockStarting.into(), diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index 00a2e791d2..4d276796ea 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -23,7 +23,6 @@ use std::{mem, time::Instant}; use log::*; -use randomx_rs::RandomXFlag; use crate::{ base_node::{ @@ -59,16 +58,20 @@ impl BlockSync { let status_event_sender = shared.status_event_sender.clone(); let bootstrapped = shared.is_bootstrapped(); - let _result = status_event_sender.send(StatusInfo { - bootstrapped, - state_info: StateInfo::BlockSyncStarting, - randomx_vm_cnt: 0, - randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, - }); let local_nci = shared.local_node_interface.clone(); let randomx_vm_cnt = shared.get_randomx_vm_cnt(); let randomx_vm_flags = shared.get_randomx_vm_flags(); let tip_height_metric = metrics::tip_height(); + synchronizer.on_starting(move |sync_peer| { + let _result = status_event_sender.send(StatusInfo { + bootstrapped, + state_info: StateInfo::Connecting(sync_peer.clone()), + randomx_vm_cnt, + randomx_vm_flags, + }); + }); + + let status_event_sender = shared.status_event_sender.clone(); synchronizer.on_progress(move |block, remote_tip_height, sync_peer| { let local_height = block.height(); local_nci.publish_block_event(BlockEvent::ValidBlockAdded( diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index e5858967d6..7201e170c5 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -166,25 +166,32 @@ impl Display for BaseNodeState { #[derive(Debug, Clone, PartialEq)] pub enum StateInfo { StartUp, + Connecting(SyncPeer), HeaderSync(Option), HorizonSync(HorizonSyncInfo), - BlockSyncStarting, BlockSync(BlockSyncInfo), Listening(ListeningInfo), } impl StateInfo { pub fn short_desc(&self) -> String { - use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp}; + use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}; match self { StartUp => "Starting up".to_string(), + Connecting(sync_peer) => format!( + "Connecting to {}{}", + sync_peer.node_id().short_str(), + sync_peer + .latency() + .map(|l| format!(", Latency: {:.2?}", l)) + .unwrap_or_else(|| "".to_string()) + ), HeaderSync(None) => "Starting header sync".to_string(), HeaderSync(Some(info)) => format!("Syncing headers: {}", info.sync_progress_string()), HorizonSync(info) => info.to_progress_string(), BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()), Listening(_) => "Listening".to_string(), - BlockSyncStarting => "Starting block sync".to_string(), } } @@ -196,9 +203,9 @@ impl StateInfo { } pub fn is_synced(&self) -> bool { - use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp}; + use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp}; match self { - StartUp | HeaderSync(_) | HorizonSync(_) | BlockSync(_) | BlockSyncStarting => false, + StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) => false, Listening(info) => info.is_synced(), } } @@ -206,15 +213,16 @@ impl StateInfo { impl Display for StateInfo { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { - use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp}; + #[allow(clippy::enum_glob_use)] + use StateInfo::*; match self { StartUp => write!(f, "Node starting up"), + Connecting(sync_peer) => write!(f, "Connecting to {}", sync_peer), HeaderSync(Some(info)) => write!(f, "Synchronizing block headers: {}", info), HeaderSync(None) => write!(f, "Synchronizing block headers: Starting"), HorizonSync(info) => write!(f, "Synchronizing horizon state: {}", info), BlockSync(info) => write!(f, "Synchronizing blocks: {}", info), Listening(info) => write!(f, "Listening: {}", info), - BlockSyncStarting => write!(f, "Synchronizing blocks: Starting"), } } } @@ -275,7 +283,7 @@ impl BlockSyncInfo { self.sync_peer.node_id().short_str(), self.local_height, self.tip_height, - (self.local_height as f64 / self.tip_height as f64 * 100.0), + (self.local_height as f64 / self.tip_height as f64 * 100.0).floor(), self.sync_peer .items_per_second() .map(|bps| format!(" {:.2?} blks/s", bps)) diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs index cfe29d4033..243af2e30e 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs @@ -88,10 +88,10 @@ impl HeaderSyncState { let bootstrapped = shared.is_bootstrapped(); let randomx_vm_cnt = shared.get_randomx_vm_cnt(); let randomx_vm_flags = shared.get_randomx_vm_flags(); - synchronizer.on_starting(move || { + synchronizer.on_starting(move |sync_peer| { let _result = status_event_sender.send(StatusInfo { bootstrapped, - state_info: StateInfo::HeaderSync(None), + state_info: StateInfo::Connecting(sync_peer.clone()), randomx_vm_cnt, randomx_vm_flags, }); diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs index 258523cab1..5fdf4a2c07 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync.rs @@ -31,7 +31,7 @@ use super::{StateEvent, StateInfo}; use crate::{ base_node::{ state_machine_service::states::StatusInfo, - sync::{HorizonStateSynchronization, HorizonSyncInfo, HorizonSyncStatus, SyncPeer}, + sync::{HorizonStateSynchronization, SyncPeer}, BaseNodeStateMachine, }, chain_storage::BlockchainBackend, @@ -101,12 +101,10 @@ impl HorizonStateSync { let bootstrapped = shared.is_bootstrapped(); let randomx_vm_cnt = shared.get_randomx_vm_cnt(); let randomx_vm_flags = shared.get_randomx_vm_flags(); - let sync_peers_node_id = sync_peers.iter().map(|p| p.node_id()).cloned().collect(); - horizon_sync.on_starting(move || { - let info = HorizonSyncInfo::new(sync_peers_node_id, HorizonSyncStatus::Starting); + horizon_sync.on_starting(move |sync_peer| { let _result = status_event_sender.send(StatusInfo { bootstrapped, - state_info: StateInfo::HorizonSync(info), + state_info: StateInfo::Connecting(sync_peer.clone()), randomx_vm_cnt, randomx_vm_flags, }); diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 8a5542b893..319fd172b7 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -81,6 +81,11 @@ impl BlockSynchronizer { } } + pub fn on_starting(&mut self, hook: H) + where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static { + self.hooks.add_on_starting_hook(hook); + } + pub fn on_progress(&mut self, hook: H) where H: Fn(Arc, u64, &SyncPeer) + Send + Sync + 'static { self.hooks.add_on_progress_block_hook(hook); @@ -123,6 +128,8 @@ impl BlockSynchronizer { async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); for (i, node_id) in sync_peer_node_ids.iter().enumerate() { + let sync_peer = &self.sync_peers[i]; + self.hooks.call_on_starting_hook(sync_peer); let mut conn = self.connect_to_sync_peer(node_id.clone()).await?; let config = RpcClient::builder() .with_deadline(self.config.rpc_deadline) @@ -199,10 +206,10 @@ impl BlockSynchronizer { max_latency: Duration, ) -> Result<(), BlockSyncError> { info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer); - self.hooks.call_on_starting_hook(); let tip_header = self.db.fetch_last_header().await?; let local_metadata = self.db.get_chain_metadata().await?; + if tip_header.height <= local_metadata.height_of_longest_chain() { debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 889a3568c0..49a6208c80 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -88,7 +88,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } pub fn on_starting(&mut self, hook: H) - where for<'r> H: FnOnce() + Send + Sync + 'static { + where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static { self.hooks.add_on_starting_hook(hook); } @@ -104,7 +104,6 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { pub async fn synchronize(&mut self) -> Result { debug!(target: LOG_TARGET, "Starting header sync.",); - self.hooks.call_on_starting_hook(); info!( target: LOG_TARGET, @@ -127,9 +126,14 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } } + #[allow(clippy::too_many_lines)] pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); for (i, node_id) in sync_peer_node_ids.iter().enumerate() { + { + let sync_peer = &self.sync_peers[i]; + self.hooks.call_on_starting_hook(sync_peer); + } let mut conn = self.dial_sync_peer(node_id).await?; debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/base_node/sync/hooks.rs b/base_layer/core/src/base_node/sync/hooks.rs index db2fe468a5..9ddd2cd7ed 100644 --- a/base_layer/core/src/base_node/sync/hooks.rs +++ b/base_layer/core/src/base_node/sync/hooks.rs @@ -31,7 +31,7 @@ use crate::{ #[derive(Default)] pub(super) struct Hooks { - on_starting: Vec>, + on_starting: Vec>, on_progress_header: Vec>, on_progress_block: Vec, u64, &SyncPeer) + Send + Sync>>, on_progress_horizon_sync: Vec>, @@ -41,12 +41,12 @@ pub(super) struct Hooks { impl Hooks { pub fn add_on_starting_hook(&mut self, hook: H) - where H: FnOnce() + Send + Sync + 'static { + where H: FnOnce(&SyncPeer) + Send + Sync + 'static { self.on_starting.push(Box::new(hook)); } - pub fn call_on_starting_hook(&mut self) { - self.on_starting.drain(..).for_each(|f| (f)()); + pub fn call_on_starting_hook(&mut self, sync_peer: &SyncPeer) { + self.on_starting.drain(..).for_each(|f| (f)(sync_peer)); } pub fn add_on_progress_header_hook(&mut self, hook: H) diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 3e01e96910..697d3bfd92 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -128,7 +128,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } pub fn on_starting(&mut self, hook: H) - where for<'r> H: FnOnce() + Send + Sync + 'static { + where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static { self.hooks.add_on_starting_hook(hook); } @@ -181,6 +181,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> { for (i, sync_peer) in self.sync_peers.iter().enumerate() { + self.hooks.call_on_starting_hook(sync_peer); let mut connection = self.connectivity.dial_peer(sync_peer.node_id().clone()).await?; let config = RpcClient::builder() .with_deadline(self.config.rpc_deadline) @@ -220,7 +221,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { ) -> Result<(), HorizonSyncError> { debug!(target: LOG_TARGET, "Initializing"); self.initialize().await?; - self.hooks.call_on_starting_hook(); + debug!(target: LOG_TARGET, "Synchronizing kernels"); self.synchronize_kernels(sync_peer.clone(), client, to_header).await?; debug!(target: LOG_TARGET, "Synchronizing outputs");