diff --git a/base_layer/core/src/base_node/sync/header_sync/error.rs b/base_layer/core/src/base_node/sync/header_sync/error.rs index 9d9a5e391d..f5ebc01dbe 100644 --- a/base_layer/core/src/base_node/sync/header_sync/error.rs +++ b/base_layer/core/src/base_node/sync/header_sync/error.rs @@ -53,10 +53,12 @@ pub enum BlockHeaderSyncError { NotInSync, #[error("Unable to locate start hash `{0}`")] StartHashNotFound(String), - #[error("Expected header height {0} got {1}")] - InvalidBlockHeight(u64, u64), + #[error("Expected header height {expected} got {actual}")] + InvalidBlockHeight { expected: u64, actual: u64 }, #[error("Unable to find chain split from peer `{0}`")] ChainSplitNotFound(NodeId), #[error("Node could not find any other node with which to sync. Silence.")] NetworkSilence, + #[error("Invalid protocol response: {0}")] + InvalidProtocolResponse(String), } diff --git a/base_layer/core/src/base_node/sync/header_sync/mod.rs b/base_layer/core/src/base_node/sync/header_sync/mod.rs index b4392e7c64..a5214d422d 100644 --- a/base_layer/core/src/base_node/sync/header_sync/mod.rs +++ b/base_layer/core/src/base_node/sync/header_sync/mod.rs @@ -20,9 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#[cfg(test)] -mod test; - mod error; pub use error::BlockHeaderSyncError; diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index dafa896a5a..361f482613 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -24,7 +24,7 @@ use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError}; use crate::{ base_node::sync::{hooks::Hooks, rpc, BlockSyncConfig}, blocks::BlockHeader, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainBlock}, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainBlock, ChainHeader}, consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory, proto::{ @@ -37,7 +37,11 @@ use crate::{ }; use futures::{future, stream::FuturesUnordered, StreamExt}; use log::*; -use std::{convert::TryFrom, sync::Arc, time::Duration}; +use std::{ + convert::TryFrom, + sync::Arc, + time::{Duration, Instant}, +}; use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester, ConnectivitySelection}, peer_manager::NodeId, @@ -47,6 +51,8 @@ use tari_comms::{ const LOG_TARGET: &str = "c::bn::header_sync"; +const NUM_INITIAL_HEADERS_TO_REQUEST: u64 = 1000; + pub struct HeaderSynchronizer<'a, B> { config: BlockSyncConfig, db: AsyncBlockchainDb, @@ -116,13 +122,16 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { debug!(target: LOG_TARGET, "Block header validation failed: {}", err); self.ban_peer_long(node_id, err.into()).await?; }, + Err(err @ BlockHeaderSyncError::InvalidBlockHeight { .. }) => { + debug!(target: LOG_TARGET, "{}", err); + self.ban_peer_long(node_id, BanReason::GeneralHeaderSyncFailure(err)) + .await?; + }, Err(err) => { debug!( target: LOG_TARGET, "Failed to synchronize headers from peer `{}`: {}", node_id, err ); - self.ban_peer_long(node_id, BanReason::GeneralHeaderSyncFailure(err)) - .await?; }, } } @@ -253,13 +262,16 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { latency.unwrap_or_default().as_millis() ); - let sync_complete = self.check_chain_split(&peer, &mut client).await?; - // If sync is not complete after the chain split check, synchronize the rest of the headers - if !sync_complete { - self.synchronize_headers(&peer, &mut client).await?; + let sync_status = self.determine_sync_status(&peer, &mut client).await?; + match sync_status { + SyncStatus::InSync => Ok(()), + // We're ahead of this peer, try another peer if possible + SyncStatus::Ahead => Err(BlockHeaderSyncError::NotInSync), + SyncStatus::Lagging(split_info) => { + self.synchronize_headers(&peer, &mut client, *split_info).await?; + Ok(()) + }, } - - Ok(()) } async fn find_chain_split( @@ -290,9 +302,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { .await?; debug!( target: LOG_TARGET, - "Determining where our chain splits with the remote peer `{}` ({} block hashes sent)", + "Determining if chain splits between {} and {} headers back from peer `{}`", + offset, + offset + NUM_CHAIN_SPLIT_HEADERS, peer, - block_hashes.len() ); let request = FindChainSplitRequest { @@ -317,18 +330,24 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } } - /// Check for a chain split with the given peer, validate and add header state. State will be rewound if a higher - /// proof of work is achieved. If, after this step, the headers are already in sync, true is returned, otherwise a - /// header sync should proceed to download the remaining headers. - async fn check_chain_split( + /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information + /// of the chain split (see [SyncStatus]). + /// + /// If the local node is behind the remote chain (i.e. `SyncStatus::Lagging`), the appropriate `ChainSplitInfo` is + /// returned, the header validator is initialized and the preliminary headers are validated. + async fn determine_sync_status( &mut self, peer: &NodeId, client: &mut rpc::BaseNodeSyncRpcClient, - ) -> Result + ) -> Result { - const NUM_HEADERS_TO_REQUEST: u64 = 1000; - let (resp, block_hashes, steps_back) = self.find_chain_split(peer, client, NUM_HEADERS_TO_REQUEST).await?; - if resp.headers.len() > NUM_HEADERS_TO_REQUEST as usize { + // Fetch the local tip header at the beginning of the sync process + let local_tip_header = self.db.fetch_tip_header().await?; + + let (resp, block_hashes, steps_back) = self + .find_chain_split(peer, client, NUM_INITIAL_HEADERS_TO_REQUEST) + .await?; + if resp.headers.len() > NUM_INITIAL_HEADERS_TO_REQUEST as usize { self.ban_peer_long(peer.clone(), BanReason::PeerSentTooManyHeaders(resp.headers.len())) .await?; return Err(BlockHeaderSyncError::NotInSync); @@ -368,18 +387,14 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { peer, fork_hash_index ); - // Another peer will be attempted, if possible - return Err(BlockHeaderSyncError::NotInSync); + + return Ok(SyncStatus::Ahead); } debug!(target: LOG_TARGET, "Already in sync with peer `{}`.", peer); - return Ok(true); + return Ok(SyncStatus::InSync); } - // We can trust that the header associated with this hash exists because block_hashes is data this node - // supplied. usize conversion overflow has already been checked above - let chain_split_hash = block_hashes[fork_hash_index as usize].clone(); - let headers = headers .into_iter() .map(BlockHeader::try_from) @@ -387,8 +402,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?; let num_new_headers = headers.len(); - self.header_validator.initialize_state(chain_split_hash).await?; - let mut chain_headers = Vec::with_capacity(headers.len()); + // We can trust that the header associated with this hash exists because block_hashes is data this node + // supplied. usize conversion overflow has already been checked above + let chain_split_hash = block_hashes[fork_hash_index as usize].clone(); + + self.header_validator.initialize_state(chain_split_hash.clone()).await?; for header in headers { debug!( target: LOG_TARGET, @@ -396,54 +414,45 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { header.height, header.pow_algo(), ); - let height = header.height; - chain_headers.push(self.header_validator.validate_and_calculate_metadata(header)?); - debug!(target: LOG_TARGET, "Header #{} is VALID", height,) + self.header_validator.validate(header)?; } - if fork_hash_index > 0 { - // If the peer is telling us that we have to rewind, check their headers are stronger than our current tip - self.header_validator - .check_stronger_chain(chain_headers.last().expect("already_checked")) - .await?; - - // TODO: We've established that the peer has a chain that forks from ours, can provide valid headers for - // _part_ of that chain and is stronger than our chain at the same (or less) height. - // However, to know that the full chain is stronger than our tip header, we need to - // download all headers and compare. + debug!( + target: LOG_TARGET, + "Peer {} has submitted {} valid header(s)", peer, num_new_headers + ); - debug!(target: LOG_TARGET, "Rewinding the chain by {} block(s)", steps_back); - let blocks = self.rewind_blockchain(steps_back).await?; - self.hooks.call_on_rewind_hooks(blocks); + // Basic sanity check that the peer sent tip height greater than the split. + let split_height = local_tip_header.height().saturating_sub(steps_back); + if remote_tip_height < split_height { + self.ban_peer_short(peer.clone(), BanReason::PeerSentInvalidTipHeight { + actual: remote_tip_height, + expected: split_height, + }) + .await?; + return Err(BlockHeaderSyncError::InvalidProtocolResponse(format!( + "Peer {} sent invalid remote tip height", + peer + ))); } - let mut txn = self.db.write_transaction(); - let current_height = chain_headers.last().map(|h| h.height()).unwrap_or(remote_tip_height); - chain_headers.into_iter().for_each(|header| { - debug!(target: LOG_TARGET, "Adding header: #{}", header.header.height); - txn.insert_header(header.header, header.accumulated_data); - }); - txn.commit().await?; - - self.hooks - .call_on_progress_header_hooks(current_height, remote_tip_height, self.sync_peers); - - // If less headers were returned than requested, the peer is indicating that we have the tip header. - // To indicate that sync is complete, true is returned, otherwise false - Ok(num_new_headers < NUM_HEADERS_TO_REQUEST as usize) + let chain_split_info = ChainSplitInfo { + local_tip_header, + remote_tip_height, + reorg_steps_back: steps_back, + chain_split_hash, + }; + Ok(SyncStatus::Lagging(Box::new(chain_split_info))) } - async fn rewind_blockchain(&self, steps_back: u64) -> Result>, BlockHeaderSyncError> { + async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result>, BlockHeaderSyncError> { debug!( target: LOG_TARGET, - "Deleting {} header(s) that no longer form part of the main chain", steps_back + "Deleting headers that no longer form part of the main chain up until split at {}", + split_hash.to_hex() ); - let tip_header = self.db.fetch_last_header().await?; - let new_tip_height = tip_header.height - steps_back; - debug!(target: LOG_TARGET, "Rewinding to height {}", new_tip_height); - - let blocks = self.db.rewind_to_height(new_tip_height).await?; + let blocks = self.db.rewind_to_hash(split_hash).await?; debug!( target: LOG_TARGET, "Rewound {} block(s) in preparation for header sync", @@ -456,29 +465,55 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { &mut self, peer: &NodeId, client: &mut rpc::BaseNodeSyncRpcClient, + split_info: ChainSplitInfo, ) -> Result<(), BlockHeaderSyncError> { - let tip_header = self.db.fetch_last_header().await?; + const COMMIT_EVERY_N_HEADERS: usize = 1000; + + // Peer returned less than the max headers. This indicates that there are no further headers to request. + if self.header_validator.valid_headers().len() < NUM_INITIAL_HEADERS_TO_REQUEST as usize { + debug!(target: LOG_TARGET, "No further headers to download"); + if !self.pending_chain_has_higher_pow(&split_info.local_tip_header)? { + return Err(BlockHeaderSyncError::WeakerChain); + } + + debug!( + target: LOG_TARGET, + "Remote chain from peer {} has higher PoW. Switching", peer + ); + // PoW is higher, switching over to the new chain + self.switch_to_pending_chain(&split_info).await?; + + return Ok(()); + } + + // Find the hash to start syncing the rest of the headers. + // The expectation cannot fail because the number of headers has been checked in determine_sync_status + let start_header = + self.header_validator.valid_headers().last().expect( + "synchronize_headers: expected there to be at least one valid pending header but there were none", + ); + debug!( target: LOG_TARGET, - "Requesting header stream starting from tip header #{} from peer `{}`", tip_header.height, peer + "Download remaining headers starting from header #{} from peer `{}`", start_header.header.height, peer ); let request = SyncHeadersRequest { - start_hash: tip_header.hash(), + start_hash: start_header.header.hash(), // To the tip! count: 0, }; - let mut header_stream = client.sync_headers(request).await?; + let mut header_stream = client.sync_headers(request).await?; debug!(target: LOG_TARGET, "Reading headers from peer `{}`", peer); - // Reset the header validator state to be sure we're using the correct data - self.header_validator.initialize_state(tip_header.hash()).await?; + let mut has_switched_to_new_chain = false; + while let Some(header) = header_stream.next().await { let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?; debug!( target: LOG_TARGET, - "Validating and adding header: #{} (PoW = {}), ", + "Validating header: #{} (PoW = {})", header.height, header.pow_algo() ); @@ -494,17 +529,101 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { ); continue; } - let chain_header = self.header_validator.validate_and_calculate_metadata(header)?; - let current_height = chain_header.height(); - self.db - .write_transaction() - .insert_header(chain_header.header, chain_header.accumulated_data) - .commit() - .await?; + let current_height = header.height; + self.header_validator.validate(header)?; + + if has_switched_to_new_chain { + // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers + if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS { + self.commit_pending_headers().await?; + } + } else { + // The remote chain has not (yet) been accepted. + // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is + // achieved. + if self.pending_chain_has_higher_pow(&split_info.local_tip_header)? { + self.switch_to_pending_chain(&split_info).await?; + has_switched_to_new_chain = true; + } + } self.hooks - .call_on_progress_header_hooks(current_height, current_height, self.sync_peers); + .call_on_progress_header_hooks(current_height, split_info.remote_tip_height, self.sync_peers); + } + + if !has_switched_to_new_chain { + return Err(BlockHeaderSyncError::WeakerChain); + } + + // Commit the last blocks that don't fit into the COMMIT_EVENT_N_HEADERS blocks + if !self.header_validator.valid_headers().is_empty() { + self.commit_pending_headers().await?; } + + Ok(()) + } + + async fn commit_pending_headers(&mut self) -> Result { + let chain_headers = self.header_validator.take_valid_headers(); + let num_headers = chain_headers.len(); + let start = Instant::now(); + + let new_tip = chain_headers.last().cloned().unwrap(); + let mut txn = self.db.write_transaction(); + chain_headers.into_iter().for_each(|chain_header| { + txn.insert_header(chain_header.header, chain_header.accumulated_data); + }); + + txn.commit().await?; + + debug!( + target: LOG_TARGET, + "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}", + num_headers, + new_tip.height(), + start.elapsed() + ); + + Ok(new_tip) + } + + fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> Result { + let chain_headers = self.header_validator.valid_headers(); + if chain_headers.is_empty() { + return Ok(false); + } + + // Check that the remote tip is stronger than the local tip + let proposed_tip = chain_headers.last().unwrap(); + match self.header_validator.check_stronger_chain(current_tip, proposed_tip) { + Ok(_) => Ok(true), + Err(BlockHeaderSyncError::WeakerChain) => Ok(false), + Err(err) => Err(err), + } + } + + async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> { + // Reorg if required + if split_info.reorg_steps_back > 0 { + debug!( + target: LOG_TARGET, + "Reorg: Rewinding the chain by {} block(s) (split hash = {})", + split_info.reorg_steps_back, + split_info.chain_split_hash.to_hex() + ); + let blocks = self.rewind_blockchain(split_info.chain_split_hash.clone()).await?; + // NOTE: `blocks` only contains full blocks that were reorged out, and not the headers. + // This may be unexpected for implementers of the rewind hook. + self.hooks.call_on_rewind_hooks(blocks); + } + + // Commit the forked chain. At this point + // 1. Headers have been validated + // 2. The forked chain has a higher PoW than the local chain + // + // After this we commit headers every `n` blocks + self.commit_pending_headers().await?; + Ok(()) } } @@ -513,6 +632,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { enum BanReason { #[error("This peer sent too many headers ({0}) in response to a chain split request")] PeerSentTooManyHeaders(usize), + #[error("This peer sent an invalid tip height {actual} expected a height greater than or equal to {expected}")] + PeerSentInvalidTipHeight { actual: u64, expected: u64 }, #[error( "This peer sent a split hash index ({fork_hash_index}) greater than the number of block hashes sent \ ({num_block_hashes})" @@ -530,3 +651,19 @@ enum BanReason { #[error("Peer did not respond timeously during RPC negotiation")] RpcNegotiationTimedOut, } + +struct ChainSplitInfo { + local_tip_header: ChainHeader, + remote_tip_height: u64, + reorg_steps_back: u64, + chain_split_hash: HashOutput, +} + +enum SyncStatus { + /// Local and remote node are in sync + InSync, + /// Local node is ahead of the remote node + Ahead, + /// Local node is lagging behind remote node + Lagging(Box), +} diff --git a/base_layer/core/src/base_node/sync/header_sync/test.rs b/base_layer/core/src/base_node/sync/header_sync/test.rs deleted file mode 100644 index c27791475d..0000000000 --- a/base_layer/core/src/base_node/sync/header_sync/test.rs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2020, The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/base_layer/core/src/base_node/sync/header_sync/validator.rs b/base_layer/core/src/base_node/sync/header_sync/validator.rs index e015c843d3..de767d8572 100644 --- a/base_layer/core/src/base_node/sync/header_sync/validator.rs +++ b/base_layer/core/src/base_node/sync/header_sync/validator.rs @@ -30,7 +30,6 @@ use crate::{ BlockchainBackend, ChainHeader, ChainStorageError, - Optional, TargetDifficulties, }, common::rolling_vec::RollingVec, @@ -64,6 +63,7 @@ struct State { timestamps: RollingVec, target_difficulties: TargetDifficulties, previous_accum: BlockHeaderAccumulatedData, + valid_headers: Vec, } impl BlockHeaderSyncValidator { @@ -105,23 +105,24 @@ impl BlockHeaderSyncValidator { timestamps, target_difficulties, previous_accum, + // One large allocation is usually better even if it is not always used. + valid_headers: Vec::with_capacity(1000), }); Ok(()) } - pub fn validate_and_calculate_metadata( - &mut self, - header: BlockHeader, - ) -> Result - { - let expected_height = self.state().current_height + 1; + pub fn validate(&mut self, header: BlockHeader) -> Result<(), BlockHeaderSyncError> { + let state = self.state(); + let expected_height = state.current_height + 1; if header.height != expected_height { - return Err(BlockHeaderSyncError::InvalidBlockHeight(expected_height, header.height)); + return Err(BlockHeaderSyncError::InvalidBlockHeight { + expected: expected_height, + actual: header.height, + }); } check_timestamp_ftl(&header, &self.consensus_rules)?; - let state = self.state(); check_header_timestamp_greater_than_median(&header, &state.timestamps)?; let constants = self.consensus_rules.consensus_constants(header.height); @@ -154,29 +155,38 @@ impl BlockHeaderSyncValidator { // Add a "more recent" datapoint onto the target difficulty state.target_difficulties.add_back(&header, target_difficulty); state.previous_accum = metadata.clone(); - - Ok(ChainHeader { + state.valid_headers.push(ChainHeader { header, accumulated_data: metadata, - }) + }); + + Ok(()) } - pub async fn check_stronger_chain(&mut self, their_header: &ChainHeader) -> Result<(), BlockHeaderSyncError> { - // Compare their header to ours at the same height, or if we don't have a header at that height, our current tip - // header - let our_header = match self - .db - .fetch_header_and_accumulated_data(their_header.height()) - .await - .optional()? - { - Some(h) => ChainHeader { - header: h.0, - accumulated_data: h.1, - }, - None => self.db.fetch_tip_header().await?, - }; + /// Drains and returns all the headers that were validated. + /// + /// ## Panics + /// + /// Panics if initialize_state was not called prior to calling this function + pub fn take_valid_headers(&mut self) -> Vec { + self.state_mut().valid_headers.drain(..).collect::>() + } + /// Returns a slice containing the current valid headers + /// + /// ## Panics + /// + /// Panics if initialize_state was not called prior to calling this function + pub fn valid_headers(&self) -> &[ChainHeader] { + &self.state().valid_headers + } + + pub fn check_stronger_chain( + &self, + our_header: &ChainHeader, + their_header: &ChainHeader, + ) -> Result<(), BlockHeaderSyncError> + { debug!( target: LOG_TARGET, "Comparing PoW on remote header #{} and local header #{}", @@ -190,7 +200,7 @@ impl BlockHeaderSyncValidator { .compare(&our_header, their_header) { Ordering::Less => Ok(()), - Ordering::Equal | Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain), + Ordering::Greater | Ordering::Equal => Err(BlockHeaderSyncError::WeakerChain), } } @@ -206,3 +216,113 @@ impl BlockHeaderSyncValidator { .expect("state() called before state was initialized (using the `begin` method)") } } + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + blocks::BlockHeader, + chain_storage::{async_db::AsyncBlockchainDb, BlockHeaderAccumulatedData}, + consensus::{ConsensusManager, Network}, + crypto::tari_utilities::{hex::Hex, Hashable}, + proof_of_work::{randomx_factory::RandomXFactory, PowAlgorithm}, + test_helpers::blockchain::{create_new_blockchain, TempDatabase}, + }; + use tari_test_utils::unpack_enum; + + fn setup() -> (BlockHeaderSyncValidator, AsyncBlockchainDb) { + let rules = ConsensusManager::builder(Network::LocalNet).build(); + let randomx_factory = RandomXFactory::default(); + let db = create_new_blockchain(); + ( + BlockHeaderSyncValidator::new(db.clone().into(), rules, randomx_factory), + db.into(), + ) + } + + async fn setup_with_headers( + n: usize, + ) -> ( + BlockHeaderSyncValidator, + AsyncBlockchainDb, + ChainHeader, + ) { + let (validator, db) = setup(); + let mut tip = db.fetch_tip_header().await.unwrap(); + for _ in 0..n { + let mut header = BlockHeader::from_previous(&tip.header).unwrap(); + // Needed to have unique keys for the blockchain db mmr count indexes (MDB_KEY_EXIST error) + header.kernel_mmr_size += 1; + header.output_mmr_size += 1; + let acc_data = BlockHeaderAccumulatedData { + hash: header.hash(), + ..Default::default() + }; + + db.insert_valid_headers(vec![(header.clone(), acc_data.clone())]) + .await + .unwrap(); + tip = ChainHeader { + header, + accumulated_data: acc_data, + }; + } + + (validator, db, tip) + } + + mod initialize_state { + use super::*; + + #[tokio_macros::test_basic] + async fn it_initializes_state_to_given_header() { + let (mut validator, _, tip) = setup_with_headers(1).await; + validator.initialize_state(tip.header.hash()).await.unwrap(); + let state = validator.state(); + assert!(state.valid_headers.is_empty()); + assert_eq!(state.target_difficulties.get(PowAlgorithm::Sha3).len(), 2); + assert!(state.target_difficulties.get(PowAlgorithm::Monero).is_empty()); + assert_eq!(state.timestamps.len(), 2); + assert_eq!(state.current_height, 1); + } + + #[tokio_macros::test_basic] + async fn it_errors_if_hash_does_not_exist() { + let (mut validator, _) = setup(); + let start_hash = vec![0; 32]; + let err = validator.initialize_state(start_hash.clone()).await.unwrap_err(); + unpack_enum!(BlockHeaderSyncError::StartHashNotFound(hash) = err); + assert_eq!(hash, start_hash.to_hex()); + } + } + + mod validate { + use super::*; + + #[tokio_macros::test_basic] + async fn it_passes_if_headers_are_valid() { + let (mut validator, _, tip) = setup_with_headers(1).await; + validator.initialize_state(tip.header.hash()).await.unwrap(); + assert!(validator.valid_headers().is_empty()); + let next = BlockHeader::from_previous(&tip.header).unwrap(); + validator.validate(next).unwrap(); + assert_eq!(validator.valid_headers().len(), 1); + let tip = validator.valid_headers().last().cloned().unwrap(); + let next = BlockHeader::from_previous(&tip.header).unwrap(); + validator.validate(next).unwrap(); + assert_eq!(validator.valid_headers().len(), 2); + } + + #[tokio_macros::test_basic] + async fn it_fails_if_height_is_not_serial() { + let (mut validator, _, tip) = setup_with_headers(2).await; + validator.initialize_state(tip.header.hash()).await.unwrap(); + let mut next = BlockHeader::from_previous(&tip.header).unwrap(); + next.height = 10; + let err = validator.validate(next).unwrap_err(); + unpack_enum!(BlockHeaderSyncError::InvalidBlockHeight { expected, actual } = err); + assert_eq!(actual, 10); + assert_eq!(expected, 3); + } + } +} diff --git a/base_layer/core/src/chain_storage/accumulated_data.rs b/base_layer/core/src/chain_storage/accumulated_data.rs index a346f5e570..b4ad4041c2 100644 --- a/base_layer/core/src/chain_storage/accumulated_data.rs +++ b/base_layer/core/src/chain_storage/accumulated_data.rs @@ -287,7 +287,7 @@ impl Display for BlockHeaderAccumulatedData { } } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct ChainHeader { pub header: BlockHeader, pub accumulated_data: BlockHeaderAccumulatedData, diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 366b4c35c6..c3e714700f 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -156,6 +156,8 @@ impl AsyncBlockchainDb { make_async_fn!(rewind_to_height(height: u64) -> Vec>, "rewind_to_height"); + make_async_fn!(rewind_to_hash(hash: BlockHash) -> Vec>, "rewind_to_hash"); + //---------------------------------- Headers --------------------------------------------// make_async_fn!(fetch_header(height: u64) -> Option, "fetch_header"); diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 0162d5eb45..6ce2de482a 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -971,6 +971,17 @@ where B: BlockchainBackend rewind_to_height(&mut *db, height) } + /// Rewind the blockchain state to the block hash making the block at that hash the new tip. + /// Returns the removed blocks. + /// + /// The operation will fail if + /// * The block hash does not exist + /// * The block hash is before the horizon block height determined by the pruning horizon + pub fn rewind_to_hash(&self, hash: BlockHash) -> Result>, ChainStorageError> { + let mut db = self.db_write_access()?; + rewind_to_hash(&mut *db, hash) + } + pub fn fetch_horizon_data(&self) -> Result, ChainStorageError> { let db = self.db_read_access()?; db.fetch_horizon_data() @@ -1537,6 +1548,21 @@ fn rewind_to_height( Ok(removed_blocks) } +fn rewind_to_hash( + db: &mut T, + block_hash: BlockHash, +) -> Result>, ChainStorageError> +{ + let block_hash_hex = block_hash.to_hex(); + let target_header = + fetch_header_by_block_hash(&*db, block_hash)?.ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "BlockHeader".to_string(), + field: "block_hash".to_string(), + value: block_hash_hex, + })?; + rewind_to_height(db, target_header.height) +} + // Checks whether we should add the block as an orphan. If it is the case, the orphan block is added and the chain // is reorganised if necessary. fn handle_possible_reorg( diff --git a/integration_tests/features/Sync.feature b/integration_tests/features/Sync.feature index f6043f428f..ada01fd906 100644 --- a/integration_tests/features/Sync.feature +++ b/integration_tests/features/Sync.feature @@ -32,9 +32,37 @@ Feature: Block Sync Then NODE1 should have 11 peers Then NODE2 should have 11 peers + @critical @reorg + Scenario: Full block sync with small reorg + Given I have a base node NODE1 connected to all seed nodes + Given I have a base node NODE2 connected to node NODE1 + When I mine 5 blocks on NODE1 + Then all nodes are at height 5 + Given I stop NODE2 + Then I mine 5 blocks on NODE1 + Given I stop NODE1 + And I start NODE2 + Then I mine 7 blocks on NODE2 + When I start NODE1 + Then all nodes are on the same chain at height 12 + + @critical @reorg @long-running + Scenario: Full block sync with large reorg + Given I have a base node NODE1 connected to all seed nodes + Given I have a base node NODE2 connected to node NODE1 + When I mine 5 blocks on NODE1 + Then all nodes are at height 5 + Given I stop NODE2 + Then I mine 1001 blocks on NODE1 + Given I stop NODE1 + And I start NODE2 + Then I mine 1500 blocks on NODE2 + When I start NODE1 + Then all nodes are on the same chain at height 1505 + @critical Scenario: Pruned mode - #TODO: Merge steps into single lines + # TODO: Merge steps into single lines Given I have a base node NODE1 connected to all seed nodes When I mine a block on NODE1 with coinbase CB1 When I mine a block on NODE1 with coinbase CB2 diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js index e4b727e2d0..c8b1115a98 100644 --- a/integration_tests/features/support/steps.js +++ b/integration_tests/features/support/steps.js @@ -290,6 +290,23 @@ Then(/node (.*) is at height (\d+)/, { timeout: 120 * 1000 }, async function (na expect(await client.getTipHeight()).to.equal(height); }); +Then('all nodes are on the same chain at height {int}', {timeout: 1200 * 1000}, async function (height) { + let tipHash = null; + await this.forEachClientAsync(async (client, name) => { + await waitFor(async() => client.getTipHeight(), height, 115 * 1000); + const currTip = await client.getTipHeader(); + expect(currTip.height).to.equal(height); + if (!tipHash) { + tipHash = currTip.hash.toString('hex'); + console.log(`Node ${name} is at tip: ${tipHash}`); + } else { + let currTipHash = currTip.hash.toString('hex'); + console.log(`Node ${name} is at tip: ${currTipHash} (should be ${tipHash})`); + expect(currTipHash).to.equal(tipHash); + } + }) +}); + Then("all nodes are at height {int}", { timeout: 1200 * 1000 }, async function (height) { await this.forEachClientAsync(async (client, name) => { await waitFor(async () => client.getTipHeight(), height, 115 * 1000); @@ -305,7 +322,7 @@ Then("all nodes are at current tip height", { timeout: 1200 * 1000 }, async func await this.forEachClientAsync(async (client, name) => { await waitFor(async () => client.getTipHeight(), height, 1200 * 1000); const currTip = await client.getTipHeight(); - console.log(`Node ${name} is at tip: ${currTip} (should be`, height, `)`); + console.log(`Node ${name} is at tip: ${currTip} (expected ${height})`); expect(currTip).to.equal(height); }); }); @@ -433,7 +450,9 @@ Then(/node (.*) is at tip (.*)/, async function (node, name) { let client = this.getClient(node); let header = await client.getTipHeader(); // console.log("headers:", this.headers); - expect(this.headers[name].hash).to.equal(header.hash); + const existingHeader = this.headers[name]; + expect(existingHeader).to.not.be.null; + expect(existingHeader.hash.toString('hex')).to.equal(header.hash.toString('hex')); }); When(/I mine a block on (.*) with coinbase (.*)/, { timeout: 600 * 1000 }, async function (name, coinbaseName) { @@ -441,12 +460,15 @@ When(/I mine a block on (.*) with coinbase (.*)/, { timeout: 600 * 1000 }, async this.addOutput(coinbaseName, candidate.originalTemplate.coinbase); return candidate; }); + this.tipHeight += 1; }); -When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, { timeout: 600 * 1000 }, async function (numBlocks, name, weight) { - for (let i = 0; i < numBlocks; i++) { - await this.mineBlock(name, 17); +When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, { timeout: -1}, async function (numBlocks, name, weight) { + for(let i=0;i { - return header; + .then((headers) => { + const header = headers[0]; + return Object.assign(header, { + height: +header.height, + }); }); } + getTipHeight() { + return this.client + .getTipInfo() + .sendMessage({}) + .then((tip) => parseInt(tip.metadata.height_of_longest_chain)); + } + getPreviousBlockTemplate(height) { return cloneDeep(this.blockTemplates["height" + height]); } @@ -152,15 +162,6 @@ class BaseNodeClient { }); } - getTipHeight() { - return this.client - .getTipInfo() - .sendMessage({}) - .then((tip) => { - return parseInt(tip.metadata.height_of_longest_chain); - }); - } - fetchMatchingUtxos(hashes) { return this.client .fetchMatchingUtxos() @@ -273,7 +274,7 @@ class BaseNodeClient { let template = await this.getMinedCandidateBlock(weight); return this.submitTemplate(template, beforeSubmit).then( async () => { - let tip = await this.getTipHeight(); + // let tip = await this.getTipHeight(); // console.log("Node is at tip:", tip); }, (err) => { diff --git a/integration_tests/helpers/config.js b/integration_tests/helpers/config.js index f26fdc1bb6..4992f629d4 100644 --- a/integration_tests/helpers/config.js +++ b/integration_tests/helpers/config.js @@ -98,7 +98,7 @@ function createEnv( TARI_BASE_NODE__LOCALNET__GRPC_CONSOLE_WALLET_ADDRESS: `${walletGrpcAddress}:${walletGrpcPort}`, TARI_BASE_NODE__LOCALNET__BASE_NODE_IDENTITY_FILE: `${nodeFile}`, TARI_BASE_NODE__LOCALNET__TCP_LISTENER_ADDRESS: - "/ip4/0.0.0.0/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), + "/ip4/127.0.0.1/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), TARI_BASE_NODE__LOCALNET__PUBLIC_ADDRESS: "/ip4/127.0.0.1/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), TARI_MERGE_MINING_PROXY__LOCALNET__PROXY_HOST_ADDRESS: `${proxyFullAddress}`, diff --git a/integration_tests/helpers/util.js b/integration_tests/helpers/util.js index fd67fea7de..03deed1264 100644 --- a/integration_tests/helpers/util.js +++ b/integration_tests/helpers/util.js @@ -1,8 +1,8 @@ -var net = require("net"); +const net = require("net"); const fs = require("fs"); const readline = require("readline"); -var { blake2bInit, blake2bUpdate, blake2bFinal } = require("blakejs"); +const { blake2bInit, blake2bUpdate, blake2bFinal } = require("blakejs"); function getRandomInt(min, max) { min = Math.ceil(min); @@ -16,6 +16,16 @@ function sleep(ms) { }); } +function withTimeout(ms, promise, message = "") { + const timeout = new Promise((_resolve, reject) => { + setTimeout( + () => reject(new Error(message || `Timed out after ${ms}ms`)), + ms + ); + }); + return Promise.race([timeout, promise]); +} + async function waitFor( asyncTestFn, toBe, @@ -23,7 +33,7 @@ async function waitFor( timeOut = 500, skipLog = 50 ) { - var now = new Date(); + let now = new Date(); let i = 0; while (new Date() - now < maxTime) { @@ -34,7 +44,7 @@ async function waitFor( } break; } - if (i % skipLog == 0 && i > 1) { + if (i % skipLog === 0 && i > 1) { console.log("waiting for process...", timeOut, i); } await sleep(timeOut); @@ -74,8 +84,8 @@ function hexSwitchEndianness(val) { } // Thanks to https://stackoverflow.com/questions/29860354/in-nodejs-how-do-i-check-if-a-port-is-listening-or-in-use -var portInUse = function (port, callback) { - var server = net.createServer(function (socket) { +let portInUse = function (port, callback) { + let server = net.createServer(function (socket) { socket.write("Echo server\r\n"); socket.pipe(socket); }); @@ -90,8 +100,8 @@ var portInUse = function (port, callback) { }); }; -var index = 0; -var getFreePort = async function (from, to) { +let index = 0; +let getFreePort = async function (from, to) { function testPort(port) { return new Promise((r) => { portInUse(port, (v) => { @@ -123,12 +133,12 @@ var getFreePort = async function (from, to) { // WIP this doesn't hash properly const getTransactionOutputHash = function (output) { - var KEY = null; // optional key - var OUTPUT_LENGTH = 32; // bytes - var context = blake2bInit(OUTPUT_LENGTH, KEY); + let KEY = null; // optional key + let OUTPUT_LENGTH = 32; // bytes + let context = blake2bInit(OUTPUT_LENGTH, KEY); let flags = Buffer.alloc(1); flags[0] = output.features.flags; - var buffer = Buffer.concat([ + let buffer = Buffer.concat([ flags, toLittleEndian(parseInt(output.features.maturity), 64), ]); @@ -139,8 +149,8 @@ const getTransactionOutputHash = function (output) { }; function consoleLogTransactionDetails(txnDetails, txId) { - var found = txnDetails[0]; - var status = txnDetails[1]; + let found = txnDetails[0]; + let status = txnDetails[1]; if (found) { console.log( " Transaction " + @@ -186,7 +196,7 @@ function consoleLogCoinbaseDetails(txnDetails) { } function pad(str, length, padLeft = true) { - var padding = Array(length).join(" "); + let padding = Array(length).join(" "); if (typeof str === "undefined") return padding; if (padLeft) { return (padding + str).slice(-padding.length); @@ -207,4 +217,5 @@ module.exports = { consoleLogTransactionDetails, consoleLogBalance, consoleLogCoinbaseDetails, + withTimeout, };