From 1d4ae967c0048fb4d88ca76a99f06cb4255ec7a9 Mon Sep 17 00:00:00 2001
From: driftluo
Date: Thu, 14 Mar 2024 14:57:12 +0800
Subject: [PATCH] feat: limit ibd orphan pool size

---
 rpc/README.md                          |  5 +++-
 rpc/src/module/net.rs                  |  4 ++-
 sync/Cargo.toml                        | 12 ++++-----
 sync/src/orphan_block_pool.rs          | 25 +++++++++++++++++-
 sync/src/synchronizer/block_fetcher.rs | 35 ++++++++++++++++++++++++--
 sync/src/tests/orphan_block_pool.rs    |  7 +++++-
 util/constant/src/sync.rs              |  3 +++
 util/jsonrpc-types/src/net.rs          |  2 ++
 8 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/rpc/README.md b/rpc/README.md
index 6d7c390837..a9887e2269 100644
--- a/rpc/README.md
+++ b/rpc/README.md
@@ -4125,7 +4125,8 @@ Response
     "inflight_blocks_count": "0x0",
     "low_time": "0x5dc",
     "normal_time": "0x4e2",
-    "orphan_blocks_count": "0x0"
+    "orphan_blocks_count": "0x0",
+    "orphan_blocks_size": "0x0"
   }
 }
 ```
@@ -6512,6 +6513,8 @@ The overall chain synchronization state of this local node.
 
 If this number is too high, it indicates that block download has stuck at some block.
 
+* `orphan_blocks_size`: [`Uint64`](#type-uint64) - The total size in bytes of all downloaded orphan blocks
+
 ### Type `Timestamp`
 
 The Unix timestamp in milliseconds (1 second is 1000 milliseconds).
diff --git a/rpc/src/module/net.rs b/rpc/src/module/net.rs
index 05c82f8db7..658011f80e 100644
--- a/rpc/src/module/net.rs
+++ b/rpc/src/module/net.rs
@@ -368,7 +368,8 @@ pub trait NetRpc {
     ///     "inflight_blocks_count": "0x0",
     ///     "low_time": "0x5dc",
     ///     "normal_time": "0x4e2",
-    ///     "orphan_blocks_count": "0x0"
+    ///     "orphan_blocks_count": "0x0",
+    ///     "orphan_blocks_size": "0x0"
     ///   }
     /// }
     /// ```
@@ -723,6 +724,7 @@ impl NetRpc for NetRpcImpl {
             best_known_block_number: best_known.number().into(),
             best_known_block_timestamp: best_known.timestamp().into(),
             orphan_blocks_count: (state.orphan_pool().len() as u64).into(),
+            orphan_blocks_size: (state.orphan_pool().total_size() as u64).into(),
             inflight_blocks_count: (state.read_inflight_blocks().total_inflight_count() as u64)
                 .into(),
             fast_time: fast_time.into(),
diff --git a/sync/Cargo.toml b/sync/Cargo.toml
index 43b39d7543..4e6b7e0a39 100644
--- a/sync/Cargo.toml
+++ b/sync/Cargo.toml
@@ -12,18 +12,18 @@ repository = "https://github.com/nervosnetwork/ckb"
 ckb-chain = { path = "../chain", version = "= 0.115.0-pre" }
 ckb-shared = { path = "../shared", version = "= 0.115.0-pre" }
 ckb-store = { path = "../store", version = "= 0.115.0-pre" }
-ckb-app-config = {path = "../util/app-config", version = "= 0.115.0-pre"}
-ckb-types = {path = "../util/types", version = "= 0.115.0-pre"}
+ckb-app-config = { path = "../util/app-config", version = "= 0.115.0-pre" }
+ckb-types = { path = "../util/types", version = "= 0.115.0-pre" }
 ckb-network = { path = "../network", version = "= 0.115.0-pre" }
-ckb-logger = {path = "../util/logger", version = "= 0.115.0-pre"}
-ckb-metrics = {path = "../util/metrics", version = "= 0.115.0-pre"}
+ckb-logger = { path = "../util/logger", version = "= 0.115.0-pre" }
+ckb-metrics = { path = "../util/metrics", version = "= 0.115.0-pre" }
 ckb-util = { path = "../util", version = "= 0.115.0-pre" }
 ckb-verification = { path = "../verification", version = "= 0.115.0-pre" }
 ckb-verification-traits = { path = "../verification/traits", version = "= 0.115.0-pre" }
 ckb-chain-spec = { path = "../spec", version = "= 0.115.0-pre" }
 ckb-channel = { path = "../util/channel", version = "= 0.115.0-pre" }
 ckb-traits = { path = "../traits", version = "= 0.115.0-pre" }
-ckb-error = {path = "../error", version = "= 0.115.0-pre"}
+ckb-error = { path = "../error", version = "= 0.115.0-pre" }
 ckb-tx-pool = { path = "../tx-pool", version = "= 0.115.0-pre" }
 sentry = { version = "0.26.0", optional = true }
 ckb-constant = { path = "../util/constant", version = "= 0.115.0-pre" }
@@ -50,7 +50,7 @@ ckb-reward-calculator = { path = "../util/reward-calculator", version = "= 0.115
 ckb-chain = { path = "../chain", version = "= 0.115.0-pre", features = ["mock"] }
 faux = "^0.1"
 once_cell = "1.8.0"
-ckb-systemtime = { path = "../util/systemtime", version = "= 0.115.0-pre" , features = ["enable_faketime"]}
+ckb-systemtime = { path = "../util/systemtime", version = "= 0.115.0-pre", features = ["enable_faketime"] }
 ckb-proposal-table = { path = "../util/proposal-table", version = "= 0.115.0-pre" }
 
 [features]
diff --git a/sync/src/orphan_block_pool.rs b/sync/src/orphan_block_pool.rs
index 786c6d1f23..20d6eda26d 100644
--- a/sync/src/orphan_block_pool.rs
+++ b/sync/src/orphan_block_pool.rs
@@ -1,4 +1,4 @@
-use ckb_logger::debug;
+use ckb_logger::{debug, error};
 use ckb_types::core::EpochNumber;
 use ckb_types::{core, packed};
 use ckb_util::{parking_lot::RwLock, shrink_to_fit};
@@ -20,6 +20,8 @@ struct InnerPool {
     parents: HashMap<packed::Byte32, ParentHash>,
     // Leaders are blocks not in the orphan pool but having at least a child in the pool.
     leaders: HashSet<ParentHash>,
+    // Total size in bytes of all blocks in the pool
+    block_size: usize,
 }
 
 impl InnerPool {
@@ -28,16 +30,26 @@
             blocks: HashMap::with_capacity(capacity),
             parents: HashMap::new(),
             leaders: HashSet::new(),
+            block_size: 0,
         }
     }
 
     fn insert(&mut self, block: core::BlockView) {
         let hash = block.header().hash();
         let parent_hash = block.data().header().raw().parent_hash();
+
+        self.block_size = self
+            .block_size
+            .checked_add(block.data().total_size())
+            .unwrap_or_else(|| {
+                error!("orphan pool block size add overflow");
+                usize::MAX
+            });
         self.blocks
             .entry(parent_hash.clone())
             .or_default()
             .insert(hash.clone(), block);
+        // Out-of-order insertion needs to be deduplicated
        self.leaders.remove(&hash);
 
         // It is a possible optimization to make the judgment in advance,
@@ -72,6 +84,13 @@
             }
         }
 
+        self.block_size = self
+            .block_size
+            .checked_sub(removed.iter().map(|b| b.data().total_size()).sum::<usize>())
+            .unwrap_or_else(|| {
+                error!("orphan pool block size sub overflow");
+                0
+            });
         debug!("orphan pool pop chain len: {}", removed.len());
         debug_assert_ne!(
             removed.len(),
@@ -160,6 +179,10 @@ impl OrphanBlockPool {
         self.len() == 0
     }
 
+    pub fn total_size(&self) -> usize {
+        self.inner.read().block_size
+    }
+
     pub fn clone_leaders(&self) -> Vec<packed::Byte32> {
         self.inner.read().leaders.iter().cloned().collect()
     }
diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs
index e880716d6d..5ba4fcee8e 100644
--- a/sync/src/synchronizer/block_fetcher.rs
+++ b/sync/src/synchronizer/block_fetcher.rs
@@ -3,6 +3,7 @@ use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView
 use crate::SyncShared;
 use ckb_constant::sync::{
     BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
+    MAX_ORPHAN_POOL_SIZE,
 };
 use ckb_logger::{debug, trace};
 use ckb_network::PeerIndex;
@@ -138,10 +139,40 @@ impl BlockFetcher {
             return None;
         }
+        let mut block_download_window = BLOCK_DOWNLOAD_WINDOW;
         let state = self.sync_shared.state();
         let mut inflight = state.write_inflight_blocks();
+
+        // During IBD, if the total block size of the orphan block pool exceeds
+        // MAX_ORPHAN_POOL_SIZE, the node enters a special download mode and only allows
+        // downloading the tip + 1 block, so that memory usage drops back as quickly as possible.
+        //
+        // If there are more than CHECK_POINT_WINDOW blocks (a CKB block is at most ~570 KB)
+        // in the orphan block pool, immediately mark the in-flight tip + 1 block as slow,
+        // so that a new download target is re-selected after the timeout.
+        //
+        // Also try to send a chunk download request for tip + 1.
+        if state.orphan_pool().total_size() >= MAX_ORPHAN_POOL_SIZE {
+            let tip = self.active_chain.tip_number();
+            // set download window to 2
+            block_download_window = 2;
+            debug!(
+                "[Enter special download mode], orphan pool total size = {}, \
+                orphan len = {}, inflight_len = {}, tip = {}",
+                state.orphan_pool().total_size(),
+                state.orphan_pool().len(),
+                inflight.total_inflight_count(),
+                tip
+            );
+
+            // its task will be removed if it times out
+            if state.orphan_pool().len() > CHECK_POINT_WINDOW as usize {
+                inflight.mark_slow_block(tip);
+            }
+        }
+
         let mut start = last_common.number() + 1;
-        let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
+        let mut end = min(best_known.number(), start + block_download_window);
         let n_fetch = min(
             end.saturating_sub(start) as usize + 1,
             inflight.peer_can_fetch_count(self.peer),
         );
@@ -170,7 +201,7 @@ impl BlockFetcher {
                     .state()
                     .peers()
                     .set_last_common_header(self.peer, header.number_and_hash());
-                end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
+                end = min(best_known.number(), header.number() + block_download_window);
                 break;
             } else if status.contains(BlockStatus::BLOCK_RECEIVED) {
                 // Do not download repeatedly
diff --git a/sync/src/tests/orphan_block_pool.rs b/sync/src/tests/orphan_block_pool.rs
index 3824beeb08..f535871b03 100644
--- a/sync/src/tests/orphan_block_pool.rs
+++ b/sync/src/tests/orphan_block_pool.rs
@@ -26,17 +26,21 @@ fn test_remove_blocks_by_parent() {
     let mut blocks = Vec::new();
     let mut parent = consensus.genesis_block().header();
     let pool = OrphanBlockPool::with_capacity(200);
+    let mut total_size = 0;
     for _ in 1..block_number {
         let new_block = gen_block(&parent);
+        total_size += new_block.data().total_size();
         blocks.push(new_block.clone());
         pool.insert(new_block.clone());
         parent = new_block.header();
     }
+    assert_eq!(total_size, pool.total_size());
 
     let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash());
     let orphan_set: HashSet<BlockView> = orphan.into_iter().collect();
     let blocks_set: HashSet<BlockView> = blocks.into_iter().collect();
-    assert_eq!(orphan_set, blocks_set)
+    assert_eq!(orphan_set, blocks_set);
+    assert_eq!(0, pool.total_size());
 }
 
 #[test]
@@ -145,4 +149,5 @@ fn test_remove_expired_blocks() {
     let v = pool.clean_expired_blocks(20_u64);
     assert_eq!(v.len(), 19);
     assert_eq!(pool.leaders_len(), 0);
+    assert_eq!(pool.total_size(), 0)
 }
diff --git a/util/constant/src/sync.rs b/util/constant/src/sync.rs
index 488e1faecd..1462fc31fe 100644
--- a/util/constant/src/sync.rs
+++ b/util/constant/src/sync.rs
@@ -53,6 +53,9 @@ pub const BLOCK_DOWNLOAD_TIMEOUT: u64 = 30 * 1000; // 30s
 // potential degree of disordering of blocks.
 pub const BLOCK_DOWNLOAD_WINDOW: u64 = 1024 * 8; // 1024 * default_outbound_peers
 
+/// Maximum total block size of the orphan block pool (256 MiB)
+pub const MAX_ORPHAN_POOL_SIZE: usize = 1024 * 1024 * 256;
+
 /// Interval between repeated inquiry transactions
 pub const RETRY_ASK_TX_TIMEOUT_INCREASE: Duration = Duration::from_secs(30);
 
diff --git a/util/jsonrpc-types/src/net.rs b/util/jsonrpc-types/src/net.rs
index 09a83e90b9..502ee0f753 100644
--- a/util/jsonrpc-types/src/net.rs
+++ b/util/jsonrpc-types/src/net.rs
@@ -276,6 +276,8 @@ pub struct SyncState {
     ///
     /// If this number is too high, it indicates that block download has stuck at some block.
     pub orphan_blocks_count: Uint64,
+    /// The total size in bytes of all downloaded orphan blocks
+    pub orphan_blocks_size: Uint64,
     /// Count of downloading blocks.
     pub inflight_blocks_count: Uint64,
     /// The download scheduler's time analysis data, the fast is the 1/3 of the cut-off point, unit ms
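
Taken together, the hunks above do three things: `orphan_block_pool.rs` keeps a running byte counter of the blocks it holds, the RPC layer exposes that counter as `orphan_blocks_size`, and `block_fetcher.rs` shrinks the block download window once the counter reaches `MAX_ORPHAN_POOL_SIZE`. The sketch below is a standalone illustration of that mechanism only; it is not part of the patch and not the ckb API, and `OrphanPool`, `download_window`, and the `[u8; 32]` hash key are made-up stand-ins.

```rust
use std::collections::HashMap;

// Same values as util/constant/src/sync.rs in the patch.
const MAX_ORPHAN_POOL_SIZE: usize = 1024 * 1024 * 256; // 256 MiB
const BLOCK_DOWNLOAD_WINDOW: u64 = 1024 * 8;

/// Minimal orphan pool that only tracks the total serialized size of its blocks.
#[derive(Default)]
struct OrphanPool {
    /// block hash -> serialized block size in bytes
    blocks: HashMap<[u8; 32], usize>,
    total_size: usize,
}

impl OrphanPool {
    fn insert(&mut self, hash: [u8; 32], size: usize) {
        if self.blocks.insert(hash, size).is_none() {
            // Saturating arithmetic mirrors the checked_add / usize::MAX fallback in the patch.
            self.total_size = self.total_size.saturating_add(size);
        }
    }

    fn remove(&mut self, hash: &[u8; 32]) {
        if let Some(size) = self.blocks.remove(hash) {
            self.total_size = self.total_size.saturating_sub(size);
        }
    }

    fn total_size(&self) -> usize {
        self.total_size
    }
}

/// Pick the download window: the full window normally, a tiny window once the
/// orphan pool holds at least MAX_ORPHAN_POOL_SIZE bytes.
fn download_window(pool: &OrphanPool) -> u64 {
    if pool.total_size() >= MAX_ORPHAN_POOL_SIZE {
        2
    } else {
        BLOCK_DOWNLOAD_WINDOW
    }
}

fn main() {
    let mut pool = OrphanPool::default();
    assert_eq!(download_window(&pool), BLOCK_DOWNLOAD_WINDOW);

    // Simulate receiving ~570 KB orphan blocks until the byte budget is reached.
    let mut inserted: u64 = 0;
    while pool.total_size() < MAX_ORPHAN_POOL_SIZE {
        let mut hash = [0u8; 32];
        hash[..8].copy_from_slice(&inserted.to_le_bytes());
        pool.insert(hash, 570 * 1024);
        inserted += 1;
    }
    assert_eq!(download_window(&pool), 2);

    // Draining blocks from the pool restores the normal window.
    let mut first = [0u8; 32];
    first[..8].copy_from_slice(&0u64.to_le_bytes());
    pool.remove(&first);
    println!(
        "{} blocks left, window = {}",
        inserted - 1,
        download_window(&pool)
    );
}
```

Keeping the counter up to date on every insert and removal makes the limit check O(1) on the block-fetch hot path, which is presumably why the patch stores `block_size` inside `InnerPool` instead of summing block sizes on demand.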