diff --git a/chain/chain/src/block_processing_utils.rs b/chain/chain/src/block_processing_utils.rs index b1128b77166..c74730cfdcd 100644 --- a/chain/chain/src/block_processing_utils.rs +++ b/chain/chain/src/block_processing_utils.rs @@ -1,5 +1,6 @@ -use crate::chain::{BlockMissingChunks, OrphanMissingChunks}; +use crate::chain::BlockMissingChunks; use crate::near_chain_primitives::error::BlockKnownError::KnownInProcessing; +use crate::orphan::OrphanMissingChunks; use crate::Provenance; use near_primitives::block::Block; use near_primitives::challenge::{ChallengeBody, ChallengesResult}; diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index c09555852d5..1a347e933fc 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -6,7 +6,8 @@ use crate::crypto_hash_timer::CryptoHashTimer; use crate::lightclient::get_epoch_block_producers_view; use crate::metrics::{SHARD_LAYOUT_NUM_SHARDS, SHARD_LAYOUT_VERSION}; use crate::migrations::check_if_block_is_first_with_chunk_of_version; -use crate::missing_chunks::{BlockLike, MissingChunksPool}; +use crate::missing_chunks::MissingChunksPool; +use crate::orphan::{Orphan, OrphanBlockPool}; use crate::state_request_tracker::StateRequestTracker; use crate::state_snapshot_actor::SnapshotCallbacks; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; @@ -102,30 +103,12 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use std::time::{Duration as TimeDuration, Instant}; +use std::time::Instant; use tracing::{debug, debug_span, error, info, warn, Span}; -/// Maximum number of orphans chain can store. -pub const MAX_ORPHAN_SIZE: usize = 1024; - -/// Maximum age of orphan to store in the chain. -const MAX_ORPHAN_AGE_SECS: u64 = 300; - -// Number of orphan ancestors should be checked to request chunks -// Orphans for which we will request for missing chunks must satisfy, -// its NUM_ORPHAN_ANCESTORS_CHECK'th ancestor has been accepted -pub const NUM_ORPHAN_ANCESTORS_CHECK: u64 = 3; - /// The size of the invalid_blocks in-memory pool pub const INVALID_CHUNKS_POOL_SIZE: usize = 5000; -// Maximum number of orphans that we can request missing chunks -// Note that if there are no forks, the maximum number of orphans we would -// request missing chunks will not exceed NUM_ORPHAN_ANCESTORS_CHECK, -// this number only adds another restriction when there are multiple forks. -// It should almost never be hit -const MAX_ORPHAN_MISSING_CHUNKS: usize = 5; - /// 10000 years in seconds. Big constant for sandbox to allow time traveling. #[cfg(feature = "sandbox")] const ACCEPTABLE_TIME_DIFFERENCE: i64 = 60 * 60 * 24 * 365 * 10000; @@ -152,208 +135,6 @@ enum ApplyChunksMode { NotCaughtUp, } -/// Orphan is a block whose previous block is not accepted (in store) yet. -/// Therefore, they are not ready to be processed yet. -/// We save these blocks in an in-memory orphan pool to be processed later -/// after their previous block is accepted. -pub struct Orphan { - block: MaybeValidated, - provenance: Provenance, - added: Instant, -} - -impl BlockLike for Orphan { - fn hash(&self) -> CryptoHash { - *self.block.hash() - } - - fn height(&self) -> u64 { - self.block.header().height() - } -} - -impl Orphan { - fn prev_hash(&self) -> &CryptoHash { - self.block.header().prev_hash() - } -} - -/// OrphanBlockPool stores information of all orphans that are waiting to be processed -/// A block is added to the orphan pool when process_block failed because the block is an orphan -/// A block is removed from the pool if -/// 1) it is ready to be processed -/// or -/// 2) size of the pool exceeds MAX_ORPHAN_SIZE and the orphan was added a long time ago -/// or the height is high -pub struct OrphanBlockPool { - /// A map from block hash to a orphan block - orphans: HashMap, - /// A set that contains all orphans for which we have requested missing chunks for them - /// An orphan can be added to this set when it was first added to the pool, or later - /// when certain requirements are satisfied (see check_orphans) - /// It can only be removed from this set when the orphan is removed from the pool - orphans_requested_missing_chunks: HashSet, - /// A map from block heights to orphan blocks at the height - /// It's used to evict orphans when the pool is saturated - height_idx: HashMap>, - /// A map from block hashes to orphan blocks whose prev block is the block - /// It's used to check which orphan blocks are ready to be processed when a block is accepted - prev_hash_idx: HashMap>, - /// number of orphans that were evicted - evicted: usize, -} - -impl OrphanBlockPool { - pub fn new() -> OrphanBlockPool { - OrphanBlockPool { - orphans: HashMap::default(), - orphans_requested_missing_chunks: HashSet::default(), - height_idx: HashMap::default(), - prev_hash_idx: HashMap::default(), - evicted: 0, - } - } - - pub fn len(&self) -> usize { - self.orphans.len() - } - - fn len_evicted(&self) -> usize { - self.evicted - } - - /// Add a block to the orphan pool - /// `requested_missing_chunks`: whether missing chunks has been requested for the orphan - fn add(&mut self, orphan: Orphan, requested_missing_chunks: bool) { - let block_hash = *orphan.block.hash(); - let height_hashes = self.height_idx.entry(orphan.block.header().height()).or_default(); - height_hashes.push(*orphan.block.hash()); - let prev_hash_entries = - self.prev_hash_idx.entry(*orphan.block.header().prev_hash()).or_default(); - prev_hash_entries.push(block_hash); - self.orphans.insert(block_hash, orphan); - if requested_missing_chunks { - self.orphans_requested_missing_chunks.insert(block_hash); - } - - if self.orphans.len() > MAX_ORPHAN_SIZE { - let old_len = self.orphans.len(); - - let mut removed_hashes: HashSet = HashSet::default(); - self.orphans.retain(|_, ref mut x| { - let keep = x.added.elapsed() < TimeDuration::from_secs(MAX_ORPHAN_AGE_SECS); - if !keep { - removed_hashes.insert(*x.block.hash()); - } - keep - }); - let mut heights = self.height_idx.keys().cloned().collect::>(); - heights.sort_unstable(); - for h in heights.iter().rev() { - if let Some(hash) = self.height_idx.remove(h) { - for h in hash { - let _ = self.orphans.remove(&h); - removed_hashes.insert(h); - } - } - if self.orphans.len() < MAX_ORPHAN_SIZE { - break; - } - } - self.height_idx.retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); - self.prev_hash_idx - .retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); - self.orphans_requested_missing_chunks.retain(|x| !removed_hashes.contains(x)); - - self.evicted += old_len - self.orphans.len(); - } - metrics::NUM_ORPHANS.set(self.orphans.len() as i64); - } - - pub fn contains(&self, hash: &CryptoHash) -> bool { - self.orphans.contains_key(hash) - } - - pub fn get(&self, hash: &CryptoHash) -> Option<&Orphan> { - self.orphans.get(hash) - } - - // Iterates over existing orphans. - pub fn map(&self, orphan_fn: &mut dyn FnMut(&CryptoHash, &Block, &Instant)) { - self.orphans - .iter() - .map(|it| orphan_fn(it.0, it.1.block.get_inner(), &it.1.added)) - .collect_vec(); - } - - /// Remove all orphans in the pool that can be "adopted" by block `prev_hash`, i.e., children - /// of `prev_hash` and return the list. - /// This function is called when `prev_hash` is accepted, thus its children can be removed - /// from the orphan pool and be processed. - pub fn remove_by_prev_hash(&mut self, prev_hash: CryptoHash) -> Option> { - let mut removed_hashes: HashSet = HashSet::default(); - let ret = self.prev_hash_idx.remove(&prev_hash).map(|hs| { - hs.iter() - .filter_map(|h| { - removed_hashes.insert(*h); - self.orphans_requested_missing_chunks.remove(h); - self.orphans.remove(h) - }) - .collect() - }); - - self.height_idx.retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); - - metrics::NUM_ORPHANS.set(self.orphans.len() as i64); - ret - } - - /// Return a list of orphans that are among the `target_depth` immediate descendants of - /// the block `parent_hash` - pub fn get_orphans_within_depth( - &self, - parent_hash: CryptoHash, - target_depth: u64, - ) -> Vec { - let mut _visited = HashSet::new(); - - let mut res = vec![]; - let mut queue = vec![(parent_hash, 0)]; - while let Some((prev_hash, depth)) = queue.pop() { - if depth == target_depth { - break; - } - if let Some(block_hashes) = self.prev_hash_idx.get(&prev_hash) { - for hash in block_hashes { - queue.push((*hash, depth + 1)); - res.push(*hash); - // there should be no loop - debug_assert!(_visited.insert(*hash)); - } - } - - // probably something serious went wrong here because there shouldn't be so many forks - assert!( - res.len() <= 100 * target_depth as usize, - "found too many orphans {:?}, probably something is wrong with the chain", - res - ); - } - res - } - - /// Returns true if the block has not been requested yet and the number of orphans - /// for which we have requested missing chunks have not exceeded MAX_ORPHAN_MISSING_CHUNKS - fn can_request_missing_chunks_for_orphan(&self, block_hash: &CryptoHash) -> bool { - self.orphans_requested_missing_chunks.len() < MAX_ORPHAN_MISSING_CHUNKS - && !self.orphans_requested_missing_chunks.contains(block_hash) - } - - fn mark_missing_chunks_requested_for_orphan(&mut self, block_hash: CryptoHash) { - self.orphans_requested_missing_chunks.insert(block_hash); - } -} - /// Contains information for missing chunks in a block pub struct BlockMissingChunks { /// previous block hash @@ -370,28 +151,6 @@ impl Debug for BlockMissingChunks { } } -/// Contains information needed to request chunks for orphans -/// Fields will be used as arguments for `request_chunks_for_orphan` -pub struct OrphanMissingChunks { - pub missing_chunks: Vec, - /// epoch id for the block that has missing chunks - pub epoch_id: EpochId, - /// hash of an ancestor block of the block that has missing chunks - /// this is used as an argument for `request_chunks_for_orphan` - /// see comments in `request_chunks_for_orphan` for what `ancestor_hash` is used for - pub ancestor_hash: CryptoHash, -} - -impl Debug for OrphanMissingChunks { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OrphanMissingChunks") - .field("epoch_id", &self.epoch_id) - .field("ancestor_hash", &self.ancestor_hash) - .field("num_missing_chunks", &self.missing_chunks.len()) - .finish() - } -} - /// Check if block header is known /// Returns Err(Error) if any error occurs when checking store /// Ok(Err(BlockKnownError)) if the block header is known @@ -466,7 +225,7 @@ pub struct Chain { pub epoch_manager: Arc, pub shard_tracker: ShardTracker, pub runtime_adapter: Arc, - orphans: OrphanBlockPool, + pub(crate) orphans: OrphanBlockPool, pub blocks_with_missing_chunks: MissingChunksPool, genesis: Block, pub transaction_validity_period: NumBlocks, @@ -861,11 +620,6 @@ impl Chain { if self.store.get_block(block.hash()).is_ok() { return Ok(()); } - if let Err(e) = self.validate_block(&block) { - byzantine_assert!(false); - return Err(e); - } - let mut chain_store_update = ChainStoreUpdate::new(&mut self.store); chain_store_update.save_block(block.into_inner()); @@ -876,25 +630,6 @@ impl Chain { Ok(()) } - pub fn save_orphan( - &mut self, - block: MaybeValidated, - requested_missing_chunks: bool, - ) -> Result<(), Error> { - if self.orphans.contains(block.hash()) { - return Ok(()); - } - if let Err(e) = self.validate_block(&block) { - byzantine_assert!(false); - return Err(e); - } - self.orphans.add( - Orphan { block, provenance: Provenance::NONE, added: StaticClock::instant() }, - requested_missing_chunks, - ); - Ok(()) - } - fn save_block_height_processed(&mut self, block_height: BlockHeight) -> Result<(), Error> { let mut chain_store_update = ChainStoreUpdate::new(&mut self.store); if !chain_store_update.is_height_processed(block_height)? { @@ -2176,7 +1911,6 @@ impl Chain { let tail_height = self.store.tail()?; // we only add blocks that couldn't have been gc'ed to the orphan pool. if block_height >= tail_height { - let block_hash = *block.hash(); let requested_missing_chunks = if let Some(orphan_missing_chunks) = self.should_request_chunks_for_orphan(me, &block) { @@ -2190,19 +1924,11 @@ impl Chain { let time = StaticClock::instant(); self.blocks_delay_tracker.mark_block_orphaned(block.hash(), time); - let orphan = Orphan { block, provenance, added: time }; - self.orphans.add(orphan, requested_missing_chunks); - - debug!( - target: "chain", - "Process block: orphan: {:?}, # orphans {}{}", - block_hash, - self.orphans.len(), - if self.orphans.len_evicted() > 0 { - format!(", # evicted {}", self.orphans.len_evicted()) - } else { - String::new() - }, + self.save_orphan( + block, + provenance, + Some(time), + requested_missing_chunks, ); } } @@ -2565,11 +2291,7 @@ impl Chain { return Err(Error::InvalidRandomnessBeaconOutput); } - let res = block.validate_with(|block| { - Chain::validate_block_impl(self.epoch_manager.as_ref(), &self.genesis, block) - .map(|_| true) - }); - if let Err(e) = res { + if let Err(e) = self.validate_block(block) { byzantine_assert!(false); return Err(e); } @@ -2663,73 +2385,6 @@ impl Chain { } } - /// Check if we can request chunks for this orphan. Conditions are - /// 1) Orphans that with outstanding missing chunks request has not exceed `MAX_ORPHAN_MISSING_CHUNKS` - /// 2) we haven't already requested missing chunks for the orphan - /// 3) All the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted, - /// or orphans or in `blocks_with_missing_chunks` - /// 4) Among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is - /// accepted(is in store), call it `ancestor` - /// 5) The next block of `ancestor` has the same epoch_id as the orphan block - /// (This is because when requesting chunks, we will use `ancestor` hash instead of the - /// previous block hash of the orphan to decide epoch id) - /// 6) The orphan has missing chunks - pub fn should_request_chunks_for_orphan( - &mut self, - me: &Option, - orphan: &Block, - ) -> Option { - // 1) Orphans that with outstanding missing chunks request has not exceed `MAX_ORPHAN_MISSING_CHUNKS` - // 2) we haven't already requested missing chunks for the orphan - if !self.orphans.can_request_missing_chunks_for_orphan(orphan.hash()) { - return None; - } - let mut block_hash = *orphan.header().prev_hash(); - for _ in 0..NUM_ORPHAN_ANCESTORS_CHECK { - // 3) All the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted, - // or orphans or in `blocks_with_missing_chunks` - if let Some(block) = self.blocks_with_missing_chunks.get(&block_hash) { - block_hash = *block.prev_hash(); - continue; - } - if let Some(orphan) = self.orphans.get(&block_hash) { - block_hash = *orphan.prev_hash(); - continue; - } - // 4) Among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is - // accepted(is in store), call it `ancestor` - if self.get_block(&block_hash).is_ok() { - if let Ok(epoch_id) = self.epoch_manager.get_epoch_id_from_prev_block(&block_hash) { - // 5) The next block of `ancestor` has the same epoch_id as the orphan block - if &epoch_id == orphan.header().epoch_id() { - // 6) The orphan has missing chunks - if let Err(e) = self.ping_missing_chunks(me, block_hash, orphan) { - return match e { - Error::ChunksMissing(missing_chunks) => { - debug!(target:"chain", "Request missing chunks for orphan {:?} {:?}", orphan.hash(), missing_chunks.iter().map(|chunk|{(chunk.shard_id(), chunk.chunk_hash())}).collect::>()); - Some(OrphanMissingChunks { - missing_chunks, - epoch_id, - ancestor_hash: block_hash, - }) - } - _ => None, - }; - } - } - } - return None; - } - return None; - } - None - } - - /// only used for test - pub fn check_orphan_partial_chunks_requested(&self, block_hash: &CryptoHash) -> bool { - self.orphans.orphans_requested_missing_chunks.contains(block_hash) - } - pub fn prev_block_is_caught_up( &self, prev_prev_hash: &CryptoHash, @@ -2836,65 +2491,6 @@ impl Chain { } } - /// Check for orphans that are ready to be processed or request missing chunks, process these blocks. - /// `prev_hash`: hash of the block that is just accepted - /// `block_accepted`: callback to be called when an orphan is accepted - /// `block_misses_chunks`: callback to be called when an orphan is added to the pool of blocks - /// that have missing chunks - /// `orphan_misses_chunks`: callback to be called when it is ready to request missing chunks for - /// an orphan - /// `on_challenge`: callback to be called when an orphan should be challenged - pub fn check_orphans( - &mut self, - me: &Option, - prev_hash: CryptoHash, - block_processing_artifacts: &mut BlockProcessingArtifact, - apply_chunks_done_callback: DoneApplyChunkCallback, - ) { - let _span = debug_span!( - target: "chain", - "check_orphans", - ?prev_hash, - num_orphans = self.orphans.len()) - .entered(); - // Check if there are orphans we can process. - // check within the descendents of `prev_hash` to see if there are orphans there that - // are ready to request missing chunks for - let orphans_to_check = - self.orphans.get_orphans_within_depth(prev_hash, NUM_ORPHAN_ANCESTORS_CHECK); - for orphan_hash in orphans_to_check { - let orphan = self.orphans.get(&orphan_hash).unwrap().block.clone(); - if let Some(orphan_missing_chunks) = self.should_request_chunks_for_orphan(me, &orphan) - { - block_processing_artifacts.orphans_missing_chunks.push(orphan_missing_chunks); - self.orphans.mark_missing_chunks_requested_for_orphan(orphan_hash); - } - } - if let Some(orphans) = self.orphans.remove_by_prev_hash(prev_hash) { - debug!(target: "chain", found_orphans = orphans.len(), "Check orphans"); - for orphan in orphans.into_iter() { - let block_hash = orphan.hash(); - self.blocks_delay_tracker - .mark_block_unorphaned(&block_hash, StaticClock::instant()); - let res = self.start_process_block_async( - me, - orphan.block, - orphan.provenance, - block_processing_artifacts, - apply_chunks_done_callback.clone(), - ); - if let Err(err) = res { - debug!(target: "chain", "Orphan {:?} declined, error: {:?}", block_hash, err); - } - } - debug!( - target: "chain", - remaining_orphans=self.orphans.len(), - "Check orphans", - ); - } - } - pub fn get_outgoing_receipts_for_shard( &self, prev_block_hash: CryptoHash, @@ -4982,12 +4578,6 @@ impl Chain { self.genesis.header() } - /// Returns number of orphans currently in the orphan pool. - #[inline] - pub fn orphans_len(&self) -> usize { - self.orphans.len() - } - /// Returns number of orphans currently in the orphan pool. #[inline] pub fn blocks_with_missing_chunks_len(&self) -> usize { @@ -4999,18 +4589,6 @@ impl Chain { self.blocks_in_processing.len() } - /// Returns number of evicted orphans. - #[inline] - pub fn orphans_evicted_len(&self) -> usize { - self.orphans.len_evicted() - } - - /// Check if hash is for a known orphan. - #[inline] - pub fn is_orphan(&self, hash: &CryptoHash) -> bool { - self.orphans.contains(hash) - } - /// Check if hash is for a known chunk orphan. #[inline] pub fn is_chunk_orphan(&self, hash: &CryptoHash) -> bool { diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs index 51d0ad104d7..5e506ff65e9 100644 --- a/chain/chain/src/lib.rs +++ b/chain/chain/src/lib.rs @@ -1,5 +1,5 @@ pub use block_processing_utils::{BlockProcessingArtifact, DoneApplyChunkCallback}; -pub use chain::{check_known, collect_receipts, Chain, ChainUpdate, MAX_ORPHAN_SIZE}; +pub use chain::{check_known, collect_receipts, Chain, ChainUpdate}; pub use doomslug::{Doomslug, DoomslugBlockProductionReadiness, DoomslugThresholdMode}; pub use lightclient::{create_light_client_block_view, get_epoch_block_producers_view}; pub use near_chain_primitives::{self, Error}; @@ -19,6 +19,7 @@ mod lightclient; mod metrics; pub mod migrations; pub mod missing_chunks; +pub mod orphan; pub mod resharding; mod state_request_tracker; pub mod state_snapshot_actor; diff --git a/chain/chain/src/orphan.rs b/chain/chain/src/orphan.rs new file mode 100644 index 00000000000..7cb123de429 --- /dev/null +++ b/chain/chain/src/orphan.rs @@ -0,0 +1,431 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; +use std::time::{Duration, Instant}; + +use near_chain_primitives::Error; +use near_primitives::block::Block; +use near_primitives::hash::CryptoHash; +use near_primitives::sharding::ShardChunkHeader; +use near_primitives::static_clock::StaticClock; +use near_primitives::types::{AccountId, BlockHeight, EpochId}; +use near_primitives::utils::MaybeValidated; +use tracing::{debug, debug_span}; + +use crate::missing_chunks::BlockLike; +use crate::{metrics, BlockProcessingArtifact, Chain, DoneApplyChunkCallback, Provenance}; + +/// Maximum number of orphans chain can store. +const MAX_ORPHAN_SIZE: usize = 1024; + +/// Maximum age of orphan to store in the chain. +const MAX_ORPHAN_AGE_SECS: u64 = 300; + +// Number of orphan ancestors should be checked to request chunks +// Orphans for which we will request for missing chunks must satisfy, +// its NUM_ORPHAN_ANCESTORS_CHECK'th ancestor has been accepted +pub const NUM_ORPHAN_ANCESTORS_CHECK: u64 = 3; + +// Maximum number of orphans that we can request missing chunks +// Note that if there are no forks, the maximum number of orphans we would +// request missing chunks will not exceed NUM_ORPHAN_ANCESTORS_CHECK, +// this number only adds another restriction when there are multiple forks. +// It should almost never be hit +const MAX_ORPHAN_MISSING_CHUNKS: usize = 5; + +/// Orphan is a block whose previous block is not accepted (in store) yet. +/// Therefore, they are not ready to be processed yet. +/// We save these blocks in an in-memory orphan pool to be processed later +/// after their previous block is accepted. +pub struct Orphan { + pub(crate) block: MaybeValidated, + pub(crate) provenance: Provenance, + pub(crate) added: Instant, +} + +impl BlockLike for Orphan { + fn hash(&self) -> CryptoHash { + *self.block.hash() + } + + fn height(&self) -> u64 { + self.block.header().height() + } +} + +impl Orphan { + fn prev_hash(&self) -> &CryptoHash { + self.block.header().prev_hash() + } +} + +/// OrphanBlockPool stores information of all orphans that are waiting to be processed +/// A block is added to the orphan pool when process_block failed because the block is an orphan +/// A block is removed from the pool if +/// 1) it is ready to be processed +/// or +/// 2) size of the pool exceeds MAX_ORPHAN_SIZE and the orphan was added a long time ago +/// or the height is high +pub struct OrphanBlockPool { + /// A map from block hash to a orphan block + orphans: HashMap, + /// A set that contains all orphans for which we have requested missing chunks for them + /// An orphan can be added to this set when it was first added to the pool, or later + /// when certain requirements are satisfied (see check_orphans) + /// It can only be removed from this set when the orphan is removed from the pool + orphans_requested_missing_chunks: HashSet, + /// A map from block heights to orphan blocks at the height + /// It's used to evict orphans when the pool is saturated + height_idx: HashMap>, + /// A map from block hashes to orphan blocks whose prev block is the block + /// It's used to check which orphan blocks are ready to be processed when a block is accepted + prev_hash_idx: HashMap>, + /// number of orphans that were evicted + evicted: usize, +} + +impl OrphanBlockPool { + pub fn new() -> OrphanBlockPool { + OrphanBlockPool { + orphans: HashMap::default(), + orphans_requested_missing_chunks: HashSet::default(), + height_idx: HashMap::default(), + prev_hash_idx: HashMap::default(), + evicted: 0, + } + } + + pub fn len(&self) -> usize { + self.orphans.len() + } + + fn len_evicted(&self) -> usize { + self.evicted + } + + /// Add a block to the orphan pool + /// `requested_missing_chunks`: whether missing chunks has been requested for the orphan + fn add(&mut self, orphan: Orphan, requested_missing_chunks: bool) { + let block_hash = *orphan.block.hash(); + let height_hashes = self.height_idx.entry(orphan.block.header().height()).or_default(); + height_hashes.push(*orphan.block.hash()); + let prev_hash_entries = + self.prev_hash_idx.entry(*orphan.block.header().prev_hash()).or_default(); + prev_hash_entries.push(block_hash); + self.orphans.insert(block_hash, orphan); + if requested_missing_chunks { + self.orphans_requested_missing_chunks.insert(block_hash); + } + + if self.orphans.len() > MAX_ORPHAN_SIZE { + let old_len = self.orphans.len(); + + let mut removed_hashes: HashSet = HashSet::default(); + self.orphans.retain(|_, ref mut x| { + let keep = x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS); + if !keep { + removed_hashes.insert(*x.block.hash()); + } + keep + }); + let mut heights = self.height_idx.keys().cloned().collect::>(); + heights.sort_unstable(); + for h in heights.iter().rev() { + if let Some(hash) = self.height_idx.remove(h) { + for h in hash { + let _ = self.orphans.remove(&h); + removed_hashes.insert(h); + } + } + if self.orphans.len() < MAX_ORPHAN_SIZE { + break; + } + } + self.height_idx.retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); + self.prev_hash_idx + .retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); + self.orphans_requested_missing_chunks.retain(|x| !removed_hashes.contains(x)); + + self.evicted += old_len - self.orphans.len(); + } + metrics::NUM_ORPHANS.set(self.orphans.len() as i64); + } + + pub fn contains(&self, hash: &CryptoHash) -> bool { + self.orphans.contains_key(hash) + } + + pub fn get(&self, hash: &CryptoHash) -> Option<&Orphan> { + self.orphans.get(hash) + } + + // // Iterates over existing orphans. + // pub fn map(&self, orphan_fn: &mut dyn FnMut(&CryptoHash, &Block, &Instant)) { + // self.orphans + // .iter() + // .map(|it| orphan_fn(it.0, it.1.block.get_inner(), &it.1.added)) + // .collect_vec(); + // } + + /// Remove all orphans in the pool that can be "adopted" by block `prev_hash`, i.e., children + /// of `prev_hash` and return the list. + /// This function is called when `prev_hash` is accepted, thus its children can be removed + /// from the orphan pool and be processed. + pub fn remove_by_prev_hash(&mut self, prev_hash: CryptoHash) -> Option> { + let mut removed_hashes: HashSet = HashSet::default(); + let ret = self.prev_hash_idx.remove(&prev_hash).map(|hs| { + hs.iter() + .filter_map(|h| { + removed_hashes.insert(*h); + self.orphans_requested_missing_chunks.remove(h); + self.orphans.remove(h) + }) + .collect() + }); + + self.height_idx.retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(x))); + + metrics::NUM_ORPHANS.set(self.orphans.len() as i64); + ret + } + + /// Return a list of orphans that are among the `target_depth` immediate descendants of + /// the block `parent_hash` + pub fn get_orphans_within_depth( + &self, + parent_hash: CryptoHash, + target_depth: u64, + ) -> Vec { + let mut _visited = HashSet::new(); + + let mut res = vec![]; + let mut queue = vec![(parent_hash, 0)]; + while let Some((prev_hash, depth)) = queue.pop() { + if depth == target_depth { + break; + } + if let Some(block_hashes) = self.prev_hash_idx.get(&prev_hash) { + for hash in block_hashes { + queue.push((*hash, depth + 1)); + res.push(*hash); + // there should be no loop + debug_assert!(_visited.insert(*hash)); + } + } + + // probably something serious went wrong here because there shouldn't be so many forks + assert!( + res.len() <= 100 * target_depth as usize, + "found too many orphans {:?}, probably something is wrong with the chain", + res + ); + } + res + } + + /// Returns true if the block has not been requested yet and the number of orphans + /// for which we have requested missing chunks have not exceeded MAX_ORPHAN_MISSING_CHUNKS + fn can_request_missing_chunks_for_orphan(&self, block_hash: &CryptoHash) -> bool { + self.orphans_requested_missing_chunks.len() < MAX_ORPHAN_MISSING_CHUNKS + && !self.orphans_requested_missing_chunks.contains(block_hash) + } + + fn mark_missing_chunks_requested_for_orphan(&mut self, block_hash: CryptoHash) { + self.orphans_requested_missing_chunks.insert(block_hash); + } +} + +/// Contains information needed to request chunks for orphans +/// Fields will be used as arguments for `request_chunks_for_orphan` +pub struct OrphanMissingChunks { + pub missing_chunks: Vec, + /// epoch id for the block that has missing chunks + pub epoch_id: EpochId, + /// hash of an ancestor block of the block that has missing chunks + /// this is used as an argument for `request_chunks_for_orphan` + /// see comments in `request_chunks_for_orphan` for what `ancestor_hash` is used for + pub ancestor_hash: CryptoHash, +} + +impl Debug for OrphanMissingChunks { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OrphanMissingChunks") + .field("epoch_id", &self.epoch_id) + .field("ancestor_hash", &self.ancestor_hash) + .field("num_missing_chunks", &self.missing_chunks.len()) + .finish() + } +} + +impl Chain { + pub fn save_orphan( + &mut self, + block: MaybeValidated, + provenance: Provenance, + added: Option, + requested_missing_chunks: bool, + ) { + let block_hash = *block.hash(); + if !self.orphans.contains(block.hash()) { + self.orphans.add( + Orphan { block, provenance, added: added.unwrap_or(StaticClock::instant()) }, + requested_missing_chunks, + ); + } + + debug!( + target: "chain", + "Process block: orphan: {:?}, # orphans {}{}", + block_hash, + self.orphans.len(), + if self.orphans.len_evicted() > 0 { + format!(", # evicted {}", self.orphans.len_evicted()) + } else { + String::new() + }, + ); + } + + /// Check if we can request chunks for this orphan. Conditions are + /// 1) Orphans that with outstanding missing chunks request has not exceed `MAX_ORPHAN_MISSING_CHUNKS` + /// 2) we haven't already requested missing chunks for the orphan + /// 3) All the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted, + /// or orphans or in `blocks_with_missing_chunks` + /// 4) Among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is + /// accepted(is in store), call it `ancestor` + /// 5) The next block of `ancestor` has the same epoch_id as the orphan block + /// (This is because when requesting chunks, we will use `ancestor` hash instead of the + /// previous block hash of the orphan to decide epoch id) + /// 6) The orphan has missing chunks + pub fn should_request_chunks_for_orphan( + &mut self, + me: &Option, + orphan: &Block, + ) -> Option { + // 1) Orphans that with outstanding missing chunks request has not exceed `MAX_ORPHAN_MISSING_CHUNKS` + // 2) we haven't already requested missing chunks for the orphan + if !self.orphans.can_request_missing_chunks_for_orphan(orphan.hash()) { + return None; + } + let mut block_hash = *orphan.header().prev_hash(); + for _ in 0..NUM_ORPHAN_ANCESTORS_CHECK { + // 3) All the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted, + // or orphans or in `blocks_with_missing_chunks` + if let Some(block) = self.blocks_with_missing_chunks.get(&block_hash) { + block_hash = *block.prev_hash(); + continue; + } + if let Some(orphan) = self.orphans.get(&block_hash) { + block_hash = *orphan.prev_hash(); + continue; + } + // 4) Among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is + // accepted(is in store), call it `ancestor` + if self.get_block(&block_hash).is_ok() { + if let Ok(epoch_id) = self.epoch_manager.get_epoch_id_from_prev_block(&block_hash) { + // 5) The next block of `ancestor` has the same epoch_id as the orphan block + if &epoch_id == orphan.header().epoch_id() { + // 6) The orphan has missing chunks + if let Err(e) = self.ping_missing_chunks(me, block_hash, orphan) { + return match e { + Error::ChunksMissing(missing_chunks) => { + debug!(target:"chain", "Request missing chunks for orphan {:?} {:?}", orphan.hash(), missing_chunks.iter().map(|chunk|{(chunk.shard_id(), chunk.chunk_hash())}).collect::>()); + Some(OrphanMissingChunks { + missing_chunks, + epoch_id, + ancestor_hash: block_hash, + }) + } + _ => None, + }; + } + } + } + return None; + } + return None; + } + None + } + + /// only used for test + pub fn check_orphan_partial_chunks_requested(&self, block_hash: &CryptoHash) -> bool { + self.orphans.orphans_requested_missing_chunks.contains(block_hash) + } + + /// Check for orphans that are ready to be processed or request missing chunks, process these blocks. + /// `prev_hash`: hash of the block that is just accepted + /// `block_accepted`: callback to be called when an orphan is accepted + /// `block_misses_chunks`: callback to be called when an orphan is added to the pool of blocks + /// that have missing chunks + /// `orphan_misses_chunks`: callback to be called when it is ready to request missing chunks for + /// an orphan + /// `on_challenge`: callback to be called when an orphan should be challenged + pub fn check_orphans( + &mut self, + me: &Option, + prev_hash: CryptoHash, + block_processing_artifacts: &mut BlockProcessingArtifact, + apply_chunks_done_callback: DoneApplyChunkCallback, + ) { + let _span = debug_span!( + target: "chain", + "check_orphans", + ?prev_hash, + num_orphans = self.orphans.len()) + .entered(); + // Check if there are orphans we can process. + // check within the descendents of `prev_hash` to see if there are orphans there that + // are ready to request missing chunks for + let orphans_to_check = + self.orphans.get_orphans_within_depth(prev_hash, NUM_ORPHAN_ANCESTORS_CHECK); + for orphan_hash in orphans_to_check { + let orphan = self.orphans.get(&orphan_hash).unwrap().block.clone(); + if let Some(orphan_missing_chunks) = self.should_request_chunks_for_orphan(me, &orphan) + { + block_processing_artifacts.orphans_missing_chunks.push(orphan_missing_chunks); + self.orphans.mark_missing_chunks_requested_for_orphan(orphan_hash); + } + } + if let Some(orphans) = self.orphans.remove_by_prev_hash(prev_hash) { + debug!(target: "chain", found_orphans = orphans.len(), "Check orphans"); + for orphan in orphans.into_iter() { + let block_hash = orphan.hash(); + self.blocks_delay_tracker + .mark_block_unorphaned(&block_hash, StaticClock::instant()); + let res = self.start_process_block_async( + me, + orphan.block, + orphan.provenance, + block_processing_artifacts, + apply_chunks_done_callback.clone(), + ); + if let Err(err) = res { + debug!(target: "chain", "Orphan {:?} declined, error: {:?}", block_hash, err); + } + } + debug!( + target: "chain", + remaining_orphans=self.orphans.len(), + "Check orphans", + ); + } + } + + /// Returns number of orphans currently in the orphan pool. + #[inline] + pub fn orphans_len(&self) -> usize { + self.orphans.len() + } + + /// Returns number of evicted orphans. + #[inline] + pub fn orphans_evicted_len(&self) -> usize { + self.orphans.len_evicted() + } + + /// Check if hash is for a known orphan. + #[inline] + pub fn is_orphan(&self, hash: &CryptoHash) -> bool { + self.orphans.contains(hash) + } +} diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index aa9dbfe2bcc..56242cc5433 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -23,9 +23,9 @@ use near_async::messaging::{CanSend, Sender}; use near_chain::chain::VerifyBlockHashAndSignatureResult; use near_chain::chain::{ ApplyStatePartsRequest, BlockCatchUpRequest, BlockMissingChunks, BlocksCatchUpState, - OrphanMissingChunks, }; use near_chain::flat_storage_creator::FlatStorageCreator; +use near_chain::orphan::OrphanMissingChunks; use near_chain::resharding::StateSplitRequest; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::format_hash; diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 264731e3fb2..cbef1765b2e 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -1829,6 +1829,10 @@ impl ClientActor { if let Ok(header) = self.client.chain.get_block_header(&sync_hash) { let block: MaybeValidated = (*block).clone().into(); let block_hash = *block.hash(); + if let Err(err) = self.client.chain.validate_block(&block) { + byzantine_assert!(false); + error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync"); + } // Notice that two blocks are saved differently: // * save_block() for one block. // * save_orphan() for another block. @@ -1839,9 +1843,7 @@ impl ClientActor { } } else if block_hash == sync_hash { // The first block of the new epoch. - if let Err(err) = self.client.chain.save_orphan(block, false) { - error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync"); - } + self.client.chain.save_orphan(block, Provenance::NONE, None, false); } } true diff --git a/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs b/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs index fedf0f3c423..be71b821172 100644 --- a/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs +++ b/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs @@ -1,7 +1,7 @@ use crate::tests::client::process_blocks::produce_blocks_from_height; use assert_matches::assert_matches; use near_async::messaging::CanSend; -use near_chain::chain::NUM_ORPHAN_ANCESTORS_CHECK; +use near_chain::orphan::NUM_ORPHAN_ANCESTORS_CHECK; use near_chain::{ChainGenesis, Error, Provenance}; use near_chain_configs::Genesis; use near_chunks::metrics::PARTIAL_ENCODED_CHUNK_FORWARD_CACHED_WITHOUT_HEADER;