diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 43b39d7543c..4e6b7e0a399 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -12,18 +12,18 @@ repository = "https://github.com/nervosnetwork/ckb" ckb-chain = { path = "../chain", version = "= 0.115.0-pre" } ckb-shared = { path = "../shared", version = "= 0.115.0-pre" } ckb-store = { path = "../store", version = "= 0.115.0-pre" } -ckb-app-config = {path = "../util/app-config", version = "= 0.115.0-pre"} -ckb-types = {path = "../util/types", version = "= 0.115.0-pre"} +ckb-app-config = { path = "../util/app-config", version = "= 0.115.0-pre" } +ckb-types = { path = "../util/types", version = "= 0.115.0-pre" } ckb-network = { path = "../network", version = "= 0.115.0-pre" } -ckb-logger = {path = "../util/logger", version = "= 0.115.0-pre"} -ckb-metrics = {path = "../util/metrics", version = "= 0.115.0-pre"} +ckb-logger = { path = "../util/logger", version = "= 0.115.0-pre" } +ckb-metrics = { path = "../util/metrics", version = "= 0.115.0-pre" } ckb-util = { path = "../util", version = "= 0.115.0-pre" } ckb-verification = { path = "../verification", version = "= 0.115.0-pre" } ckb-verification-traits = { path = "../verification/traits", version = "= 0.115.0-pre" } ckb-chain-spec = { path = "../spec", version = "= 0.115.0-pre" } ckb-channel = { path = "../util/channel", version = "= 0.115.0-pre" } ckb-traits = { path = "../traits", version = "= 0.115.0-pre" } -ckb-error = {path = "../error", version = "= 0.115.0-pre"} +ckb-error = { path = "../error", version = "= 0.115.0-pre" } ckb-tx-pool = { path = "../tx-pool", version = "= 0.115.0-pre" } sentry = { version = "0.26.0", optional = true } ckb-constant = { path = "../util/constant", version = "= 0.115.0-pre" } @@ -50,7 +50,7 @@ ckb-reward-calculator = { path = "../util/reward-calculator", version = "= 0.115 ckb-chain = { path = "../chain", version = "= 0.115.0-pre", features = ["mock"] } faux = "^0.1" once_cell = "1.8.0" -ckb-systemtime = { path = "../util/systemtime", version = "= 0.115.0-pre" , features = ["enable_faketime"]} +ckb-systemtime = { path = "../util/systemtime", version = "= 0.115.0-pre", features = ["enable_faketime"] } ckb-proposal-table = { path = "../util/proposal-table", version = "= 0.115.0-pre" } [features] diff --git a/sync/src/orphan_block_pool.rs b/sync/src/orphan_block_pool.rs index 786c6d1f23d..20d6eda26d1 100644 --- a/sync/src/orphan_block_pool.rs +++ b/sync/src/orphan_block_pool.rs @@ -1,4 +1,4 @@ -use ckb_logger::debug; +use ckb_logger::{debug, error}; use ckb_types::core::EpochNumber; use ckb_types::{core, packed}; use ckb_util::{parking_lot::RwLock, shrink_to_fit}; @@ -20,6 +20,8 @@ struct InnerPool { parents: HashMap, // Leaders are blocks not in the orphan pool but having at least a child in the pool. leaders: HashSet, + // block size of pool + block_size: usize, } impl InnerPool { @@ -28,16 +30,26 @@ impl InnerPool { blocks: HashMap::with_capacity(capacity), parents: HashMap::new(), leaders: HashSet::new(), + block_size: 0, } } fn insert(&mut self, block: core::BlockView) { let hash = block.header().hash(); let parent_hash = block.data().header().raw().parent_hash(); + + self.block_size = self + .block_size + .checked_add(block.data().total_size()) + .unwrap_or_else(|| { + error!("orphan pool block size add overflow"); + usize::MAX + }); self.blocks .entry(parent_hash.clone()) .or_default() .insert(hash.clone(), block); + // Out-of-order insertion needs to be deduplicated self.leaders.remove(&hash); // It is a possible optimization to make the judgment in advance, @@ -72,6 +84,13 @@ impl InnerPool { } } + self.block_size = self + .block_size + .checked_sub(removed.iter().map(|b| b.data().total_size()).sum::()) + .unwrap_or_else(|| { + error!("orphan pool block size sub overflow"); + 0 + }); debug!("orphan pool pop chain len: {}", removed.len()); debug_assert_ne!( removed.len(), @@ -160,6 +179,10 @@ impl OrphanBlockPool { self.len() == 0 } + pub fn total_size(&self) -> usize { + self.inner.read().block_size + } + pub fn clone_leaders(&self) -> Vec { self.inner.read().leaders.iter().cloned().collect() } diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index e880716d6da..52f37e8099d 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -3,6 +3,7 @@ use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView use crate::SyncShared; use ckb_constant::sync::{ BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER, + MAX_ORPHAN_POOL_SIZE, }; use ckb_logger::{debug, trace}; use ckb_network::PeerIndex; @@ -146,6 +147,58 @@ impl BlockFetcher { end.saturating_sub(start) as usize + 1, inflight.peer_can_fetch_count(self.peer), ); + + // During IBD, if the total block size of the orphan block pool is greater than 500M, + // we will enter a special download mode. In this mode, the node will only allow downloading + // the tip+1 block to reduce memory usage as quickly as possible. + // + // If there are more than 2000 blocks(ckb block maximum is 570kb) in + // the orphan block pool, immediately trace the tip + 1 block being downloaded, and + // re-select the target for downloading after timeout. + // + // Also try to send a chunk download request for tip + 1 + if matches!(self.ibd, IBDState::In) + && state.orphan_pool().total_size() >= MAX_ORPHAN_POOL_SIZE + { + let tip = self.active_chain.tip_header(); + + debug!( + "[Enter special download mode], orphan pool total size = {}, \ + orphan len = {}, inflight_len = {}, tip = {}", + state.orphan_pool().total_size(), + state.orphan_pool().len(), + inflight.total_inflight_count(), + tip.number() + ); + + let mut fetch = Vec::with_capacity(1); + // will remove it's task if timeout + if state.orphan_pool().len() > 2000 { + inflight.mark_slow_block(tip.number()); + } + + let header = self + .active_chain + .get_ancestor(&best_known.hash(), tip.number() + 1)?; + + let status = self.active_chain.get_block_status(&header.hash()); + + // if tip + 1 is STORED OR RECEIVED, do nothing + if status.contains(BlockStatus::BLOCK_STORED) { + } else if status.contains(BlockStatus::BLOCK_RECEIVED) { + + // if it is inflight now, here will return false + } else if inflight.insert(self.peer, (header.number(), header.hash()).into()) { + fetch.push(header); + } + + return Some( + fetch + .chunks(INIT_BLOCKS_IN_TRANSIT_PER_PEER) + .map(|headers| headers.iter().map(HeaderIndexView::hash).collect()) + .collect(), + ); + } let mut fetch = Vec::with_capacity(n_fetch); let now = unix_time_as_millis(); diff --git a/sync/src/tests/orphan_block_pool.rs b/sync/src/tests/orphan_block_pool.rs index 3824beeb082..f535871b03b 100644 --- a/sync/src/tests/orphan_block_pool.rs +++ b/sync/src/tests/orphan_block_pool.rs @@ -26,17 +26,21 @@ fn test_remove_blocks_by_parent() { let mut blocks = Vec::new(); let mut parent = consensus.genesis_block().header(); let pool = OrphanBlockPool::with_capacity(200); + let mut total_size = 0; for _ in 1..block_number { let new_block = gen_block(&parent); + total_size += new_block.data().total_size(); blocks.push(new_block.clone()); pool.insert(new_block.clone()); parent = new_block.header(); } + assert_eq!(total_size, pool.total_size()); let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash()); let orphan_set: HashSet = orphan.into_iter().collect(); let blocks_set: HashSet = blocks.into_iter().collect(); - assert_eq!(orphan_set, blocks_set) + assert_eq!(orphan_set, blocks_set); + assert_eq!(0, pool.total_size()); } #[test] @@ -145,4 +149,5 @@ fn test_remove_expired_blocks() { let v = pool.clean_expired_blocks(20_u64); assert_eq!(v.len(), 19); assert_eq!(pool.leaders_len(), 0); + assert_eq!(pool.total_size(), 0) } diff --git a/util/constant/src/sync.rs b/util/constant/src/sync.rs index 488e1faecd3..cc0737be4f1 100644 --- a/util/constant/src/sync.rs +++ b/util/constant/src/sync.rs @@ -53,6 +53,9 @@ pub const BLOCK_DOWNLOAD_TIMEOUT: u64 = 30 * 1000; // 30s // potential degree of disordering of blocks. pub const BLOCK_DOWNLOAD_WINDOW: u64 = 1024 * 8; // 1024 * default_outbound_peers +/// Orphan block pool max size +pub const MAX_ORPHAN_POOL_SIZE: usize = 1024 * 1024 * 500; + /// Interval between repeated inquiry transactions pub const RETRY_ASK_TX_TIMEOUT_INCREASE: Duration = Duration::from_secs(30);