From baa411faa6aa0d54f7e5bd5038b3d326b09c28c2 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Tue, 16 Mar 2021 18:04:35 +0400 Subject: [PATCH] [core] Fix handling of large chain reorgs in header sync Changes header sync to continue downloading headers until a higher accumulated PoW than the current tip is acheived. Added cucumber tests for sync with large reorg --- .../src/base_node/sync/header_sync/error.rs | 2 + .../src/base_node/sync/header_sync/mod.rs | 3 - .../sync/header_sync/synchronizer.rs | 298 +++++++++++++----- .../src/base_node/sync/header_sync/test.rs | 21 -- .../base_node/sync/header_sync/validator.rs | 63 ++-- .../src/chain_storage/accumulated_data.rs | 2 +- base_layer/core/src/chain_storage/async_db.rs | 2 + .../src/chain_storage/blockchain_database.rs | 26 ++ integration_tests/features/Sync.feature | 31 +- integration_tests/features/support/steps.js | 44 ++- integration_tests/helpers/baseNodeClient.js | 25 +- integration_tests/helpers/config.js | 2 +- integration_tests/helpers/util.js | 14 +- 13 files changed, 369 insertions(+), 164 deletions(-) delete mode 100644 base_layer/core/src/base_node/sync/header_sync/test.rs diff --git a/base_layer/core/src/base_node/sync/header_sync/error.rs b/base_layer/core/src/base_node/sync/header_sync/error.rs index 9d9a5e391d7..85428d87d85 100644 --- a/base_layer/core/src/base_node/sync/header_sync/error.rs +++ b/base_layer/core/src/base_node/sync/header_sync/error.rs @@ -59,4 +59,6 @@ pub enum BlockHeaderSyncError { ChainSplitNotFound(NodeId), #[error("Node could not find any other node with which to sync. Silence.")] NetworkSilence, + #[error("Invalid protocol response: {0}")] + InvalidProtocolResponse(String), } diff --git a/base_layer/core/src/base_node/sync/header_sync/mod.rs b/base_layer/core/src/base_node/sync/header_sync/mod.rs index b4392e7c641..a5214d422d1 100644 --- a/base_layer/core/src/base_node/sync/header_sync/mod.rs +++ b/base_layer/core/src/base_node/sync/header_sync/mod.rs @@ -20,9 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#[cfg(test)] -mod test; - mod error; pub use error::BlockHeaderSyncError; diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 2b815bbf5e0..20a380e66e5 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -24,7 +24,7 @@ use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError}; use crate::{ base_node::sync::{hooks::Hooks, rpc, BlockSyncConfig}, blocks::BlockHeader, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainBlock}, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainBlock, ChainHeader}, consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory, proto::{ @@ -37,7 +37,11 @@ use crate::{ }; use futures::{future, stream::FuturesUnordered, StreamExt}; use log::*; -use std::{convert::TryFrom, sync::Arc, time::Duration}; +use std::{ + convert::TryFrom, + sync::Arc, + time::{Duration, Instant}, +}; use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester, ConnectivitySelection}, peer_manager::NodeId, @@ -47,6 +51,8 @@ use tari_comms::{ const LOG_TARGET: &str = "c::bn::header_sync"; +const NUM_INITIAL_HEADERS_TO_REQUEST: u64 = 1000; + pub struct HeaderSynchronizer<'a, B> { config: BlockSyncConfig, db: AsyncBlockchainDb, @@ -116,13 +122,16 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { debug!(target: LOG_TARGET, "Block header validation failed: {}", err); self.ban_peer_long(node_id, err.into()).await?; }, + Err(err @ BlockHeaderSyncError::InvalidBlockHeight(_, _)) => { + debug!(target: LOG_TARGET, "{}", err); + self.ban_peer_long(node_id, BanReason::GeneralHeaderSyncFailure(err)) + .await?; + }, Err(err) => { debug!( target: LOG_TARGET, "Failed to synchronize headers from peer `{}`: {}", node_id, err ); - self.ban_peer_long(node_id, BanReason::GeneralHeaderSyncFailure(err)) - .await?; }, } } @@ -253,13 +262,16 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { latency.unwrap_or_default().as_millis() ); - let sync_complete = self.check_chain_split(&peer, &mut client).await?; - // If sync is not complete after the chain split check, synchronize the rest of the headers - if !sync_complete { - self.synchronize_headers(&peer, &mut client).await?; + let sync_status = self.determine_sync_status(&peer, &mut client).await?; + match sync_status { + SyncStatus::InSync => Ok(()), + // We're ahead of this peer, try another peer if possible + SyncStatus::Ahead => Err(BlockHeaderSyncError::NotInSync), + SyncStatus::Lagging(split_info) => { + self.synchronize_headers(&peer, &mut client, *split_info).await?; + Ok(()) + }, } - - Ok(()) } async fn find_chain_split( @@ -290,9 +302,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { .await?; debug!( target: LOG_TARGET, - "Determining where our chain splits with the remote peer `{}` ({} block hashes sent)", + "Determining if chain splits between {} and {} headers back from peer `{}`", + offset, + offset + NUM_CHAIN_SPLIT_HEADERS, peer, - block_hashes.len() ); let request = FindChainSplitRequest { @@ -317,18 +330,24 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } } - /// Check for a chain split with the given peer, validate and add header state. State will be rewound if a higher - /// proof of work is achieved. If, after this step, the headers are already in sync, true is returned, otherwise a - /// header sync should proceed to download the remaining headers. - async fn check_chain_split( + /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information + /// of the chain split (see [SyncStatus]). + /// + /// If the local node is behind the remote chain (i.e. `SyncStatus::Lagging`), the appropriate `ChainSplitInfo` is + /// returned, the header validator is initialized and the preliminary headers are validated. + async fn determine_sync_status( &mut self, peer: &NodeId, client: &mut rpc::BaseNodeSyncRpcClient, - ) -> Result + ) -> Result { - const NUM_HEADERS_TO_REQUEST: u64 = 1000; - let (resp, block_hashes, steps_back) = self.find_chain_split(peer, client, NUM_HEADERS_TO_REQUEST).await?; - if resp.headers.len() > NUM_HEADERS_TO_REQUEST as usize { + // Fetch the local tip header at the beginning of the sync process + let local_tip_header = self.db.fetch_tip_header().await?; + + let (resp, block_hashes, steps_back) = self + .find_chain_split(peer, client, NUM_INITIAL_HEADERS_TO_REQUEST) + .await?; + if resp.headers.len() > NUM_INITIAL_HEADERS_TO_REQUEST as usize { self.ban_peer_long(peer.clone(), BanReason::PeerSentTooManyHeaders(resp.headers.len())) .await?; return Err(BlockHeaderSyncError::NotInSync); @@ -368,18 +387,14 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { peer, fork_hash_index ); - // Another peer will be attempted, if possible - return Err(BlockHeaderSyncError::NotInSync); + + return Ok(SyncStatus::Ahead); } debug!(target: LOG_TARGET, "Already in sync with peer `{}`.", peer); - return Ok(true); + return Ok(SyncStatus::InSync); } - // We can trust that the header associated with this hash exists because block_hashes is data this node - // supplied. usize conversion overflow has already been checked above - let chain_split_hash = block_hashes[fork_hash_index as usize].clone(); - let headers = headers .into_iter() .map(BlockHeader::try_from) @@ -387,8 +402,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?; let num_new_headers = headers.len(); - self.header_validator.initialize_state(chain_split_hash).await?; - let mut chain_headers = Vec::with_capacity(headers.len()); + // We can trust that the header associated with this hash exists because block_hashes is data this node + // supplied. usize conversion overflow has already been checked above + let chain_split_hash = block_hashes[fork_hash_index as usize].clone(); + + self.header_validator.initialize_state(chain_split_hash.clone()).await?; for header in headers { debug!( target: LOG_TARGET, @@ -396,53 +414,45 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { header.height, header.pow_algo(), ); - let height = header.height; - chain_headers.push(self.header_validator.validate_and_calculate_metadata(header)?); - debug!(target: LOG_TARGET, "Header #{} is VALID", height,) + self.header_validator.validate(header)?; } - if fork_hash_index > 0 { - // If the peer is telling us that we have to rewind, check their headers are stronger than our current tip - self.header_validator - .check_stronger_chain(chain_headers.last().expect("already_checked")) - .await?; - - // TODO: We've established that the peer has a chain that forks from ours, can provide valid headers for - // _part_ of that chain and is stronger than our chain at the same (or less) height. - // However, to know that the full chain is stronger than our tip header, we need to - // download all headers and compare. + debug!( + target: LOG_TARGET, + "Peer {} has submitted {} valid header(s)", peer, num_new_headers + ); - debug!(target: LOG_TARGET, "Rewinding the chain by {} block(s)", steps_back); - let blocks = self.rewind_blockchain(steps_back).await?; - self.hooks.call_on_rewind_hooks(blocks); + // Basic sanity check that the peer sent tip height greater than the split. + let split_height = local_tip_header.height().saturating_sub(steps_back); + if remote_tip_height < split_height { + self.ban_peer_short(peer.clone(), BanReason::PeerSentInvalidTipHeight { + actual: remote_tip_height, + expected: split_height, + }) + .await?; + return Err(BlockHeaderSyncError::InvalidProtocolResponse(format!( + "Peer {} sent invalid remote tip height", + peer + ))); } - let mut txn = self.db.write_transaction(); - let current_height = chain_headers.last().map(|h| h.height()).unwrap_or(remote_tip_height); - chain_headers.into_iter().for_each(|header| { - debug!(target: LOG_TARGET, "Adding header: #{}", header.header.height); - txn.insert_header(header.header, header.accumulated_data); - }); - txn.commit().await?; - - self.hooks - .call_on_progress_header_hooks(current_height, remote_tip_height, self.sync_peers); - - // If less headers were returned than requested, the peer is indicating that we have the tip header. - // To indicate that sync is complete, true is returned, otherwise false - Ok(num_new_headers < NUM_HEADERS_TO_REQUEST as usize) + let chain_split_info = ChainSplitInfo { + local_tip_header, + remote_tip_height, + reorg_steps_back: steps_back, + chain_split_hash, + }; + Ok(SyncStatus::Lagging(Box::new(chain_split_info))) } - async fn rewind_blockchain(&self, steps_back: u64) -> Result>, BlockHeaderSyncError> { + async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result>, BlockHeaderSyncError> { debug!( target: LOG_TARGET, - "Deleting {} header(s) that no longer form part of the main chain", steps_back + "Deleting headers that no longer form part of the main chain up until split at {}", + split_hash.to_hex() ); - let tip_header = self.db.fetch_last_header().await?; - let new_tip_height = tip_header.height - steps_back; - - let blocks = self.db.rewind_to_height(new_tip_height).await?; + let blocks = self.db.rewind_to_hash(split_hash).await?; debug!( target: LOG_TARGET, "Rewound {} block(s) in preparation for header sync", @@ -455,29 +465,55 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { &mut self, peer: &NodeId, client: &mut rpc::BaseNodeSyncRpcClient, + split_info: ChainSplitInfo, ) -> Result<(), BlockHeaderSyncError> { - let tip_header = self.db.fetch_last_header().await?; + const COMMIT_EVERY_N_HEADERS: usize = 1000; + + // Peer returned less than the max headers. This indicates that there are no further headers to request. + if self.header_validator.valid_headers().len() < NUM_INITIAL_HEADERS_TO_REQUEST as usize { + debug!(target: LOG_TARGET, "No further headers to download"); + if !self.pending_chain_has_higher_pow(&split_info.local_tip_header)? { + return Err(BlockHeaderSyncError::WeakerChain); + } + + debug!( + target: LOG_TARGET, + "Remote chain from peer {} has higher PoW. Switching", peer + ); + // PoW is higher, switching over to the new chain + self.switch_to_pending_chain(&split_info).await?; + + return Ok(()); + } + + // Find the hash to start syncing the rest of the headers. + // The expectation cannot fail because the number of headers has been checked in determine_sync_status + let start_header = + self.header_validator.valid_headers().last().expect( + "synchronize_headers: expected there to be at least one valid pending header but there were none", + ); + debug!( target: LOG_TARGET, - "Requesting header stream starting from tip header #{} from peer `{}`", tip_header.height, peer + "Download remaining headers starting from header #{} from peer `{}`", start_header.header.height, peer ); let request = SyncHeadersRequest { - start_hash: tip_header.hash(), + start_hash: start_header.header.hash(), // To the tip! count: 0, }; - let mut header_stream = client.sync_headers(request).await?; + let mut header_stream = client.sync_headers(request).await?; debug!(target: LOG_TARGET, "Reading headers from peer `{}`", peer); - // Reset the header validator state to be sure we're using the correct data - self.header_validator.initialize_state(tip_header.hash()).await?; + let mut has_switched_to_new_chain = false; + while let Some(header) = header_stream.next().await { let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?; debug!( target: LOG_TARGET, - "Validating and adding header: #{} (PoW = {}), ", + "Validating header: #{} (PoW = {})", header.height, header.pow_algo() ); @@ -493,17 +529,101 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { ); continue; } - let chain_header = self.header_validator.validate_and_calculate_metadata(header)?; - let current_height = chain_header.height(); - self.db - .write_transaction() - .insert_header(chain_header.header, chain_header.accumulated_data) - .commit() - .await?; + let current_height = header.height; + self.header_validator.validate(header)?; + + if has_switched_to_new_chain { + // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers + if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS { + self.commit_pending_headers().await?; + } + } else { + // The remote chain has not (yet) been accepted. + // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is + // achieved. + if self.pending_chain_has_higher_pow(&split_info.local_tip_header)? { + self.switch_to_pending_chain(&split_info).await?; + has_switched_to_new_chain = true; + } + } self.hooks - .call_on_progress_header_hooks(current_height, current_height, self.sync_peers); + .call_on_progress_header_hooks(current_height, split_info.remote_tip_height, self.sync_peers); + } + + if !has_switched_to_new_chain { + return Err(BlockHeaderSyncError::WeakerChain); + } + + // Commit the last blocks that don't fit into the COMMIT_EVENT_N_HEADERS blocks + if !self.header_validator.valid_headers().is_empty() { + self.commit_pending_headers().await?; } + + Ok(()) + } + + async fn commit_pending_headers(&mut self) -> Result { + let chain_headers = self.header_validator.take_valid_headers(); + let num_headers = chain_headers.len(); + let start = Instant::now(); + + let new_tip = chain_headers.last().cloned().unwrap(); + let mut txn = self.db.write_transaction(); + chain_headers.into_iter().for_each(|chain_header| { + txn.insert_header(chain_header.header, chain_header.accumulated_data); + }); + + txn.commit().await?; + + debug!( + target: LOG_TARGET, + "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}", + num_headers, + new_tip.height(), + start.elapsed() + ); + + Ok(new_tip) + } + + fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> Result { + let chain_headers = self.header_validator.valid_headers(); + if chain_headers.is_empty() { + return Ok(false); + } + + // Check that the remote tip is stronger than the local tip + let proposed_tip = chain_headers.last().unwrap(); + match self.header_validator.check_stronger_chain(current_tip, proposed_tip) { + Ok(_) => Ok(true), + Err(BlockHeaderSyncError::WeakerChain) => Ok(false), + Err(err) => Err(err), + } + } + + async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> { + // Reorg if required + if split_info.reorg_steps_back > 0 { + debug!( + target: LOG_TARGET, + "Reorg: Rewinding the chain by {} block(s) (split hash = {})", + split_info.reorg_steps_back, + split_info.chain_split_hash.to_hex() + ); + let blocks = self.rewind_blockchain(split_info.chain_split_hash.clone()).await?; + // NOTE: `blocks` only contains full blocks that were reorged out, and not the headers. + // This may be unexpected for implementers of the rewind hook. + self.hooks.call_on_rewind_hooks(blocks); + } + + // Commit the forked chain. At this point + // 1. Headers have been validated + // 2. The forked chain has a higher PoW than the local chain + // + // After this we commit headers every `n` blocks + self.commit_pending_headers().await?; + Ok(()) } } @@ -512,6 +632,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { enum BanReason { #[error("This peer sent too many headers ({0}) in response to a chain split request")] PeerSentTooManyHeaders(usize), + #[error("This peer sent an invalid tip height {actual} expected a height greater than or equal to {expected}")] + PeerSentInvalidTipHeight { actual: u64, expected: u64 }, #[error( "This peer sent a split hash index ({fork_hash_index}) greater than the number of block hashes sent \ ({num_block_hashes})" @@ -529,3 +651,19 @@ enum BanReason { #[error("Peer did not respond timeously during RPC negotiation")] RpcNegotiationTimedOut, } + +struct ChainSplitInfo { + local_tip_header: ChainHeader, + remote_tip_height: u64, + reorg_steps_back: u64, + chain_split_hash: HashOutput, +} + +enum SyncStatus { + /// Local and remote node are in sync + InSync, + /// Local node is ahead of the remote node + Ahead, + /// Local node is lagging behind remote node + Lagging(Box), +} diff --git a/base_layer/core/src/base_node/sync/header_sync/test.rs b/base_layer/core/src/base_node/sync/header_sync/test.rs deleted file mode 100644 index c27791475dc..00000000000 --- a/base_layer/core/src/base_node/sync/header_sync/test.rs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2020, The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/base_layer/core/src/base_node/sync/header_sync/validator.rs b/base_layer/core/src/base_node/sync/header_sync/validator.rs index e015c843d31..75ab0363e85 100644 --- a/base_layer/core/src/base_node/sync/header_sync/validator.rs +++ b/base_layer/core/src/base_node/sync/header_sync/validator.rs @@ -30,7 +30,6 @@ use crate::{ BlockchainBackend, ChainHeader, ChainStorageError, - Optional, TargetDifficulties, }, common::rolling_vec::RollingVec, @@ -64,6 +63,7 @@ struct State { timestamps: RollingVec, target_difficulties: TargetDifficulties, previous_accum: BlockHeaderAccumulatedData, + valid_headers: Vec, } impl BlockHeaderSyncValidator { @@ -105,23 +105,21 @@ impl BlockHeaderSyncValidator { timestamps, target_difficulties, previous_accum, + // One large allocation is usually better even if it is not always used. + valid_headers: Vec::with_capacity(1000), }); Ok(()) } - pub fn validate_and_calculate_metadata( - &mut self, - header: BlockHeader, - ) -> Result - { - let expected_height = self.state().current_height + 1; + pub fn validate(&mut self, header: BlockHeader) -> Result<(), BlockHeaderSyncError> { + let state = self.state(); + let expected_height = state.current_height + 1; if header.height != expected_height { return Err(BlockHeaderSyncError::InvalidBlockHeight(expected_height, header.height)); } check_timestamp_ftl(&header, &self.consensus_rules)?; - let state = self.state(); check_header_timestamp_greater_than_median(&header, &state.timestamps)?; let constants = self.consensus_rules.consensus_constants(header.height); @@ -154,29 +152,38 @@ impl BlockHeaderSyncValidator { // Add a "more recent" datapoint onto the target difficulty state.target_difficulties.add_back(&header, target_difficulty); state.previous_accum = metadata.clone(); - - Ok(ChainHeader { + state.valid_headers.push(ChainHeader { header, accumulated_data: metadata, - }) + }); + + Ok(()) } - pub async fn check_stronger_chain(&mut self, their_header: &ChainHeader) -> Result<(), BlockHeaderSyncError> { - // Compare their header to ours at the same height, or if we don't have a header at that height, our current tip - // header - let our_header = match self - .db - .fetch_header_and_accumulated_data(their_header.height()) - .await - .optional()? - { - Some(h) => ChainHeader { - header: h.0, - accumulated_data: h.1, - }, - None => self.db.fetch_tip_header().await?, - }; + /// Drains and returns all the headers that were validated. + /// + /// ## Panics + /// + /// Panics if initialize_state was not called prior to calling this function + pub fn take_valid_headers(&mut self) -> Vec { + self.state_mut().valid_headers.drain(..).collect::>() + } + /// Returns a slice containing the current valid headers + /// + /// ## Panics + /// + /// Panics if initialize_state was not called prior to calling this function + pub fn valid_headers(&self) -> &[ChainHeader] { + &self.state().valid_headers + } + + pub fn check_stronger_chain( + &self, + our_header: &ChainHeader, + their_header: &ChainHeader, + ) -> Result<(), BlockHeaderSyncError> + { debug!( target: LOG_TARGET, "Comparing PoW on remote header #{} and local header #{}", @@ -189,8 +196,8 @@ impl BlockHeaderSyncValidator { .chain_strength_comparer() .compare(&our_header, their_header) { - Ordering::Less => Ok(()), - Ordering::Equal | Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain), + Ordering::Less | Ordering::Equal => Ok(()), + Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain), } } diff --git a/base_layer/core/src/chain_storage/accumulated_data.rs b/base_layer/core/src/chain_storage/accumulated_data.rs index a346f5e5707..b4ad4041c28 100644 --- a/base_layer/core/src/chain_storage/accumulated_data.rs +++ b/base_layer/core/src/chain_storage/accumulated_data.rs @@ -287,7 +287,7 @@ impl Display for BlockHeaderAccumulatedData { } } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct ChainHeader { pub header: BlockHeader, pub accumulated_data: BlockHeaderAccumulatedData, diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 83901875d29..ff1c12b6232 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -156,6 +156,8 @@ impl AsyncBlockchainDb { make_async_fn!(rewind_to_height(height: u64) -> Vec>, "rewind_to_height"); + make_async_fn!(rewind_to_hash(hash: BlockHash) -> Vec>, "rewind_to_hash"); + //---------------------------------- Headers --------------------------------------------// make_async_fn!(fetch_header(height: u64) -> Option, "fetch_header"); diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 97a4478a364..aec40eb1a68 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -972,6 +972,17 @@ where B: BlockchainBackend rewind_to_height(&mut *db, height) } + /// Rewind the blockchain state to the block hash making the block at that hash the new tip. + /// Returns the removed blocks. + /// + /// The operation will fail if + /// * The block hash does not exist + /// * The block hash is before the horizon block height determined by the pruning horizon + pub fn rewind_to_hash(&self, hash: BlockHash) -> Result>, ChainStorageError> { + let mut db = self.db_write_access()?; + rewind_to_hash(&mut *db, hash) + } + pub fn fetch_horizon_data(&self) -> Result, ChainStorageError> { let db = self.db_read_access()?; db.fetch_horizon_data() @@ -1496,6 +1507,21 @@ fn rewind_to_height(db: &mut T, height: u64) -> Result( + db: &mut T, + block_hash: BlockHash, +) -> Result>, ChainStorageError> +{ + let block_hash_hex = block_hash.to_hex(); + let target_header = + fetch_header_by_block_hash(&*db, block_hash)?.ok_or_else(|| ChainStorageError::ValueNotFound { + entity: "BlockHeader".to_string(), + field: "block_hash".to_string(), + value: block_hash_hex, + })?; + rewind_to_height(db, target_header.height) +} + // Checks whether we should add the block as an orphan. If it is the case, the orphan block is added and the chain // is reorganised if necessary. fn handle_possible_reorg( diff --git a/integration_tests/features/Sync.feature b/integration_tests/features/Sync.feature index a6ef08273d2..dab51e12463 100644 --- a/integration_tests/features/Sync.feature +++ b/integration_tests/features/Sync.feature @@ -32,9 +32,37 @@ Feature: Block Sync Then NODE1 should have 11 peers Then NODE2 should have 11 peers + @critical @reorg + Scenario: Full block sync with small reorg + Given I have a base node NODE1 connected to all seed nodes + Given I have a base node NODE2 connected to node NODE1 + When I mine 5 blocks on NODE1 + Then all nodes are at height 5 + Given I stop NODE2 + Then I mine 5 blocks on NODE1 + Given I stop NODE1 + And I start NODE2 + Then I mine 7 blocks on NODE2 + When I start NODE1 + Then all nodes are on the same chain at height 12 + + @critical @reorg @long-running + Scenario: Full block sync with large reorg + Given I have a base node NODE1 connected to all seed nodes + Given I have a base node NODE2 connected to node NODE1 + When I mine 5 blocks on NODE1 + Then all nodes are at height 5 + Given I stop NODE2 + Then I mine 1001 blocks on NODE1 + Given I stop NODE1 + And I start NODE2 + Then I mine 1500 blocks on NODE2 + When I start NODE1 + Then all nodes are on the same chain at height 1505 + @critical Scenario: Pruned mode - #TODO: Merge steps into single lines + # TODO: Merge steps into single lines Given I have a base node NODE1 connected to all seed nodes When I mine a block on NODE1 with coinbase CB1 When I mine a block on NODE1 with coinbase CB2 @@ -72,4 +100,3 @@ Feature: Block Sync And I mine 6 blocks on PNODE2 When I start NODE1 Then all nodes are at height 20 - diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js index 5e81e732a27..86ac8ece6c8 100644 --- a/integration_tests/features/support/steps.js +++ b/integration_tests/features/support/steps.js @@ -6,7 +6,7 @@ const MergeMiningProxyProcess = require('../../helpers/mergeMiningProxyProcess') const WalletProcess = require('../../helpers/walletProcess'); const expect = require('chai').expect; const {waitFor, getTransactionOutputHash, sleep, consoleLogTransactionDetails, consoleLogBalance, - consoleLogCoinbaseDetails} = require('../../helpers/util'); + consoleLogCoinbaseDetails, withTimeout} = require('../../helpers/util'); const TransactionBuilder = require('../../helpers/transactionBuilder'); let lastResult; @@ -29,7 +29,7 @@ Given(/I have a base node (.*) connected to all seed nodes/, {timeout: 20*1000}, miner.setPeerSeeds([this.seedAddresses()]); await miner.startNew(); this.addNode(name, miner); - }); +}); Given(/I have a base node (.*) connected to seed (.*)/, {timeout: 20*1000}, async function (name, seedNode) { const miner = this.createNode(name); @@ -47,8 +47,6 @@ Given(/I have a base node (.*) connected to node (.*)/, {timeout: 20*1000}, asyn await sleep(1000); }); - - Given(/I have a pruned node (.*) connected to node (.*)/, {timeout: 20*1000}, async function (name, node) { const miner = this.createNode(name, { pruningHorizon: 5}); miner.setPeerSeeds([this.nodes[node].peerAddress()]); @@ -162,6 +160,23 @@ Then(/node (.*) is at height (\d+)/, {timeout: 120*1000}, async function (name, expect(await client.getTipHeight()).to.equal(height); }); +Then('all nodes are on the same chain at height {int}', {timeout: 1200*1000}, async function (height) { + let tipHash = null; + await this.forEachClientAsync(async (client, name) => { + await waitFor(async() => client.getTipHeight(), height, 115*1000); + const currTip = await client.getTipHeader(); + expect(currTip.height).to.equal(height); + if (!tipHash) { + tipHash = currTip.hash.toString('hex'); + console.log(`Node ${name} is at tip: ${tipHash}`); + } else { + let currTipHash = currTip.hash.toString('hex'); + console.log(`Node ${name} is at tip: ${currTipHash} (should be ${tipHash})`); + expect(currTipHash).to.equal(tipHash); + } + }) +}); + Then('all nodes are at height {int}', {timeout: 1200*1000},async function (height) { await this.forEachClientAsync(async (client, name) => { await waitFor(async() => client.getTipHeight(), height, 115*1000); @@ -177,7 +192,7 @@ Then('all nodes are at current tip height', {timeout: 1200*1000},async function await this.forEachClientAsync(async (client, name) => { await waitFor(async() => client.getTipHeight(), height, 1200*1000); const currTip = await client.getTipHeight(); - console.log(`Node ${name} is at tip: ${currTip} (should be`, height, `)`); + console.log(`Node ${name} is at tip: ${currTip} (expected ${height})`); expect(currTip).to.equal(height); }) }); @@ -314,21 +329,24 @@ Then(/node (.*) is at tip (.*)/, async function (node, name) { }); When(/I mine a block on (.*) with coinbase (.*)/, {timeout: 600*1000}, async function (name, coinbaseName) { - await this.mineBlock(name, 0, candidate => { - this.addOutput(coinbaseName, candidate.originalTemplate.coinbase); - return candidate; - }); + await this.mineBlock(name, 0, candidate => { + this.addOutput(coinbaseName, candidate.originalTemplate.coinbase); + return candidate; + }); + this.tipHeight += 1; }); -When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, {timeout: 600*1000}, async function (numBlocks, name, weight) { +When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, {timeout: -1}, async function (numBlocks, name, weight) { for(let i=0;i { - return header; - }) + return this.client.listHeaders().sendMessage({from_height: 0, num_headers: 1}).then(headers => { + const header = headers[0]; + return Object.assign(header,{ + height: +header.height, + }); + }); + } + + getTipHeight() { + return this.client.getTipInfo() + .sendMessage({}) + .then(tip => parseInt(tip.metadata.height_of_longest_chain)); } getPreviousBlockTemplate(height) { @@ -122,14 +131,6 @@ class BaseNodeClient { ); } - getTipHeight() { - return this.client.getTipInfo() - .sendMessage({}) - .then(tip => { - return parseInt(tip.metadata.height_of_longest_chain); - }); - } - fetchMatchingUtxos(hashes) { return this.client.fetchMatchingUtxos() .sendMessage({hashes: hashes}) @@ -219,7 +220,7 @@ class BaseNodeClient { async mineBlockWithoutWallet(beforeSubmit, weight, onError) { let template = await this.getMinedCandidateBlock(weight); return this.submitTemplate(template, beforeSubmit).then(async () => { - let tip = await this.getTipHeight(); + // let tip = await this.getTipHeight(); // console.log("Node is at tip:", tip); }, err => { console.log("err submitting block:", err); diff --git a/integration_tests/helpers/config.js b/integration_tests/helpers/config.js index 857521d9c80..864a24c5cdd 100644 --- a/integration_tests/helpers/config.js +++ b/integration_tests/helpers/config.js @@ -82,7 +82,7 @@ function createEnv(name="config_identity", isWallet=false, nodeFile="newnodeid.j TARI_BASE_NODE__LOCALNET__GRPC_BASE_NODE_ADDRESS: `${baseNodeGrpcAddress}:${baseNodeGrpcPort}`, TARI_BASE_NODE__LOCALNET__GRPC_CONSOLE_WALLET_ADDRESS: `${walletGrpcAddress}:${walletGrpcPort}`, TARI_BASE_NODE__LOCALNET__BASE_NODE_IDENTITY_FILE: `${nodeFile}`, - TARI_BASE_NODE__LOCALNET__TCP_LISTENER_ADDRESS: "/ip4/0.0.0.0/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), + TARI_BASE_NODE__LOCALNET__TCP_LISTENER_ADDRESS: "/ip4/127.0.0.1/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), TARI_BASE_NODE__LOCALNET__PUBLIC_ADDRESS: "/ip4/127.0.0.1/tcp/" + (isWallet ? `${walletPort}` : `${baseNodePort}`), TARI_MERGE_MINING_PROXY__LOCALNET__PROXY_HOST_ADDRESS: `${proxyFullAddress}`, TARI_BASE_NODE__LOCALNET__TRANSPORT: "tcp", diff --git a/integration_tests/helpers/util.js b/integration_tests/helpers/util.js index 34ed9d44c6d..c056404ff69 100644 --- a/integration_tests/helpers/util.js +++ b/integration_tests/helpers/util.js @@ -1,8 +1,8 @@ -var net = require('net'); +const net = require('net'); const fs = require('fs'); const readline = require('readline'); -var {blake2bInit, blake2bUpdate, blake2bFinal} = require('blakejs'); +const {blake2bInit, blake2bUpdate, blake2bFinal} = require('blakejs'); function getRandomInt(min, max) { min = Math.ceil(min); @@ -16,6 +16,13 @@ function sleep(ms) { }); } +function withTimeout(ms, promise, message = '') { + const timeout = new Promise((resolve, reject) => { + setTimeout(() => reject(new Error(message || `Timed out after ${ms}ms`)), ms); + }); + return Promise.race([timeout, promise]); +} + async function waitFor(asyncTestFn, toBe, maxTime, timeOut=500, skipLog=50) { var now = new Date(); @@ -184,5 +191,6 @@ module.exports = { hexSwitchEndianness, consoleLogTransactionDetails, consoleLogBalance, - consoleLogCoinbaseDetails + consoleLogCoinbaseDetails, + withTimeout, };