diff --git a/Cargo.lock b/Cargo.lock index f5ab9f47d9d..bf488efc524 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3573,6 +3573,7 @@ name = "near-client-primitives" version = "0.0.0" dependencies = [ "actix", + "ansi_term", "chrono", "near-chain-configs", "near-chain-primitives", diff --git a/chain/client-primitives/Cargo.toml b/chain/client-primitives/Cargo.toml index c40910d0a98..fc7a31456ae 100644 --- a/chain/client-primitives/Cargo.toml +++ b/chain/client-primitives/Cargo.toml @@ -11,6 +11,7 @@ publish = true [dependencies] actix.workspace = true +ansi_term.workspace = true chrono.workspace = true once_cell.workspace = true serde.workspace = true diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 37a2a1f8626..0f78bdeff35 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -1,4 +1,6 @@ use actix::Message; +use ansi_term::Color::{Purple, Yellow}; +use ansi_term::Style; use chrono::DateTime; use chrono::Utc; use near_chain_configs::{ClientConfig, ProtocolConfigView}; @@ -210,6 +212,96 @@ impl ShardSyncDownload { } } +pub fn format_shard_sync_phase_per_shard( + new_shard_sync: &HashMap, + use_colour: bool, +) -> Vec<(ShardId, String)> { + new_shard_sync + .iter() + .map(|(&shard_id, shard_progress)| { + (shard_id, format_shard_sync_phase(shard_progress, use_colour)) + }) + .collect::>() +} + +/// Applies style if `use_colour` is enabled. +fn paint(s: &str, style: Style, use_style: bool) -> String { + if use_style { + style.paint(s).to_string() + } else { + s.to_string() + } +} + +/// Formats the given ShardSyncDownload for logging. +pub fn format_shard_sync_phase( + shard_sync_download: &ShardSyncDownload, + use_colour: bool, +) -> String { + match &shard_sync_download.status { + ShardSyncStatus::StateDownloadHeader => format!( + "{} requests sent {}, last target {:?}", + paint("HEADER", Purple.bold(), use_colour), + shard_sync_download.downloads.get(0).map_or(0, |x| x.state_requests_count), + shard_sync_download.downloads.get(0).map_or(None, |x| x.last_target.as_ref()), + ), + ShardSyncStatus::StateDownloadParts => { + let mut num_parts_done = 0; + let mut num_parts_not_done = 0; + let mut text = "".to_string(); + for (i, download) in shard_sync_download.downloads.iter().enumerate() { + if download.done { + num_parts_done += 1; + continue; + } + num_parts_not_done += 1; + text.push_str(&format!( + "[{}: {}, {}, {:?}] ", + paint(&i.to_string(), Yellow.bold(), use_colour), + download.done, + download.state_requests_count, + download.last_target + )); + } + format!( + "{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}", + paint("PARTS", Purple.bold(), use_colour), + paint("part_id", Yellow.bold(), use_colour), + text, + num_parts_done, + num_parts_not_done + ) + } + status => format!("{:?}", status), + } +} + +#[derive(Clone)] +pub struct StateSyncStatus { + pub sync_hash: CryptoHash, + pub sync_status: HashMap, +} + +/// If alternate flag was specified, write formatted sync_status per shard. +impl std::fmt::Debug for StateSyncStatus { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + if f.alternate() { + write!( + f, + "StateSyncStatus {{ sync_hash: {:?}, shard_sync: {:?} }}", + self.sync_hash, + format_shard_sync_phase_per_shard(&self.sync_status, false) + ) + } else { + write!( + f, + "StateSyncStatus {{ sync_hash: {:?}, sync_status: {:?} }}", + self.sync_hash, self.sync_status + ) + } + } +} + /// Various status sync can be in, whether it's fast sync or archival. #[derive(Clone, Debug, strum::AsRefStr)] pub enum SyncStatus { @@ -228,7 +320,7 @@ pub enum SyncStatus { highest_height: BlockHeight, }, /// State sync, with different states of state sync for different shards. - StateSync(CryptoHash, HashMap), + StateSync(StateSyncStatus), /// Sync state across all shards is done. StateSyncDone, /// Catch up on blocks. @@ -256,7 +348,7 @@ impl SyncStatus { SyncStatus::AwaitingPeers => 1, SyncStatus::EpochSync { epoch_ord: _ } => 2, SyncStatus::HeaderSync { start_height: _, current_height: _, highest_height: _ } => 3, - SyncStatus::StateSync(_, _) => 4, + SyncStatus::StateSync(_) => 4, SyncStatus::StateSyncDone => 5, SyncStatus::BodySync { start_height: _, current_height: _, highest_height: _ } => 6, } @@ -280,9 +372,10 @@ impl From for SyncStatusView { SyncStatus::HeaderSync { start_height, current_height, highest_height } => { SyncStatusView::HeaderSync { start_height, current_height, highest_height } } - SyncStatus::StateSync(hash, sync_status) => SyncStatusView::StateSync( - hash, - sync_status + SyncStatus::StateSync(state_sync_status) => SyncStatusView::StateSync( + state_sync_status.sync_hash, + state_sync_status + .sync_status .into_iter() .map(|(shard_id, shard_sync)| (shard_id, shard_sync.into())) .collect(), diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 29758129a59..d63a4dc7583 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -7,7 +7,7 @@ use crate::debug::PRODUCTION_TIMES_CACHE_SIZE; use crate::sync::block::BlockSync; use crate::sync::epoch::EpochSync; use crate::sync::header::HeaderSync; -use crate::sync::state::{format_shard_sync_phase, StateSync, StateSyncResult}; +use crate::sync::state::{StateSync, StateSyncResult}; use crate::{metrics, SyncStatus}; use actix_rt::ArbiterHandle; use lru::LruCache; @@ -33,7 +33,9 @@ use near_chunks::logic::{ }; use near_chunks::ShardsManager; use near_client_primitives::debug::ChunkProduction; -use near_client_primitives::types::{Error, ShardSyncDownload, ShardSyncStatus}; +use near_client_primitives::types::{ + format_shard_sync_phase_per_shard, Error, ShardSyncDownload, ShardSyncStatus, +}; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; use near_network::types::{AccountKeys, ChainInfo, PeerManagerMessageRequest, SetChainInfo}; @@ -2287,18 +2289,6 @@ impl Client { } } -fn format_shard_sync_phase_per_shard( - new_shard_sync: &HashMap, - use_colour: bool, -) -> Vec<(ShardId, String)> { - new_shard_sync - .iter() - .map(|(&shard_id, shard_progress)| { - (shard_id, format_shard_sync_phase(shard_progress, use_colour)) - }) - .collect::>() -} - /* implements functions used to communicate with network */ impl Client { pub fn request_block(&self, hash: CryptoHash, peer_id: PeerId) { diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index d5d059b49a2..c4b5403265d 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -40,8 +40,8 @@ use near_chunks::adapter::ShardsManagerRequestFromClient; use near_chunks::client::ShardsManagerResponse; use near_chunks::logic::cares_about_shard_this_or_next_epoch; use near_client_primitives::types::{ - Error, GetClientConfig, GetClientConfigError, GetNetworkInfo, NetworkInfoResponse, Status, - StatusError, StatusSyncInfo, SyncStatus, + Error, GetClientConfig, GetClientConfigError, GetNetworkInfo, NetworkInfoResponse, + StateSyncStatus, Status, StatusError, StatusSyncInfo, SyncStatus, }; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; @@ -441,7 +441,7 @@ impl Handler> for ClientActor { .store() .get_all_block_hashes_by_height(block.header().height()); if was_requested || blocks_at_height.is_err() || blocks_at_height.as_ref().unwrap().is_empty() { - if let SyncStatus::StateSync(sync_hash, _) = &mut this.client.sync_status { + if let SyncStatus::StateSync(StateSyncStatus{ sync_hash, .. }) = &mut this.client.sync_status { if let Ok(header) = this.client.chain.get_block_header(sync_hash) { if block.hash() == header.prev_hash() { if let Err(e) = this.client.chain.save_block(block.into()) { @@ -535,7 +535,7 @@ impl Handler> for ClientActor { // Get the download that matches the shard_id and hash // ... It could be that the state was requested by the state sync - if let SyncStatus::StateSync(sync_hash, shards_to_download) = + if let SyncStatus::StateSync(StateSyncStatus{ sync_hash, sync_status: shards_to_download }) = &mut this.client.sync_status { if hash == *sync_hash { @@ -1006,7 +1006,7 @@ impl ClientActor { let _span = tracing::debug_span!(target: "client", "handle_block_production").entered(); // If syncing, don't try to produce blocks. if self.client.sync_status.is_syncing() { - debug!(target:"client", sync_status=?self.client.sync_status, "Syncing - block production disabled"); + debug!(target:"client", sync_status=format!("{:#?}", self.client.sync_status), "Syncing - block production disabled"); return Ok(()); } @@ -1630,7 +1630,7 @@ impl ClientActor { // Sync state if already running sync state or if block sync is too far. let sync_state = match self.client.sync_status { - SyncStatus::StateSync(_, _) => true, + SyncStatus::StateSync(_) => true, _ if header_head.height >= highest_height .saturating_sub(self.client.config.block_header_fetch_horizon) => @@ -1647,9 +1647,10 @@ impl ClientActor { if sync_state { let (sync_hash, mut new_shard_sync, just_enter_state_sync) = match &self.client.sync_status { - SyncStatus::StateSync(sync_hash, shard_sync) => { - (*sync_hash, shard_sync.clone(), false) - } + SyncStatus::StateSync(StateSyncStatus { + sync_hash, + sync_status: shard_sync, + }) => (*sync_hash, shard_sync.clone(), false), _ => { let sync_hash = unwrap_and_report!(self.find_sync_hash()); (sync_hash, HashMap::default(), true) @@ -1697,8 +1698,10 @@ impl ClientActor { )) { StateSyncResult::Unchanged => (), StateSyncResult::Changed(fetch_block) => { - self.client.sync_status = - SyncStatus::StateSync(sync_hash, new_shard_sync); + self.client.sync_status = SyncStatus::StateSync(StateSyncStatus { + sync_hash, + sync_status: new_shard_sync, + }); if fetch_block { if let Some(peer_info) = self.network_info.highest_height_peers.choose(&mut thread_rng()) diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs index 279a33e9ea2..bc67c19694b 100644 --- a/chain/client/src/info.rs +++ b/chain/client/src/info.rs @@ -3,6 +3,7 @@ use crate::{metrics, SyncStatus}; use actix::Addr; use itertools::Itertools; use near_chain_configs::{ClientConfig, LogSummaryStyle, SyncConfig}; +use near_client_primitives::types::StateSyncStatus; use near_network::types::NetworkInfo; use near_primitives::block::Tip; use near_primitives::network::PeerId; @@ -633,7 +634,7 @@ pub fn display_sync_status( current_height ) } - SyncStatus::StateSync(sync_hash, shard_statuses) => { + SyncStatus::StateSync(StateSyncStatus { sync_hash, sync_status: shard_statuses }) => { let mut res = format!("State {:?}", sync_hash); let mut shard_statuses: Vec<_> = shard_statuses.iter().collect(); shard_statuses.sort_by_key(|(shard_id, _)| *shard_id); diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index e68a3b7ea62..6a13e55d7f3 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -25,8 +25,6 @@ use crate::sync::external::{ create_bucket_readonly, external_storage_location, ExternalConnection, }; use actix_rt::ArbiterHandle; -use ansi_term::Color::{Purple, Yellow}; -use ansi_term::Style; use chrono::{DateTime, Duration, Utc}; use futures::{future, FutureExt}; use near_async::messaging::CanSendAsync; @@ -35,7 +33,8 @@ use near_chain::near_chain_primitives; use near_chain::Chain; use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig}; use near_client_primitives::types::{ - DownloadStatus, ShardSyncDownload, ShardSyncStatus, StateSplitApplyingStatus, + format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus, + StateSplitApplyingStatus, }; use near_epoch_manager::EpochManagerAdapter; use near_network::types::AccountOrPeerIdOrHash; @@ -1328,58 +1327,6 @@ fn process_part_response( true } -/// Applies style if `use_colour` is enabled. -fn paint(s: &str, style: Style, use_style: bool) -> String { - if use_style { - style.paint(s).to_string() - } else { - s.to_string() - } -} - -/// Formats the given ShardSyncDownload for logging. -pub(crate) fn format_shard_sync_phase( - shard_sync_download: &ShardSyncDownload, - use_colour: bool, -) -> String { - match &shard_sync_download.status { - ShardSyncStatus::StateDownloadHeader => format!( - "{} requests sent {}, last target {:?}", - paint("HEADER", Purple.bold(), use_colour), - shard_sync_download.downloads[0].state_requests_count, - shard_sync_download.downloads[0].last_target - ), - ShardSyncStatus::StateDownloadParts => { - let mut num_parts_done = 0; - let mut num_parts_not_done = 0; - let mut text = "".to_string(); - for (i, download) in shard_sync_download.downloads.iter().enumerate() { - if download.done { - num_parts_done += 1; - continue; - } - num_parts_not_done += 1; - text.push_str(&format!( - "[{}: {}, {}, {:?}] ", - paint(&i.to_string(), Yellow.bold(), use_colour), - download.done, - download.state_requests_count, - download.last_target - )); - } - format!( - "{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}", - paint("PARTS", Purple.bold(), use_colour), - paint("part_id", Yellow.bold(), use_colour), - text, - num_parts_done, - num_parts_not_done - ) - } - status => format!("{:?}", status), - } -} - /// Create an abstract collection of elements to be shuffled. /// Each element will appear in the shuffled output exactly `limit` times. /// Use it as an iterator to access the shuffled collection.