diff --git a/chain/src/chain_service.rs b/chain/src/chain_service.rs index e60effadc8e..ee825230b1b 100644 --- a/chain/src/chain_service.rs +++ b/chain/src/chain_service.rs @@ -1,8 +1,9 @@ //! CKB chain service. #![allow(missing_docs)] -use crate::{LonelyBlock, ProcessBlockRequest}; -use ckb_channel::{select, Receiver, Sender}; +use crate::orphan_broker::OrphanBroker; +use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest}; +use ckb_channel::{select, Receiver}; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{self, debug, error, info, warn}; use ckb_shared::block_status::BlockStatus; @@ -13,13 +14,12 @@ use ckb_verification::{BlockVerifier, NonContextualBlockTxsVerifier}; use ckb_verification_traits::Verifier; /// Chain background service to receive LonelyBlock and only do `non_contextual_verify` -#[derive(Clone)] pub(crate) struct ChainService { shared: Shared, process_block_rx: Receiver, - lonely_block_tx: Sender, + orphan_broker: OrphanBroker, } impl ChainService { /// Create a new ChainService instance with shared. @@ -27,12 +27,12 @@ impl ChainService { shared: Shared, process_block_rx: Receiver, - lonely_block_tx: Sender, + consume_orphan: OrphanBroker, ) -> ChainService { ChainService { shared, process_block_rx, - lonely_block_tx, + orphan_broker: consume_orphan, } } @@ -127,25 +127,24 @@ impl ChainService { } } - if let Some(metrics) = ckb_metrics::handle() { - metrics - .ckb_chain_lonely_block_ch_len - .set(self.lonely_block_tx.len() as i64) + if let Err(err) = self.insert_block(&lonely_block) { + error!( + "insert block {}-{} failed: {:?}", + block_number, block_hash, err + ); + self.shared.block_status_map().remove(&block_hash); + lonely_block.execute_callback(Err(err)); + return; } - match self.lonely_block_tx.send(lonely_block) { - Ok(_) => { - debug!( - "processing block: {}-{}, (tip:unverified_tip):({}:{})", - block_number, - block_hash, - self.shared.snapshot().tip_number(), - self.shared.get_unverified_tip().number(), - ); - } - Err(_) => { - error!("Failed to notify new block to orphan pool, It seems that the orphan pool has exited."); - } - } + let lonely_block_hash: LonelyBlockHash = lonely_block.into(); + self.orphan_broker.process_lonely_block(lonely_block_hash); + } + + fn insert_block(&self, lonely_block: &LonelyBlock) -> Result<(), ckb_error::Error> { + let db_txn = self.shared.store().begin_transaction(); + db_txn.insert_block(lonely_block.block())?; + db_txn.commit()?; + Ok(()) } } diff --git a/chain/src/consume_orphan.rs b/chain/src/consume_orphan.rs deleted file mode 100644 index b1a3721fca0..00000000000 --- a/chain/src/consume_orphan.rs +++ /dev/null @@ -1,388 +0,0 @@ -#![allow(missing_docs)] - -use crate::utils::orphan_block_pool::OrphanBlockPool; -use crate::{LonelyBlock, LonelyBlockHash}; -use ckb_channel::{select, Receiver, Sender}; -use ckb_error::Error; -use ckb_logger::internal::trace; -use ckb_logger::{debug, error, info}; -use ckb_shared::block_status::BlockStatus; -use ckb_shared::Shared; -use ckb_store::ChainStore; -use ckb_systemtime::unix_time_as_millis; -use ckb_types::core::{BlockExt, BlockView, EpochNumber, EpochNumberWithFraction, HeaderView}; -use ckb_types::U256; -use ckb_verification::InvalidParentError; -use dashmap::mapref::entry::Entry; -use std::sync::Arc; - -pub(crate) struct ConsumeDescendantProcessor { - pub shared: Shared, - pub unverified_blocks_tx: Sender, -} - -// Store the an unverified block to the database. We may usually do this -// for an orphan block with unknown parent. But this function is also useful in testing. -pub fn store_unverified_block( - shared: &Shared, - block: Arc, -) -> Result<(HeaderView, U256), Error> { - let (block_number, block_hash) = (block.number(), block.hash()); - - let parent_header = shared - .store() - .get_block_header(&block.data().header().raw().parent_hash()) - .expect("parent already store"); - - if let Some(ext) = shared.store().get_block_ext(&block.hash()) { - debug!( - "block {}-{} has stored BlockExt: {:?}", - block_number, block_hash, ext - ); - match ext.verified { - Some(true) => { - return Ok((parent_header, ext.total_difficulty)); - } - Some(false) => { - return Err(InvalidParentError { - parent_hash: parent_header.hash(), - } - .into()); - } - None => { - // continue to process - } - } - } - - trace!("begin accept block: {}-{}", block.number(), block.hash()); - - let parent_ext = shared - .store() - .get_block_ext(&block.data().header().raw().parent_hash()) - .expect("parent already store"); - - if parent_ext.verified == Some(false) { - return Err(InvalidParentError { - parent_hash: parent_header.hash(), - } - .into()); - } - - let cannon_total_difficulty = - parent_ext.total_difficulty.to_owned() + block.header().difficulty(); - - let db_txn = Arc::new(shared.store().begin_transaction()); - - db_txn.insert_block(block.as_ref())?; - - let next_block_epoch = shared - .consensus() - .next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader()) - .expect("epoch should be stored"); - let new_epoch = next_block_epoch.is_head(); - let epoch = next_block_epoch.epoch(); - - db_txn.insert_block_epoch_index( - &block.header().hash(), - &epoch.last_block_hash_in_previous_epoch(), - )?; - if new_epoch { - db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?; - } - - let ext = BlockExt { - received_at: unix_time_as_millis(), - total_difficulty: cannon_total_difficulty.clone(), - total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64, - verified: None, - txs_fees: vec![], - cycles: None, - txs_sizes: None, - }; - - db_txn.insert_block_ext(&block.header().hash(), &ext)?; - - db_txn.commit()?; - - Ok((parent_header, cannon_total_difficulty)) -} - -impl ConsumeDescendantProcessor { - fn send_unverified_block(&self, lonely_block: LonelyBlockHash, total_difficulty: U256) { - let block_number = lonely_block.block_number_and_hash.number(); - let block_hash = lonely_block.block_number_and_hash.hash(); - if let Some(metrics) = ckb_metrics::handle() { - metrics - .ckb_chain_unverified_block_ch_len - .set(self.unverified_blocks_tx.len() as i64) - }; - - match self.unverified_blocks_tx.send(lonely_block) { - Ok(_) => { - debug!( - "process desendant block success {}-{}", - block_number, block_hash - ); - } - Err(_) => { - error!("send unverified_block_tx failed, the receiver has been closed"); - return; - } - }; - - if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) { - self.shared.set_unverified_tip(ckb_shared::HeaderIndex::new( - block_number, - block_hash.clone(), - total_difficulty, - )); - if let Some(handle) = ckb_metrics::handle() { - handle.ckb_chain_unverified_tip.set(block_number as i64); - } - debug!( - "set unverified_tip to {}-{}, while unverified_tip - verified_tip = {}", - block_number.clone(), - block_hash.clone(), - block_number.saturating_sub(self.shared.snapshot().tip_number()) - ) - } else { - debug!( - "received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}", - block_number, - block_hash, - self.shared.get_unverified_tip().number(), - self.shared.get_unverified_tip().hash(), - ); - } - } - - pub(crate) fn process_descendant(&self, lonely_block: LonelyBlock) -> Result<(), Error> { - return match store_unverified_block(&self.shared, lonely_block.block().to_owned()) { - Ok((_parent_header, total_difficulty)) => { - self.shared - .insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED); - - let lonely_block_hash: LonelyBlockHash = lonely_block.into(); - - self.send_unverified_block(lonely_block_hash, total_difficulty); - Ok(()) - } - - Err(err) => { - if let Some(_invalid_parent_err) = err.downcast_ref::() { - self.shared - .block_status_map() - .insert(lonely_block.block().hash(), BlockStatus::BLOCK_INVALID); - } - - lonely_block.execute_callback(Err(err.clone())); - Err(err) - } - }; - } - - fn accept_descendants(&self, descendants: Vec) { - let mut has_parent_invalid_error = false; - for descendant_block in descendants { - let block_number = descendant_block.block().number(); - let block_hash = descendant_block.block().hash(); - - if has_parent_invalid_error { - self.shared - .block_status_map() - .insert(block_hash.clone(), BlockStatus::BLOCK_INVALID); - let err = Err(InvalidParentError { - parent_hash: descendant_block.block().parent_hash(), - } - .into()); - - error!( - "process descendant {}-{}, failed {:?}", - block_number, - block_hash.clone(), - err - ); - - descendant_block.execute_callback(err); - continue; - } - - if let Err(err) = self.process_descendant(descendant_block) { - error!( - "process descendant {}-{}, failed {:?}", - block_number, block_hash, err - ); - - if let Some(_invalid_parent_err) = err.downcast_ref::() { - has_parent_invalid_error = true; - } - } - } - } -} - -pub(crate) struct ConsumeOrphan { - shared: Shared, - - descendant_processor: ConsumeDescendantProcessor, - - orphan_blocks_broker: Arc, - lonely_blocks_rx: Receiver, - - stop_rx: Receiver<()>, -} - -impl ConsumeOrphan { - pub(crate) fn new( - shared: Shared, - orphan_block_pool: Arc, - unverified_blocks_tx: Sender, - lonely_blocks_rx: Receiver, - stop_rx: Receiver<()>, - ) -> ConsumeOrphan { - ConsumeOrphan { - shared: shared.clone(), - descendant_processor: ConsumeDescendantProcessor { - shared, - unverified_blocks_tx, - }, - orphan_blocks_broker: orphan_block_pool, - lonely_blocks_rx, - stop_rx, - } - } - - pub(crate) fn start(&self) { - let mut last_check_expired_orphans_epoch: EpochNumber = 0; - loop { - select! { - recv(self.lonely_blocks_rx) -> msg => match msg { - Ok(lonely_block) => { - let lonely_block_epoch: EpochNumberWithFraction = lonely_block.block().epoch(); - - let _trace_now = minstant::Instant::now(); - self.process_lonely_block(lonely_block); - if let Some(handle) = ckb_metrics::handle() { - handle.ckb_chain_process_lonely_block_duration.observe(_trace_now.elapsed().as_secs_f64()) - } - - if lonely_block_epoch.number() > last_check_expired_orphans_epoch { - self.clean_expired_orphan_blocks(); - last_check_expired_orphans_epoch = lonely_block_epoch.number(); - } - }, - Err(err) => { - error!("lonely_block_rx err: {}", err); - return - } - }, - recv(self.stop_rx) -> _ => { - info!("unverified_queue_consumer got exit signal, exit now"); - return; - }, - } - } - } - - fn clean_expired_orphan_blocks(&self) { - let epoch = self.shared.snapshot().tip_header().epoch(); - let expired_blocks = self - .orphan_blocks_broker - .clean_expired_blocks(epoch.number()); - if expired_blocks.is_empty() { - return; - } - let expired_blocks_count = expired_blocks.len(); - for block_hash in expired_blocks { - self.shared.remove_header_view(&block_hash); - } - debug!("cleaned {} expired orphan blocks", expired_blocks_count); - } - - fn search_orphan_pool(&self) { - for leader_hash in self.orphan_blocks_broker.clone_leaders() { - if !self.shared.contains_block_status( - self.shared.store(), - &leader_hash, - BlockStatus::BLOCK_STORED, - ) { - trace!("orphan leader: {} not stored", leader_hash); - continue; - } - - let descendants: Vec = self - .orphan_blocks_broker - .remove_blocks_by_parent(&leader_hash); - if descendants.is_empty() { - error!( - "leader {} does not have any descendants, this shouldn't happen", - leader_hash - ); - continue; - } - self.descendant_processor.accept_descendants(descendants); - } - } - - fn process_lonely_block(&self, lonely_block: LonelyBlock) { - let parent_hash = lonely_block.block().parent_hash(); - let block_hash = lonely_block.block().hash(); - let block_number = lonely_block.block().number(); - - { - // Is this block still verifying by ConsumeUnverified? - // If yes, skip it. - if let Entry::Occupied(entry) = self.shared.block_status_map().entry(block_hash.clone()) - { - if entry.get().eq(&BlockStatus::BLOCK_STORED) { - debug!( - "in process_lonely_block, {} is BLOCK_STORED in block_status_map, it is still verifying by ConsumeUnverified thread", - block_hash, - ); - return; - } - } - } - - let parent_status = self - .shared - .get_block_status(self.shared.store(), &parent_hash); - if parent_status.contains(BlockStatus::BLOCK_STORED) { - debug!( - "parent {} has stored: {:?}, processing descendant directly {}-{}", - parent_hash, parent_status, block_number, block_hash, - ); - - if let Err(err) = self.descendant_processor.process_descendant(lonely_block) { - error!( - "process descendant {}-{}, failed {:?}", - block_number, block_hash, err - ); - } - } else if parent_status.eq(&BlockStatus::BLOCK_INVALID) { - // don't accept this block, because parent block is invalid - error!( - "parent: {} is INVALID, won't accept this block {}-{}", - parent_hash, block_number, block_hash, - ); - self.shared - .block_status_map() - .insert(lonely_block.block().hash(), BlockStatus::BLOCK_INVALID); - let err = Err(InvalidParentError { - parent_hash: parent_hash.clone(), - } - .into()); - lonely_block.execute_callback(err); - } else { - self.orphan_blocks_broker.insert(lonely_block); - } - self.search_orphan_pool(); - - if let Some(metrics) = ckb_metrics::handle() { - metrics - .ckb_chain_orphan_count - .set(self.orphan_blocks_broker.len() as i64) - }; - } -} diff --git a/chain/src/consume_unverified.rs b/chain/src/consume_unverified.rs index 2fb0be60880..6c09078f845 100644 --- a/chain/src/consume_unverified.rs +++ b/chain/src/consume_unverified.rs @@ -1,4 +1,4 @@ -use crate::LonelyBlockHash; +use crate::UnverifiedBlock; use crate::{utils::forkchanges::ForkChanges, GlobalIndex, TruncateRequest, VerifyResult}; use ckb_channel::{select, Receiver}; use ckb_error::{Error, InternalErrorKind}; @@ -23,19 +23,21 @@ use ckb_verification::cache::Completed; use ckb_verification::InvalidParentError; use ckb_verification_contextual::{ContextualBlockVerifier, VerifyContext}; use ckb_verification_traits::Switch; +use dashmap::DashSet; use std::cmp; use std::collections::HashSet; use std::sync::Arc; pub(crate) struct ConsumeUnverifiedBlockProcessor { pub(crate) shared: Shared, + pub(crate) is_pending_verify: Arc>, pub(crate) proposal_table: ProposalTable, } pub(crate) struct ConsumeUnverifiedBlocks { tx_pool_controller: TxPoolController, - unverified_block_rx: Receiver, + unverified_block_rx: Receiver, truncate_block_rx: Receiver, stop_rx: Receiver<()>, @@ -45,9 +47,10 @@ pub(crate) struct ConsumeUnverifiedBlocks { impl ConsumeUnverifiedBlocks { pub(crate) fn new( shared: Shared, - unverified_blocks_rx: Receiver, + unverified_blocks_rx: Receiver, truncate_block_rx: Receiver, proposal_table: ProposalTable, + is_pending_verify: Arc>, stop_rx: Receiver<()>, ) -> Self { ConsumeUnverifiedBlocks { @@ -57,6 +60,7 @@ impl ConsumeUnverifiedBlocks { stop_rx, processor: ConsumeUnverifiedBlockProcessor { shared, + is_pending_verify, proposal_table, }, } @@ -94,7 +98,7 @@ impl ConsumeUnverifiedBlocks { let _ = self.tx_pool_controller.continue_chunk_process(); }, Err(err) => { - error!("truncate_block_tx has been closed,err: {}", err); + info!("truncate_block_tx has been closed,err: {}", err); return; }, }, @@ -109,52 +113,31 @@ impl ConsumeUnverifiedBlocks { } impl ConsumeUnverifiedBlockProcessor { - fn load_unverified_block_and_parent_header( - &self, - block_hash: &Byte32, - ) -> (BlockView, HeaderView) { - let block_view = self - .shared - .store() - .get_block(block_hash) - .expect("block stored"); - let parent_header_view = self - .shared - .store() - .get_block_header(&block_view.data().header().raw().parent_hash()) - .expect("parent header stored"); - - (block_view, parent_header_view) - } - - pub(crate) fn consume_unverified_blocks(&mut self, lonely_block_hash: LonelyBlockHash) { - let LonelyBlockHash { - block_number_and_hash, + pub(crate) fn consume_unverified_blocks(&mut self, unverified_block: UnverifiedBlock) { + let UnverifiedBlock { + block, switch, verify_callback, - } = lonely_block_hash; - let (unverified_block, parent_header) = - self.load_unverified_block_and_parent_header(&block_number_and_hash.hash); + parent_header, + } = unverified_block; + let block_hash = block.hash(); // process this unverified block - let verify_result = self.verify_block(&unverified_block, &parent_header, switch); + let verify_result = self.verify_block(&block, &parent_header, switch); match &verify_result { Ok(_) => { let log_now = std::time::Instant::now(); - self.shared.remove_block_status(&block_number_and_hash.hash); + self.shared.remove_block_status(&block_hash); let log_elapsed_remove_block_status = log_now.elapsed(); - self.shared.remove_header_view(&block_number_and_hash.hash); + self.shared.remove_header_view(&block_hash); debug!( "block {} remove_block_status cost: {:?}, and header_view cost: {:?}", - block_number_and_hash.hash, + block_hash, log_elapsed_remove_block_status, log_now.elapsed() ); } Err(err) => { - error!( - "verify block {} failed: {}", - block_number_and_hash.hash, err - ); + error!("verify block {} failed: {}", block_hash, err); let tip = self .shared @@ -174,17 +157,19 @@ impl ConsumeUnverifiedBlockProcessor { )); self.shared - .insert_block_status(block_number_and_hash.hash(), BlockStatus::BLOCK_INVALID); + .insert_block_status(block_hash.clone(), BlockStatus::BLOCK_INVALID); error!( "set_unverified tip to {}-{}, because verify {} failed: {}", tip.number(), tip.hash(), - block_number_and_hash.hash, + block_hash, err ); } } + self.is_pending_verify.remove(&block_hash); + if let Some(callback) = verify_callback { callback(verify_result); } @@ -280,6 +265,14 @@ impl ConsumeUnverifiedBlockProcessor { let txn_snapshot = db_txn.get_snapshot(); let _snapshot_tip_hash = db_txn.get_update_for_tip_hash(&txn_snapshot); + db_txn.insert_block_epoch_index( + &block.header().hash(), + &epoch.last_block_hash_in_previous_epoch(), + )?; + if new_epoch { + db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?; + } + if new_best_block { info!( "[verify block] new best block found: {} => {:#x}, difficulty diff = {:#x}, unverified_tip: {}", diff --git a/chain/src/init.rs b/chain/src/init.rs index 2759a75cb46..2e2c1d5df66 100644 --- a/chain/src/init.rs +++ b/chain/src/init.rs @@ -1,41 +1,53 @@ #![allow(missing_docs)] -//! Bootstrap ChainService, ConsumeOrphan and ConsumeUnverified threads. +//! Bootstrap InitLoadUnverified, PreloadUnverifiedBlock, ChainService and ConsumeUnverified threads. use crate::chain_service::ChainService; use crate::consume_unverified::ConsumeUnverifiedBlocks; use crate::init_load_unverified::InitLoadUnverified; +use crate::orphan_broker::OrphanBroker; +use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel; use crate::utils::orphan_block_pool::OrphanBlockPool; -use crate::{ChainController, LonelyBlock, LonelyBlockHash}; +use crate::{chain_controller::ChainController, LonelyBlockHash, UnverifiedBlock}; use ckb_channel::{self as channel, SendError}; use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW; use ckb_logger::warn; use ckb_shared::ChainServicesBuilder; use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; +use ckb_types::packed::Byte32; +use dashmap::DashSet; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; -const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize; +const ORPHAN_BLOCK_SIZE: usize = BLOCK_DOWNLOAD_WINDOW as usize; pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)); let (truncate_block_tx, truncate_block_rx) = channel::bounded(1); + let (preload_unverified_stop_tx, preload_unverified_stop_rx) = ckb_channel::bounded::<()>(1); + + let (preload_unverified_tx, preload_unverified_rx) = + channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize * 10); + let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1); - let (unverified_tx, unverified_rx) = - channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize * 3); + let (unverified_block_tx, unverified_block_rx) = channel::bounded::(128usize); + + let is_pending_verify: Arc> = Arc::new(DashSet::new()); let consumer_unverified_thread = thread::Builder::new() .name("consume_unverified_blocks".into()) .spawn({ let shared = builder.shared.clone(); + let is_pending_verify = Arc::clone(&is_pending_verify); move || { let consume_unverified = ConsumeUnverifiedBlocks::new( shared, - unverified_rx, + unverified_block_rx, truncate_block_rx, builder.proposal_table, + is_pending_verify, unverified_queue_stop_rx, ); @@ -44,38 +56,30 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { }) .expect("start unverified_queue consumer thread should ok"); - let (lonely_block_tx, lonely_block_rx) = - channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize); - - let (search_orphan_pool_stop_tx, search_orphan_pool_stop_rx) = ckb_channel::bounded::<()>(1); - - let search_orphan_pool_thread = thread::Builder::new() - .name("consume_orphan_blocks".into()) + let preload_unverified_block_thread = thread::Builder::new() + .name("preload_unverified_block".into()) .spawn({ - let orphan_blocks_broker = Arc::clone(&orphan_blocks_broker); let shared = builder.shared.clone(); - use crate::consume_orphan::ConsumeOrphan; move || { - let consume_orphan = ConsumeOrphan::new( + let preload_unverified_block = PreloadUnverifiedBlocksChannel::new( shared, - orphan_blocks_broker, - unverified_tx, - lonely_block_rx, - search_orphan_pool_stop_rx, + preload_unverified_rx, + unverified_block_tx, + preload_unverified_stop_rx, ); - consume_orphan.start(); + preload_unverified_block.start() } }) - .expect("start search_orphan_pool thread should ok"); + .expect("start preload_unverified_block should ok"); - let (process_block_tx, process_block_rx) = channel::bounded(BLOCK_DOWNLOAD_WINDOW as usize); + let (process_block_tx, process_block_rx) = channel::bounded(0); let is_verifying_unverified_blocks_on_startup = Arc::new(AtomicBool::new(true)); let chain_controller = ChainController::new( process_block_tx, truncate_block_tx, - orphan_blocks_broker, + Arc::clone(&orphan_blocks_broker), Arc::clone(&is_verifying_unverified_blocks_on_startup), ); @@ -90,16 +94,23 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let init_load_unverified: InitLoadUnverified = InitLoadUnverified::new( shared, chain_controller, - signal_receiver, is_verifying_unverified_blocks_on_startup, + signal_receiver, ); init_load_unverified.start(); } }) .expect("start unverified_queue consumer thread should ok"); + let consume_orphan = OrphanBroker::new( + builder.shared.clone(), + orphan_blocks_broker, + preload_unverified_tx, + is_pending_verify, + ); + let chain_service: ChainService = - ChainService::new(builder.shared, process_block_rx, lonely_block_tx); + ChainService::new(builder.shared, process_block_rx, consume_orphan); let chain_service_thread = thread::Builder::new() .name("ChainService".into()) .spawn({ @@ -108,10 +119,10 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let _ = init_load_unverified_thread.join(); - if let Err(SendError(_)) = search_orphan_pool_stop_tx.send(()) { - warn!("trying to notify search_orphan_pool thread to stop, but search_orphan_pool_stop_tx already closed") + if preload_unverified_stop_tx.send(()).is_err(){ + warn!("trying to notify preload unverified thread to stop, but preload_unverified_stop_tx already closed"); } - let _ = search_orphan_pool_thread.join(); + let _ = preload_unverified_block_thread.join(); if let Err(SendError(_)) = unverified_queue_stop_tx.send(()) { warn!("trying to notify consume unverified thread to stop, but unverified_queue_stop_tx already closed"); diff --git a/chain/src/init_load_unverified.rs b/chain/src/init_load_unverified.rs index 4d02b7dfc22..9e8274d6aed 100644 --- a/chain/src/init_load_unverified.rs +++ b/chain/src/init_load_unverified.rs @@ -1,19 +1,23 @@ +use crate::utils::orphan_block_pool::EXPIRED_EPOCH; use crate::{ChainController, LonelyBlock}; use ckb_channel::{select, Receiver}; +use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW; use ckb_db::{Direction, IteratorMode}; use ckb_db_schema::COLUMN_NUMBER_HASH; use ckb_logger::info; +use ckb_shared::block_status::BlockStatus; use ckb_shared::Shared; use ckb_store::ChainStore; use ckb_types::core::{BlockNumber, BlockView}; use ckb_types::packed; use ckb_types::prelude::{Entity, FromSliceShouldBeOk, Pack, Reader}; +use std::cmp; use std::sync::atomic::AtomicBool; use std::sync::Arc; - pub(crate) struct InitLoadUnverified { shared: Shared, chain_controller: ChainController, + is_verifying_unverified_blocks_on_startup: Arc, stop_rx: Receiver<()>, @@ -23,8 +27,8 @@ impl InitLoadUnverified { pub(crate) fn new( shared: Shared, chain_controller: ChainController, - stop_rx: Receiver<()>, is_verifying_unverified_blocks_on_startup: Arc, + stop_rx: Receiver<()>, ) -> Self { InitLoadUnverified { shared, @@ -33,30 +37,13 @@ impl InitLoadUnverified { stop_rx, } } - fn print_unverified_blocks_count(&self) { - let tip_number: BlockNumber = self.shared.snapshot().tip_number(); - let mut check_unverified_number = tip_number + 1; - let mut unverified_block_count = 0; - loop { - // start checking `check_unverified_number` have COLUMN_NUMBER_HASH value in db? - let unverified_hashes: Vec = - self.find_unverified_block_hashes(check_unverified_number); - unverified_block_count += unverified_hashes.len(); - if unverified_hashes.is_empty() { - info!( - "found {} unverified blocks, verifying...", - unverified_block_count - ); - break; - } - check_unverified_number += 1; - } - } fn find_unverified_block_hashes(&self, check_unverified_number: u64) -> Vec { let pack_number: packed::Uint64 = check_unverified_number.pack(); let prefix = pack_number.as_slice(); + // If a block has `COLUMN_NUMBER_HASH` but not `BlockExt`, + // it indicates an unverified block inserted during the last shutdown. let unverified_hashes: Vec = self .shared .store() @@ -71,6 +58,7 @@ impl InitLoadUnverified { let unverified_block_hash = reader.block_hash().to_entity(); unverified_block_hash }) + .filter(|hash| self.shared.store().get_block_ext(hash).is_none()) .collect::>(); unverified_hashes } @@ -81,23 +69,30 @@ impl InitLoadUnverified { self.shared.snapshot().tip_number(), self.shared.snapshot().tip_hash() ); - self.print_unverified_blocks_count(); self.find_and_verify_unverified_blocks(); self.is_verifying_unverified_blocks_on_startup .store(false, std::sync::atomic::Ordering::Release); + info!("find unverified blocks finished"); } - fn find_and_verify_unverified_blocks(&self) { + fn find_unverified_blocks(&self, f: F) + where + F: Fn(&packed::Byte32), + { let tip_number: BlockNumber = self.shared.snapshot().tip_number(); - let mut check_unverified_number = tip_number + 1; + let start_check_number = cmp::max( + 1, + tip_number.saturating_sub(EXPIRED_EPOCH * self.shared.consensus().max_epoch_length()), + ); + let end_check_number = tip_number + BLOCK_DOWNLOAD_WINDOW * 10; - loop { + (start_check_number..=end_check_number).for_each(|check_unverified_number| { select! { recv(self.stop_rx) -> _msg => { info!("init_unverified_blocks thread received exit signal, exit now"); - break; + return; }, default => {} } @@ -106,34 +101,28 @@ impl InitLoadUnverified { let unverified_hashes: Vec = self.find_unverified_block_hashes(check_unverified_number); - if unverified_hashes.is_empty() { - if check_unverified_number == tip_number + 1 { - info!("no unverified blocks found."); - } else { - info!( - "found and verify unverified blocks finish, current tip: {}-{}", - self.shared.snapshot().tip_number(), - self.shared.snapshot().tip_header() - ); - } - return; - } - for unverified_hash in unverified_hashes { - let unverified_block: BlockView = self - .shared - .store() - .get_block(&unverified_hash) - .expect("unverified block must be in db"); - self.chain_controller - .asynchronous_process_lonely_block(LonelyBlock { - block: Arc::new(unverified_block), - switch: None, - verify_callback: None, - }); + f(&unverified_hash); } + }); + } - check_unverified_number += 1; - } + fn find_and_verify_unverified_blocks(&self) { + self.find_unverified_blocks(|unverified_hash| { + let unverified_block: BlockView = self + .shared + .store() + .get_block(unverified_hash) + .expect("unverified block must be in db"); + self.shared + .block_status_map() + .insert(unverified_hash.to_owned(), BlockStatus::BLOCK_RECEIVED); + self.chain_controller + .asynchronous_process_lonely_block(LonelyBlock { + block: Arc::new(unverified_block), + switch: None, + verify_callback: None, + }); + }); } } diff --git a/chain/src/lib.rs b/chain/src/lib.rs index d656a1ba1d2..03ca70129d4 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -10,23 +10,25 @@ use ckb_error::Error; use ckb_shared::types::BlockNumberAndHash; use ckb_types::core::service::Request; -use ckb_types::core::{BlockNumber, BlockView}; +use ckb_types::core::{BlockNumber, BlockView, EpochNumber, HeaderView}; use ckb_types::packed::Byte32; use ckb_verification_traits::Switch; use std::sync::Arc; mod chain_controller; mod chain_service; -mod consume_orphan; -mod consume_unverified; +pub mod consume_unverified; mod init; mod init_load_unverified; +mod orphan_broker; +mod preload_unverified_blocks_channel; #[cfg(test)] mod tests; mod utils; pub use chain_controller::ChainController; -pub use consume_orphan::store_unverified_block; +use ckb_types::prelude::{Pack, Unpack}; +use ckb_types::H256; pub use init::start_chain_services; type ProcessBlockRequest = Request; @@ -68,6 +70,10 @@ pub struct LonelyBlockHash { /// block pub block_number_and_hash: BlockNumberAndHash, + pub parent_hash: Byte32, + + pub epoch_number: EpochNumber, + /// The Switch to control the verification process pub switch: Option, @@ -77,17 +83,60 @@ pub struct LonelyBlockHash { impl From for LonelyBlockHash { fn from(val: LonelyBlock) -> Self { + let LonelyBlock { + block, + switch, + verify_callback, + } = val; + let block_hash_h256: H256 = block.hash().unpack(); + let block_number: BlockNumber = block.number(); + let parent_hash_h256: H256 = block.parent_hash().unpack(); + let block_hash = block_hash_h256.pack(); + let parent_hash = parent_hash_h256.pack(); + + let epoch_number: EpochNumber = block.epoch().number(); + LonelyBlockHash { block_number_and_hash: BlockNumberAndHash { - number: val.block.number(), - hash: val.block.hash(), + number: block_number, + hash: block_hash, }, - switch: val.switch, - verify_callback: val.verify_callback, + parent_hash, + epoch_number, + switch, + verify_callback, } } } +impl LonelyBlockHash { + pub fn execute_callback(self, verify_result: VerifyResult) { + if let Some(verify_callback) = self.verify_callback { + verify_callback(verify_result); + } + } + + pub fn number_hash(&self) -> BlockNumberAndHash { + self.block_number_and_hash.clone() + } + + pub fn epoch_number(&self) -> EpochNumber { + self.epoch_number + } + + pub fn hash(&self) -> Byte32 { + self.block_number_and_hash.hash() + } + + pub fn parent_hash(&self) -> Byte32 { + self.parent_hash.clone() + } + + pub fn number(&self) -> BlockNumber { + self.block_number_and_hash.number() + } +} + impl LonelyBlock { pub(crate) fn block(&self) -> &Arc { &self.block @@ -124,3 +173,15 @@ impl GlobalIndex { self.hash = hash; } } + +/// UnverifiedBlock will be consumed by ConsumeUnverified thread +struct UnverifiedBlock { + // block + block: Arc, + // the switch to control the verification process + switch: Option, + // verify callback + verify_callback: Option, + // parent header + parent_header: HeaderView, +} diff --git a/chain/src/orphan_broker.rs b/chain/src/orphan_broker.rs new file mode 100644 index 00000000000..017480ab06a --- /dev/null +++ b/chain/src/orphan_broker.rs @@ -0,0 +1,229 @@ +#![allow(missing_docs)] + +use crate::utils::orphan_block_pool::{OrphanBlockPool, ParentHash}; +use crate::{LonelyBlockHash, VerifyResult}; +use ckb_channel::Sender; +use ckb_error::InternalErrorKind; +use ckb_logger::internal::trace; +use ckb_logger::{debug, error, info}; +use ckb_shared::block_status::BlockStatus; +use ckb_shared::Shared; +use ckb_store::ChainStore; +use ckb_types::{core::BlockView, packed::Byte32, U256}; +use dashmap::DashSet; +use std::sync::Arc; + +pub(crate) struct OrphanBroker { + shared: Shared, + + orphan_blocks_broker: Arc, + is_pending_verify: Arc>, + preload_unverified_tx: Sender, +} + +impl OrphanBroker { + pub(crate) fn new( + shared: Shared, + orphan_block_pool: Arc, + preload_unverified_tx: Sender, + is_pending_verify: Arc>, + ) -> OrphanBroker { + OrphanBroker { + shared: shared.clone(), + orphan_blocks_broker: orphan_block_pool, + is_pending_verify, + preload_unverified_tx, + } + } + + fn search_orphan_leader(&self, leader_hash: ParentHash) { + let leader_status = self + .shared + .get_block_status(self.shared.store(), &leader_hash); + + if leader_status.eq(&BlockStatus::BLOCK_INVALID) { + let descendants: Vec = self + .orphan_blocks_broker + .remove_blocks_by_parent(&leader_hash); + for descendant in descendants { + self.process_invalid_block(descendant); + } + return; + } + + let leader_is_pending_verify = self.is_pending_verify.contains(&leader_hash); + if !leader_is_pending_verify && !leader_status.contains(BlockStatus::BLOCK_STORED) { + trace!( + "orphan leader: {} not stored {:?} and not in is_pending_verify: {}", + leader_hash, + leader_status, + leader_is_pending_verify + ); + return; + } + + let descendants: Vec = self + .orphan_blocks_broker + .remove_blocks_by_parent(&leader_hash); + if descendants.is_empty() { + error!( + "leader {} does not have any descendants, this shouldn't happen", + leader_hash + ); + return; + } + self.accept_descendants(descendants); + } + + fn search_orphan_leaders(&self) { + for leader_hash in self.orphan_blocks_broker.clone_leaders() { + self.search_orphan_leader(leader_hash); + } + } + + fn delete_invalid_block(&self, lonely_block: &LonelyBlockHash) { + let block_hash = lonely_block.block_number_and_hash.hash(); + let block_number = lonely_block.block_number_and_hash.number(); + let parent_hash = lonely_block.parent_hash(); + + info!( + "parent: {} is INVALID, deleting this block {}-{}", + parent_hash, block_number, block_hash, + ); + + let db_txn = self.shared.store().begin_transaction(); + let invalid_block_op: Option = db_txn.get_block(&block_hash); + match invalid_block_op { + Some(invalid_block) => { + if let Err(err) = db_txn.delete_block(&invalid_block) { + error!( + "delete invalid block {}-{} failed {:?}", + block_number, block_hash, err + ); + return; + } + if let Err(err) = db_txn.commit() { + error!( + "commit delete invalid block {}-{} failed {:?}", + block_number, block_hash, err + ); + return; + } + + info!( + "parent: {} is INVALID, deleted this block {}-{}", + parent_hash, block_number, block_hash, + ); + } + None => { + error!( + "want to delete block {}-{}, but it not found in db", + block_number, block_hash + ); + } + } + } + + fn process_invalid_block(&self, lonely_block: LonelyBlockHash) { + let block_hash = lonely_block.block_number_and_hash.hash(); + let block_number = lonely_block.block_number_and_hash.number(); + let parent_hash = lonely_block.parent_hash(); + + self.delete_invalid_block(&lonely_block); + + let err: VerifyResult = Err(InternalErrorKind::Other + .other(format!( + "parent {} is invalid, so block {}-{} is invalid too", + parent_hash, block_number, block_hash + )) + .into()); + lonely_block.execute_callback(err); + } + + pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlockHash) { + let block_hash = lonely_block.block_number_and_hash.hash(); + let block_number = lonely_block.block_number_and_hash.number(); + let parent_hash = lonely_block.parent_hash(); + let parent_is_pending_verify = self.is_pending_verify.contains(&parent_hash); + let parent_status = self + .shared + .get_block_status(self.shared.store(), &parent_hash); + if parent_is_pending_verify || parent_status.contains(BlockStatus::BLOCK_STORED) { + debug!( + "parent {} has stored: {:?} or is_pending_verify: {}, processing descendant directly {}-{}", + parent_hash, + parent_status, + parent_is_pending_verify, + block_number, + block_hash, + ); + self.process_descendant(lonely_block); + } else if parent_status.eq(&BlockStatus::BLOCK_INVALID) { + self.process_invalid_block(lonely_block); + } else { + self.orphan_blocks_broker.insert(lonely_block); + } + + self.search_orphan_leaders(); + + if let Some(metrics) = ckb_metrics::handle() { + metrics + .ckb_chain_orphan_count + .set(self.orphan_blocks_broker.len() as i64) + } + } + + fn send_unverified_block(&self, lonely_block: LonelyBlockHash) { + let block_number = lonely_block.block_number_and_hash.number(); + let block_hash = lonely_block.block_number_and_hash.hash(); + + if let Some(metrics) = ckb_metrics::handle() { + metrics + .ckb_chain_preload_unverified_block_ch_len + .set(self.preload_unverified_tx.len() as i64) + } + + match self.preload_unverified_tx.send(lonely_block) { + Ok(_) => { + debug!( + "process desendant block success {}-{}", + block_number, block_hash + ); + } + Err(_) => { + info!("send unverified_block_tx failed, the receiver has been closed"); + return; + } + }; + if block_number > self.shared.snapshot().tip_number() { + self.shared.set_unverified_tip(ckb_shared::HeaderIndex::new( + block_number, + block_hash.clone(), + U256::from(0u64), + )); + + if let Some(handle) = ckb_metrics::handle() { + handle.ckb_chain_unverified_tip.set(block_number as i64); + } + debug!( + "set unverified_tip to {}-{}, while unverified_tip - verified_tip = {}", + block_number.clone(), + block_hash.clone(), + block_number.saturating_sub(self.shared.snapshot().tip_number()) + ) + } + } + + pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockHash) { + self.is_pending_verify + .insert(lonely_block.block_number_and_hash.hash()); + + self.send_unverified_block(lonely_block) + } + + fn accept_descendants(&self, descendants: Vec) { + for descendant_block in descendants { + self.process_descendant(descendant_block); + } + } +} diff --git a/chain/src/preload_unverified_blocks_channel.rs b/chain/src/preload_unverified_blocks_channel.rs new file mode 100644 index 00000000000..23f593bd79a --- /dev/null +++ b/chain/src/preload_unverified_blocks_channel.rs @@ -0,0 +1,105 @@ +use crate::{LonelyBlockHash, UnverifiedBlock}; +use ckb_channel::{Receiver, Sender}; +use ckb_logger::{debug, error, info}; +use ckb_shared::Shared; +use ckb_store::ChainStore; +use crossbeam::select; +use std::sync::Arc; + +pub(crate) struct PreloadUnverifiedBlocksChannel { + shared: Shared, + preload_unverified_rx: Receiver, + + unverified_block_tx: Sender, + + stop_rx: Receiver<()>, +} + +impl PreloadUnverifiedBlocksChannel { + pub(crate) fn new( + shared: Shared, + preload_unverified_rx: Receiver, + unverified_block_tx: Sender, + stop_rx: Receiver<()>, + ) -> Self { + PreloadUnverifiedBlocksChannel { + shared, + preload_unverified_rx, + unverified_block_tx, + stop_rx, + } + } + + pub(crate) fn start(&self) { + loop { + select! { + recv(self.preload_unverified_rx) -> msg => match msg { + Ok(preload_unverified_block_task) =>{ + self.preload_unverified_channel(preload_unverified_block_task); + }, + Err(err) =>{ + error!("recv preload_task_rx failed, err: {:?}", err); + break; + } + }, + recv(self.stop_rx) -> _ => { + info!("preload_unverified_blocks thread received exit signal, exit now"); + break; + } + } + } + } + + fn preload_unverified_channel(&self, task: LonelyBlockHash) { + let block_number = task.block_number_and_hash.number(); + let block_hash = task.block_number_and_hash.hash(); + let unverified_block: UnverifiedBlock = self.load_full_unverified_block_by_hash(task); + + if let Some(metrics) = ckb_metrics::handle() { + metrics + .ckb_chain_unverified_block_ch_len + .set(self.unverified_block_tx.len() as i64) + }; + + if self.unverified_block_tx.send(unverified_block).is_err() { + info!( + "send unverified_block to unverified_block_tx failed, the receiver has been closed" + ); + } else { + debug!("preload unverified block {}-{}", block_number, block_hash,); + } + } + + fn load_full_unverified_block_by_hash(&self, task: LonelyBlockHash) -> UnverifiedBlock { + let _trace_timecost = ckb_metrics::handle() + .map(|metrics| metrics.ckb_chain_load_full_unverified_block.start_timer()); + + let LonelyBlockHash { + block_number_and_hash, + parent_hash, + epoch_number: _epoch_number, + switch, + verify_callback, + } = task; + + let block_view = self + .shared + .store() + .get_block(&block_number_and_hash.hash()) + .expect("block stored"); + let block = Arc::new(block_view); + let parent_header = { + self.shared + .store() + .get_block_header(&parent_hash) + .expect("parent header stored") + }; + + UnverifiedBlock { + block, + switch, + verify_callback, + parent_header, + } + } +} diff --git a/chain/src/tests/find_fork.rs b/chain/src/tests/find_fork.rs index b07f2a3725e..e5b8ad39ce4 100644 --- a/chain/src/tests/find_fork.rs +++ b/chain/src/tests/find_fork.rs @@ -1,10 +1,8 @@ -use crate::consume_orphan::ConsumeDescendantProcessor; use crate::consume_unverified::ConsumeUnverifiedBlockProcessor; use crate::utils::forkchanges::ForkChanges; -use crate::{start_chain_services, LonelyBlock, LonelyBlockHash}; +use crate::{start_chain_services, UnverifiedBlock}; use ckb_chain_spec::consensus::{Consensus, ProposalWindow}; use ckb_proposal_table::ProposalTable; -use ckb_shared::types::BlockNumberAndHash; use ckb_shared::SharedBuilder; use ckb_store::ChainStore; use ckb_systemtime::unix_time_as_millis; @@ -16,33 +14,29 @@ use ckb_types::{ U256, }; use ckb_verification_traits::Switch; -use crossbeam::channel; +use dashmap::DashSet; use std::collections::HashSet; use std::sync::Arc; fn process_block( - consume_descendant_processor: &ConsumeDescendantProcessor, consume_unverified_block_processor: &mut ConsumeUnverifiedBlockProcessor, blk: &BlockView, switch: Switch, ) { - let lonely_block_hash = LonelyBlockHash { - switch: Some(switch), - block_number_and_hash: BlockNumberAndHash::new(blk.number(), blk.hash()), - verify_callback: None, - }; + let store = consume_unverified_block_processor.shared.store(); + let db_txn = store.begin_transaction(); + db_txn.insert_block(blk).unwrap(); + db_txn.commit().unwrap(); - let lonely_block = LonelyBlock { - switch: Some(switch), + let parent_header = store.get_block_header(&blk.parent_hash()).unwrap(); + let unverified_block = UnverifiedBlock { block: Arc::new(blk.to_owned()), + switch: Some(switch), verify_callback: None, + parent_header, }; - consume_descendant_processor - .process_descendant(lonely_block) - .unwrap(); - - consume_unverified_block_processor.consume_unverified_blocks(lonely_block_hash); + consume_unverified_block_processor.consume_unverified_blocks(unverified_block); } // 0--1--2--3--4 @@ -73,20 +67,18 @@ fn test_find_fork_case1() { fork2.gen_empty_block_with_diff(90u64, &mock_store); } - let (unverified_blocks_tx, _unverified_blocks_rx) = channel::unbounded::(); - let consume_descendant_processor = ConsumeDescendantProcessor { - shared: shared.clone(), - unverified_blocks_tx, - }; + let is_pending_verify = Arc::new(DashSet::new()); + let mut consume_unverified_block_processor = ConsumeUnverifiedBlockProcessor { shared: shared.clone(), + is_pending_verify, proposal_table, }; // fork1 total_difficulty 400 for blk in fork1.blocks() { + println!("proceb1, fork1 block: {}-{}", blk.number(), blk.hash()); process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -95,8 +87,8 @@ fn test_find_fork_case1() { // fork2 total_difficulty 270 for blk in fork2.blocks() { + println!("procb2, fork1 block: {}-{}", blk.number(), blk.hash()); process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -161,20 +153,15 @@ fn test_find_fork_case2() { fork2.gen_empty_block_with_diff(90u64, &mock_store); } let proposal_table = ProposalTable::new(consensus.tx_proposal_window()); - let (unverified_blocks_tx, _unverified_blocks_rx) = channel::unbounded::(); - let consume_descendant_processor = ConsumeDescendantProcessor { - shared: shared.clone(), - unverified_blocks_tx, - }; let mut consume_unverified_block_processor = ConsumeUnverifiedBlockProcessor { shared: shared.clone(), + is_pending_verify: Arc::new(DashSet::new()), proposal_table, }; // fork1 total_difficulty 400 for blk in fork1.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -184,7 +171,6 @@ fn test_find_fork_case2() { // fork2 total_difficulty 280 for blk in fork2.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -250,19 +236,14 @@ fn test_find_fork_case3() { fork2.gen_empty_block_with_diff(40u64, &mock_store) } let proposal_table = ProposalTable::new(consensus.tx_proposal_window()); - let (unverified_blocks_tx, _unverified_blocks_rx) = channel::unbounded::(); - let consume_descendant_processor = ConsumeDescendantProcessor { - shared: shared.clone(), - unverified_blocks_tx, - }; let mut consume_unverified_block_processor = ConsumeUnverifiedBlockProcessor { shared: shared.clone(), + is_pending_verify: Arc::new(DashSet::new()), proposal_table, }; // fork1 total_difficulty 240 for blk in fork1.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -272,7 +253,6 @@ fn test_find_fork_case3() { // fork2 total_difficulty 200 for blk in fork2.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -337,20 +317,15 @@ fn test_find_fork_case4() { fork2.gen_empty_block_with_diff(80u64, &mock_store); } let proposal_table = ProposalTable::new(consensus.tx_proposal_window()); - let (unverified_blocks_tx, _unverified_blocks_rx) = channel::unbounded::(); - let consume_descendant_processor = ConsumeDescendantProcessor { - shared: shared.clone(), - unverified_blocks_tx, - }; let mut consume_unverified_block_processor = ConsumeUnverifiedBlockProcessor { shared: shared.clone(), + is_pending_verify: Arc::new(DashSet::new()), proposal_table, }; // fork1 total_difficulty 200 for blk in fork1.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -360,7 +335,6 @@ fn test_find_fork_case4() { // fork2 total_difficulty 160 for blk in fork2.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -425,19 +399,14 @@ fn repeatedly_switch_fork() { fork2.gen_empty_block_with_nonce(2u128, &mock_store); } let proposal_table = ProposalTable::new(consensus.tx_proposal_window()); - let (unverified_blocks_tx, _unverified_blocks_rx) = channel::unbounded::(); - let consume_descendant_processor = ConsumeDescendantProcessor { - shared: shared.clone(), - unverified_blocks_tx, - }; let mut consume_unverified_block_processor = ConsumeUnverifiedBlockProcessor { shared: shared.clone(), + is_pending_verify: Arc::new(DashSet::new()), proposal_table, }; for blk in fork1.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, @@ -446,7 +415,6 @@ fn repeatedly_switch_fork() { for blk in fork2.blocks() { process_block( - &consume_descendant_processor, &mut consume_unverified_block_processor, blk, Switch::DISABLE_ALL, diff --git a/chain/src/tests/orphan_block_pool.rs b/chain/src/tests/orphan_block_pool.rs index 3c14890fbac..f3977be13b1 100644 --- a/chain/src/tests/orphan_block_pool.rs +++ b/chain/src/tests/orphan_block_pool.rs @@ -1,8 +1,9 @@ #![allow(dead_code)] -use crate::LonelyBlock; +use crate::{LonelyBlock, LonelyBlockHash}; use ckb_chain_spec::consensus::ConsensusBuilder; use ckb_systemtime::unix_time_as_millis; -use ckb_types::core::{BlockBuilder, BlockView, EpochNumberWithFraction, HeaderView}; +use ckb_types::core::{BlockBuilder, EpochNumberWithFraction, HeaderView}; +use ckb_types::packed::Byte32; use ckb_types::prelude::*; use std::collections::HashSet; use std::sync::Arc; @@ -38,8 +39,8 @@ fn assert_leaders_have_children(pool: &OrphanBlockPool) { } } -fn assert_blocks_are_sorted(blocks: &[LonelyBlock]) { - let mut parent_hash = blocks[0].block.header().parent_hash(); +fn assert_blocks_are_sorted(blocks: &[LonelyBlockHash]) { + let mut parent_hash = blocks[0].parent_hash(); let mut windows = blocks.windows(2); // Orphans are sorted in a breadth-first search manner. We iterate through them and // check that this is the case. @@ -48,19 +49,16 @@ fn assert_blocks_are_sorted(blocks: &[LonelyBlock]) { while let Some([parent_or_sibling, child_or_sibling]) = windows.next() { // `parent_or_sibling` is a child of the block with current `parent_hash`. // Make `parent_or_sibling`'s parent the current `parent_hash`. - if parent_or_sibling.block.header().parent_hash() != parent_hash { - parent_hash = parent_or_sibling.block.header().parent_hash(); + if parent_or_sibling.parent_hash() != parent_hash { + parent_hash = parent_or_sibling.parent_hash(); } // If `child_or_sibling`'s parent is not the current `parent_hash`, i.e. it is not a sibling of // `parent_or_sibling`, then it must be a child of `parent_or_sibling`. - if child_or_sibling.block.header().parent_hash() != parent_hash { - assert_eq!( - child_or_sibling.block.header().parent_hash(), - parent_or_sibling.block.header().hash() - ); + if child_or_sibling.parent_hash() != parent_hash { + assert_eq!(child_or_sibling.parent_hash(), parent_or_sibling.hash()); // Move `parent_hash` forward. - parent_hash = child_or_sibling.block.header().parent_hash(); + parent_hash = child_or_sibling.parent_hash(); } } } @@ -83,19 +81,16 @@ fn test_remove_blocks_by_parent() { blocks.push(new_block_clone); parent = new_block.block().header(); - pool.insert(new_block); + pool.insert(new_block.into()); } let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash()); - assert_eq!( - orphan[0].block.header().parent_hash(), - consensus.genesis_block().hash() - ); + assert_eq!(orphan[0].parent_hash(), consensus.genesis_block().hash()); assert_blocks_are_sorted(orphan.as_slice()); - let orphan_set: HashSet<_> = orphan.into_iter().map(|b| b.block).collect(); - let blocks_set: HashSet<_> = blocks.into_iter().map(|b| b.to_owned()).collect(); + let orphan_set: HashSet<_> = orphan.into_iter().map(|b| b.hash()).collect(); + let blocks_set: HashSet<_> = blocks.into_iter().map(|b| b.hash()).collect(); assert_eq!(orphan_set, blocks_set) } @@ -113,7 +108,7 @@ fn test_remove_blocks_by_parent_and_get_block_should_not_deadlock() { switch: None, verify_callback: None, }; - pool.insert(new_block_clone); + pool.insert(new_block_clone.into()); header = new_block.header(); hashes.push(header.hash()); } @@ -149,27 +144,33 @@ fn test_leaders() { blocks.push(lonely_block); parent = new_block.block().header(); if i % 5 != 0 { - pool.insert(new_block); + pool.insert(new_block.into()); } } assert_leaders_have_children(&pool); assert_eq!(pool.len(), 15); assert_eq!(pool.leaders_len(), 4); - pool.insert(LonelyBlock { - block: Arc::clone(blocks[5].block()), - switch: None, - verify_callback: None, - }); + pool.insert( + LonelyBlock { + block: Arc::clone(blocks[5].block()), + switch: None, + verify_callback: None, + } + .into(), + ); assert_leaders_have_children(&pool); assert_eq!(pool.len(), 16); assert_eq!(pool.leaders_len(), 3); - pool.insert(LonelyBlock { - block: Arc::clone(blocks[10].block()), - switch: None, - verify_callback: None, - }); + pool.insert( + LonelyBlock { + block: Arc::clone(blocks[10].block()), + switch: None, + verify_callback: None, + } + .into(), + ); assert_leaders_have_children(&pool); assert_eq!(pool.len(), 17); assert_eq!(pool.leaders_len(), 2); @@ -180,11 +181,14 @@ fn test_leaders() { assert_eq!(pool.len(), 17); assert_eq!(pool.leaders_len(), 2); - pool.insert(LonelyBlock { - block: Arc::clone(blocks[0].block()), - switch: None, - verify_callback: None, - }); + pool.insert( + LonelyBlock { + block: Arc::clone(blocks[0].block()), + switch: None, + verify_callback: None, + } + .into(), + ); assert_leaders_have_children(&pool); assert_eq!(pool.len(), 18); assert_eq!(pool.leaders_len(), 2); @@ -193,23 +197,26 @@ fn test_leaders() { assert_eq!(pool.len(), 3); assert_eq!(pool.leaders_len(), 1); - pool.insert(LonelyBlock { - block: Arc::clone(blocks[15].block()), - switch: None, - verify_callback: None, - }); + pool.insert( + LonelyBlock { + block: Arc::clone(blocks[15].block()), + switch: None, + verify_callback: None, + } + .into(), + ); assert_leaders_have_children(&pool); assert_eq!(pool.len(), 4); assert_eq!(pool.leaders_len(), 1); let orphan_1 = pool.remove_blocks_by_parent(&blocks[14].block.hash()); - let orphan_set: HashSet> = orphan + let orphan_set: HashSet = orphan .into_iter() - .map(|b| b.block) - .chain(orphan_1.into_iter().map(|b| b.block)) + .map(|b| b.hash()) + .chain(orphan_1.into_iter().map(|b| b.hash())) .collect(); - let blocks_set: HashSet> = blocks.into_iter().map(|b| b.block).collect(); + let blocks_set: HashSet = blocks.into_iter().map(|b| b.block().hash()).collect(); assert_eq!(orphan_set, blocks_set); assert_eq!(pool.len(), 0); assert_eq!(pool.leaders_len(), 0); @@ -239,7 +246,7 @@ fn test_remove_expired_blocks() { switch: None, verify_callback: None, }; - pool.insert(lonely_block); + pool.insert(lonely_block.into()); } assert_eq!(pool.leaders_len(), 1); diff --git a/chain/src/utils/orphan_block_pool.rs b/chain/src/utils/orphan_block_pool.rs index ff6dd63b498..51880a89a5f 100644 --- a/chain/src/utils/orphan_block_pool.rs +++ b/chain/src/utils/orphan_block_pool.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -use crate::LonelyBlock; +use crate::LonelyBlockHash; use ckb_logger::debug; use ckb_types::core::{BlockView, EpochNumber}; use ckb_types::packed; @@ -10,12 +10,12 @@ use std::sync::Arc; pub type ParentHash = packed::Byte32; const SHRINK_THRESHOLD: usize = 100; -const EXPIRED_EPOCH: u64 = 6; +pub const EXPIRED_EPOCH: u64 = 6; #[derive(Default)] struct InnerPool { // Group by blocks in the pool by the parent hash. - blocks: HashMap>, + blocks: HashMap>, // The map tells the parent hash when given the hash of a block in the pool. // // The block is in the orphan pool if and only if the block hash exists as a key in this map. @@ -33,9 +33,9 @@ impl InnerPool { } } - fn insert(&mut self, lonely_block: LonelyBlock) { - let hash = lonely_block.block().header().hash(); - let parent_hash = lonely_block.block().data().header().raw().parent_hash(); + fn insert(&mut self, lonely_block: LonelyBlockHash) { + let hash = lonely_block.hash(); + let parent_hash = lonely_block.parent_hash(); self.blocks .entry(parent_hash.clone()) .or_default() @@ -53,7 +53,7 @@ impl InnerPool { self.parents.insert(hash, parent_hash); } - pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec { // try remove leaders first if !self.leaders.remove(parent_hash) { return Vec::new(); @@ -62,7 +62,7 @@ impl InnerPool { let mut queue: VecDeque = VecDeque::new(); queue.push_back(parent_hash.to_owned()); - let mut removed: Vec = Vec::new(); + let mut removed: Vec = Vec::new(); while let Some(parent_hash) = queue.pop_front() { if let Some(orphaned) = self.blocks.remove(&parent_hash) { let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip(); @@ -87,14 +87,16 @@ impl InnerPool { removed } - pub fn get_block(&self, hash: &packed::Byte32) -> Option> { - self.parents.get(hash).and_then(|parent_hash| { - self.blocks.get(parent_hash).and_then(|blocks| { - blocks - .get(hash) - .map(|lonely_block| Arc::clone(lonely_block.block())) - }) - }) + pub fn get_block(&self, _hash: &packed::Byte32) -> Option> { + // TODO get_block from db + None + // self.parents.get(hash).and_then(|parent_hash| { + // self.blocks.get(parent_hash).and_then(|blocks| { + // blocks + // .get(hash) + // .map(|lonely_block| Arc::clone(lonely_block.block())) + // }) + // }) } pub fn contains_block(&self, hash: &packed::Byte32) -> bool { @@ -109,11 +111,7 @@ impl InnerPool { if self.need_clean(hash, tip_epoch) { // remove items in orphan pool and return hash to callee(clean header map) let descendants = self.remove_blocks_by_parent(hash); - result.extend( - descendants - .iter() - .map(|lonely_block| lonely_block.block().hash()), - ); + result.extend(descendants.iter().map(|lonely_block| lonely_block.hash())); } } result @@ -125,7 +123,7 @@ impl InnerPool { .get(parent_hash) .and_then(|map| { map.iter().next().map(|(_, lonely_block)| { - lonely_block.block().header().epoch().number() + EXPIRED_EPOCH < tip_epoch + lonely_block.epoch_number() + EXPIRED_EPOCH < tip_epoch }) }) .unwrap_or_default() @@ -148,11 +146,11 @@ impl OrphanBlockPool { } /// Insert orphaned block, for which we have already requested its parent block - pub fn insert(&self, lonely_block: LonelyBlock) { + pub fn insert(&self, lonely_block: LonelyBlockHash) { self.inner.write().insert(lonely_block); } - pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec { self.inner.write().remove_blocks_by_parent(parent_hash) } diff --git a/docs/ckb_async_block_sync.mermaid b/docs/ckb_async_block_sync.mermaid index cef652da5da..eb28cd0eb0e 100644 --- a/docs/ckb_async_block_sync.mermaid +++ b/docs/ckb_async_block_sync.mermaid @@ -1,74 +1,80 @@ sequenceDiagram autonumber - participant Sr as Synchronizer::received participant BP as BlockProcess - participant Sp as Synchronizer::poll - participant C as main thread - participant CO as OrphanBlockPool thread + participant PU as PreloadUnverified thread participant CV as ConsumeUnverifiedBlocks thread box crate:ckb-sync - participant Sr - participant Sp - participant BP + participant Sr + participant Sp + participant BP end box crate:ckb-chain participant C - participant CO + participant PU participant CV end - - Note left of Sr: synchronizer received
Block(122) from remote peer Note over Sr: try_process SyncMessageUnionReader::SendBlock - Sr->>+BP: BlockProcess::execute(Block(122)) - BP->>+C: asynchronous_process_block(Block(122)) + Sr ->>+ BP: BlockProcess::execute(Block(122)) + BP ->>+ C: asynchronous_process_block(Block(122)) Note over C: non_contextual_verify(Block(122)) - C->>+CO: send Block(122) to OrphanBlockPool via channel - C->>-BP: return - BP->>-Sr: return - - Note over CO: insert Block(122) to OrphanBlockPool - + Note over C: insert_block(Block(122)) + Note over C: OrphanBroker.process_lonly_block(Block(122)) + + alt parent is BLOCK_STORED or parent is_pending_veryfing + Note over C: OrphanBroker.process_lonly_block(Block(122)) + Note over C: increase unverified_tip to Block(122) + C ->>+ PU: send Block(122) to PreloadUnverified via channel + else parent not found + Note over C: OrphanBroker.process_lonly_block(Block(122)) + Note over C: insert Block(122) to OrphanBroker + end + C ->>+ PU: send Block(123) to PreloadUnverified via channel + C ->>- BP: return + BP ->>- Sr: return Note left of Sr: synchronizer received
Block(123) from remote peer Note over Sr: try_process SyncMessageUnionReader::SendBlock - Sr->>+BP: BlockProcess::execute(Block(123)) - BP->>+C: asynchronous_process_block(Block(123)) + Sr ->>+ BP: BlockProcess::execute(Block(123)) + BP ->>+ C: asynchronous_process_block(Block(123)) Note over C: non_contextual_verify(Block(123)) - C->>+CO: send Block(123) to OrphanBlockPool via channel - C->>-BP: return - BP->>-Sr: return - - Note over CO: insert Block(123) to OrphanBlockPool + Note over C: insert_block(Block(123)) + Note over C: OrphanBroker.process_lonly_block(Block(123)) + alt parent is BLOCK_STORED or parent is_pending_veryfing + Note over C: OrphanBroker.process_lonly_block(Block(123)) + Note over C: increase unverified_tip to Block(123) + C ->>+ PU: send Block(123) to PreloadUnverified via channel + else parent not found + Note over C: OrphanBroker.process_lonly_block(Block(123)) + Note over C: insert Block(123) to OrphanBroker + end + C ->>- BP: return + BP ->>- Sr: return - loop Search Orphan Pool - Note over CO: if a leader block have descendants - Note over CO: load all descendants from OrphanBlockPool - Note over CO: assume these descendants are valid, let BlockExt.verified = None - Note over CO: insert them to RocksDB - Note over CO: Increase Unverified TIP - CO->>+CV: send the UnverifiedBlock to ConsumeUnverifiedBlocks via channel + loop load unverified + Note over PU: receive LonelyBlockHash + Note over PU: load UnverifiedBlock from db + PU ->>+ CV: send UnverifiedBlock to ConsumeUnverifiedBlocks end loop Consume Unverified Blocks Note over CV: start verify UnverifiedBlock if the channel is not empty - Note over CV: Verify Block in CKB VM - alt Block is Valid Note over CV: remove Block block_status and HeaderMap else Block is Invalid + Note over CV: mark block as BLOCK_INVALID in block_status_map Note over CV: Decrease Unverified TIP - CV->>Sp: I received a Invalid Block, please punish the malicious peer - Note over Sp: call nc.ban_peer() to punish the malicious peer end + opt Execute Callback + Note over CV: execute callback to punish the malicious peer if block is invalid Note over CV: callback: Box) + Send + Sync> end diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index c2c4ce0eb00..d573d7ed386 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -95,6 +95,17 @@ impl BlockFetcher { ckb_metrics::handle().map(|handle| handle.ckb_sync_block_fetch_duration.start_timer()) }; + if self.sync_shared.shared().get_unverified_tip().number() + >= self.sync_shared.active_chain().tip_number() + BLOCK_DOWNLOAD_WINDOW * 9 + { + trace!( + "unverified_tip - tip > BLOCK_DOWNLOAD_WINDOW * 9, skip fetch, unverified_tip: {}, tip: {}", + self.sync_shared.shared().get_unverified_tip().number(), + self.sync_shared.active_chain().tip_number() + ); + return None; + } + if self.reached_inflight_limit() { trace!( "[block_fetcher] inflight count has reached the limit, preventing further downloads from peer {}", @@ -202,14 +213,6 @@ impl BlockFetcher { .get_ancestor(&best_known.hash(), start + span - 1), } }?; - debug!( - "get_ancestor({}, {}) -> {}-{}; IBD: {:?}", - best_known.hash(), - start + span - 1, - header.number(), - header.hash(), - self.ibd, - ); let mut status = self .sync_shared diff --git a/sync/src/tests/sync_shared.rs b/sync/src/tests/sync_shared.rs index 54c3a91f9a4..4db891a063f 100644 --- a/sync/src/tests/sync_shared.rs +++ b/sync/src/tests/sync_shared.rs @@ -3,11 +3,11 @@ use crate::tests::util::{build_chain, inherit_block}; use crate::SyncShared; -use ckb_chain::{start_chain_services, store_unverified_block, RemoteBlock, VerifyResult}; +use ckb_chain::{start_chain_services, RemoteBlock, VerifyResult}; use ckb_logger::info; use ckb_logger_service::LoggerInitGuard; use ckb_shared::block_status::BlockStatus; -use ckb_shared::SharedBuilder; +use ckb_shared::{Shared, SharedBuilder}; use ckb_store::{self, ChainStore}; use ckb_test_chain_utils::always_success_cellbase; use ckb_types::core::{BlockBuilder, BlockView, Capacity}; @@ -23,7 +23,9 @@ fn wait_for_expected_block_status( ) -> bool { let now = std::time::Instant::now(); while now.elapsed().as_secs() < 2 { - let current_status = shared.active_chain().get_block_status(hash); + let current_status = shared + .shared() + .get_block_status(shared.shared().snapshot().as_ref(), hash); if current_status == expect_status { return true; } @@ -175,22 +177,6 @@ fn test_insert_parent_unknown_block() { #[test] fn test_insert_child_block_with_stored_but_unverified_parent() { let (shared1, _) = build_chain(2); - let (shared, chain) = { - let (shared, mut pack) = SharedBuilder::with_temp_db() - .consensus(shared1.consensus().clone()) - .build() - .unwrap(); - let chain_controller = start_chain_services(pack.take_chain_services_builder()); - - while chain_controller.is_verifying_unverified_blocks_on_startup() { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - - ( - SyncShared::new(shared, Default::default(), pack.take_relay_tx_receiver()), - chain_controller, - ) - }; let block = shared1 .store() @@ -203,20 +189,40 @@ fn test_insert_child_block_with_stored_but_unverified_parent() { .unwrap(); Arc::new(parent) }; + + let _logger = ckb_logger_service::init_for_test("info,ckb-chain=debug").expect("init log"); + let parent_hash = parent.header().hash(); let child = Arc::new(block); let child_hash = child.header().hash(); - store_unverified_block(shared.shared(), Arc::clone(&parent)).expect("store parent block"); + let (shared, chain) = { + let (shared, mut pack) = SharedBuilder::with_temp_db() + .consensus(shared1.consensus().clone()) + .build() + .unwrap(); + + let db_txn = shared.store().begin_transaction(); + info!("inserting parent: {}-{}", parent.number(), parent.hash()); + db_txn.insert_block(&parent).expect("insert parent"); + db_txn.commit().expect("commit parent"); - // Note that we will not find the block status obtained from - // shared.active_chain().get_block_status(&parent_hash) to be BLOCK_STORED, - // because `get_block_status` does not read the block status from the database, - // it use snapshot to get the block status, and the snapshot is not updated. - assert!( - shared.store().get_block_ext(&parent_hash).is_some(), - "parent block should be stored" - ); + assert!( + shared.store().get_block(&parent_hash).is_some(), + "parent block should be stored" + ); + + let chain_controller = start_chain_services(pack.take_chain_services_builder()); + + while chain_controller.is_verifying_unverified_blocks_on_startup() { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + ( + SyncShared::new(shared, Default::default(), pack.take_relay_tx_receiver()), + chain_controller, + ) + }; assert!(shared .blocking_insert_new_block(&chain, Arc::clone(&child)) diff --git a/util/metrics/src/lib.rs b/util/metrics/src/lib.rs index 1cd4827d68a..3609524743d 100644 --- a/util/metrics/src/lib.rs +++ b/util/metrics/src/lib.rs @@ -72,6 +72,8 @@ pub struct Metrics { pub ckb_chain_orphan_count: IntGauge, pub ckb_chain_lonely_block_ch_len: IntGauge, pub ckb_chain_unverified_block_ch_len: IntGauge, + pub ckb_chain_preload_unverified_block_ch_len: IntGauge, + pub ckb_chain_load_full_unverified_block: Histogram, /// ckb_sync_msg_process duration (seconds) pub ckb_sync_msg_process_duration: HistogramVec, /// ckb_sync_block_fetch duraiton (seconds) @@ -163,6 +165,14 @@ static METRICS: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { "ckb_chain_unverified_block_ch_len", "The CKB chain unverified block channel length", ).unwrap(), + ckb_chain_preload_unverified_block_ch_len: register_int_gauge!( + "ckb_chain_preload_unverified_block_ch_len", + "The CKB chain fill unverified block channel length", + ).unwrap(), + ckb_chain_load_full_unverified_block: register_histogram!( + "ckb_chain_load_full_unverified_block", + "The CKB chain load_full_unverified_block duration (seconds)" + ).unwrap(), ckb_sync_msg_process_duration: register_histogram_vec!( "ckb_sync_msg_process_duration", "The CKB sync message process duration (seconds)",