[base-node] Fix handling of large chain reorgs in header sync #2759

Merged
6 changes: 4 additions & 2 deletions base_layer/core/src/base_node/sync/header_sync/error.rs
@@ -53,10 +53,12 @@ pub enum BlockHeaderSyncError {
NotInSync,
#[error("Unable to locate start hash `{0}`")]
StartHashNotFound(String),
#[error("Expected header height {0} got {1}")]
InvalidBlockHeight(u64, u64),
#[error("Expected header height {expected} got {actual}")]
InvalidBlockHeight { expected: u64, actual: u64 },
#[error("Unable to find chain split from peer `{0}`")]
ChainSplitNotFound(NodeId),
#[error("Node could not find any other node with which to sync. Silence.")]
NetworkSilence,
#[error("Invalid protocol response: {0}")]
InvalidProtocolResponse(String),
}
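The change above replaces the positional tuple variant `InvalidBlockHeight(u64, u64)` with named fields, so call sites can no longer silently swap the expected and actual heights. A minimal stand-alone sketch of the idea (hand-rolling `Display` here instead of deriving `thiserror::Error`, which the real enum uses):

```rust
use std::fmt;

// Stand-in for the PR's named-field error variant. The real enum derives
// `thiserror::Error` with `#[error("Expected header height {expected} got {actual}")]`;
// this sketch implements Display by hand to stay dependency-free.
#[derive(Debug)]
enum BlockHeaderSyncError {
    InvalidBlockHeight { expected: u64, actual: u64 },
}

impl fmt::Display for BlockHeaderSyncError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BlockHeaderSyncError::InvalidBlockHeight { expected, actual } => {
                write!(f, "Expected header height {} got {}", expected, actual)
            }
        }
    }
}

fn main() {
    // Named fields make the construction self-documenting at the call site.
    let err = BlockHeaderSyncError::InvalidBlockHeight { expected: 3, actual: 10 };
    println!("{}", err);
}
```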
3 changes: 0 additions & 3 deletions base_layer/core/src/base_node/sync/header_sync/mod.rs
@@ -20,9 +20,6 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(test)]
mod test;

mod error;
pub use error::BlockHeaderSyncError;

299 changes: 218 additions & 81 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs

Large diffs are not rendered by default.

21 changes: 0 additions & 21 deletions base_layer/core/src/base_node/sync/header_sync/test.rs

This file was deleted.

176 changes: 148 additions & 28 deletions base_layer/core/src/base_node/sync/header_sync/validator.rs
@@ -30,7 +30,6 @@ use crate::{
BlockchainBackend,
ChainHeader,
ChainStorageError,
Optional,
TargetDifficulties,
},
common::rolling_vec::RollingVec,
@@ -64,6 +63,7 @@ struct State {
timestamps: RollingVec<EpochTime>,
target_difficulties: TargetDifficulties,
previous_accum: BlockHeaderAccumulatedData,
valid_headers: Vec<ChainHeader>,
}

impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
@@ -105,23 +105,24 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
timestamps,
target_difficulties,
previous_accum,
// A single large allocation up front is usually better, even if the full capacity is not always used.
valid_headers: Vec::with_capacity(1000),
});

Ok(())
}

pub fn validate_and_calculate_metadata(
&mut self,
header: BlockHeader,
) -> Result<ChainHeader, BlockHeaderSyncError>
{
let expected_height = self.state().current_height + 1;
pub fn validate(&mut self, header: BlockHeader) -> Result<(), BlockHeaderSyncError> {
let state = self.state();
let expected_height = state.current_height + 1;
if header.height != expected_height {
return Err(BlockHeaderSyncError::InvalidBlockHeight(expected_height, header.height));
return Err(BlockHeaderSyncError::InvalidBlockHeight {
expected: expected_height,
actual: header.height,
});
}
check_timestamp_ftl(&header, &self.consensus_rules)?;

let state = self.state();
check_header_timestamp_greater_than_median(&header, &state.timestamps)?;

let constants = self.consensus_rules.consensus_constants(header.height);
@@ -154,29 +155,38 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
// Add a "more recent" datapoint onto the target difficulty
state.target_difficulties.add_back(&header, target_difficulty);
state.previous_accum = metadata.clone();

Ok(ChainHeader {
state.valid_headers.push(ChainHeader {
header,
accumulated_data: metadata,
})
});

Ok(())
}

pub async fn check_stronger_chain(&mut self, their_header: &ChainHeader) -> Result<(), BlockHeaderSyncError> {
// Compare their header to ours at the same height, or if we don't have a header at that height, our current tip
// header
let our_header = match self
.db
.fetch_header_and_accumulated_data(their_header.height())
.await
.optional()?
{
Some(h) => ChainHeader {
header: h.0,
accumulated_data: h.1,
},
None => self.db.fetch_tip_header().await?,
};
/// Drains and returns all the headers that were validated.
///
/// ## Panics
///
/// Panics if `initialize_state` was not called prior to calling this function.
pub fn take_valid_headers(&mut self) -> Vec<ChainHeader> {
self.state_mut().valid_headers.drain(..).collect::<Vec<_>>()
}

/// Returns a slice containing the current valid headers
///
/// ## Panics
///
/// Panics if `initialize_state` was not called prior to calling this function.
pub fn valid_headers(&self) -> &[ChainHeader] {
&self.state().valid_headers
}

pub fn check_stronger_chain(
&self,
our_header: &ChainHeader,
their_header: &ChainHeader,
) -> Result<(), BlockHeaderSyncError>
{
debug!(
target: LOG_TARGET,
"Comparing PoW on remote header #{} and local header #{}",
@@ -190,7 +200,7 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
.compare(&our_header, their_header)
{
Ordering::Less => Ok(()),
Ordering::Equal | Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain),
Ordering::Greater | Ordering::Equal => Err(BlockHeaderSyncError::WeakerChain),
}
}

@@ -206,3 +216,113 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
.expect("state() called before state was initialized (using the `initialize_state` method)")
}
}
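The `check_stronger_chain` match only proceeds when the remote chain has strictly more accumulated work; on a tie or when ours is stronger, the peer is rejected as a weaker chain. A hedged sketch of that rule using hypothetical `u128` accumulated-work totals (the real comparer operates on `ChainHeader` accumulated data, not raw integers):

```rust
use std::cmp::Ordering;

// Sketch of the "strictly stronger" rule: sync only if `theirs` exceeds `ours`.
// The u128 totals are stand-ins for accumulated proof-of-work.
fn check_stronger_chain(ours: u128, theirs: u128) -> Result<(), &'static str> {
    match ours.cmp(&theirs) {
        // Our accumulated work is less than theirs: their chain is stronger.
        Ordering::Less => Ok(()),
        // Equal or greater: their chain is not stronger, so do not sync.
        Ordering::Greater | Ordering::Equal => Err("WeakerChain"),
    }
}

fn main() {
    // Ties keep the local chain, avoiding pointless reorgs between equal chains.
    println!("{:?}", check_stronger_chain(200, 200));
}
```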

#[cfg(test)]
mod test {
use super::*;
use crate::{
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockHeaderAccumulatedData},
consensus::{ConsensusManager, Network},
crypto::tari_utilities::{hex::Hex, Hashable},
proof_of_work::{randomx_factory::RandomXFactory, PowAlgorithm},
test_helpers::blockchain::{create_new_blockchain, TempDatabase},
};
use tari_test_utils::unpack_enum;

fn setup() -> (BlockHeaderSyncValidator<TempDatabase>, AsyncBlockchainDb<TempDatabase>) {
let rules = ConsensusManager::builder(Network::LocalNet).build();
let randomx_factory = RandomXFactory::default();
let db = create_new_blockchain();
(
BlockHeaderSyncValidator::new(db.clone().into(), rules, randomx_factory),
db.into(),
)
}

async fn setup_with_headers(
n: usize,
) -> (
BlockHeaderSyncValidator<TempDatabase>,
AsyncBlockchainDb<TempDatabase>,
ChainHeader,
) {
let (validator, db) = setup();
let mut tip = db.fetch_tip_header().await.unwrap();
for _ in 0..n {
let mut header = BlockHeader::from_previous(&tip.header).unwrap();
// Needed to have unique keys for the blockchain db mmr count indexes (MDB_KEY_EXIST error)
header.kernel_mmr_size += 1;
header.output_mmr_size += 1;
let acc_data = BlockHeaderAccumulatedData {
hash: header.hash(),
..Default::default()
};

db.insert_valid_headers(vec![(header.clone(), acc_data.clone())])
.await
.unwrap();
tip = ChainHeader {
header,
accumulated_data: acc_data,
};
}

(validator, db, tip)
}

mod initialize_state {
use super::*;

#[tokio_macros::test_basic]
async fn it_initializes_state_to_given_header() {
let (mut validator, _, tip) = setup_with_headers(1).await;
validator.initialize_state(tip.header.hash()).await.unwrap();
let state = validator.state();
assert!(state.valid_headers.is_empty());
assert_eq!(state.target_difficulties.get(PowAlgorithm::Sha3).len(), 2);
assert!(state.target_difficulties.get(PowAlgorithm::Monero).is_empty());
assert_eq!(state.timestamps.len(), 2);
assert_eq!(state.current_height, 1);
}

#[tokio_macros::test_basic]
async fn it_errors_if_hash_does_not_exist() {
let (mut validator, _) = setup();
let start_hash = vec![0; 32];
let err = validator.initialize_state(start_hash.clone()).await.unwrap_err();
unpack_enum!(BlockHeaderSyncError::StartHashNotFound(hash) = err);
assert_eq!(hash, start_hash.to_hex());
}
}

mod validate {
use super::*;

#[tokio_macros::test_basic]
async fn it_passes_if_headers_are_valid() {
let (mut validator, _, tip) = setup_with_headers(1).await;
validator.initialize_state(tip.header.hash()).await.unwrap();
assert!(validator.valid_headers().is_empty());
let next = BlockHeader::from_previous(&tip.header).unwrap();
validator.validate(next).unwrap();
assert_eq!(validator.valid_headers().len(), 1);
let tip = validator.valid_headers().last().cloned().unwrap();
let next = BlockHeader::from_previous(&tip.header).unwrap();
validator.validate(next).unwrap();
assert_eq!(validator.valid_headers().len(), 2);
}

#[tokio_macros::test_basic]
async fn it_fails_if_height_is_not_serial() {
let (mut validator, _, tip) = setup_with_headers(2).await;
validator.initialize_state(tip.header.hash()).await.unwrap();
let mut next = BlockHeader::from_previous(&tip.header).unwrap();
next.height = 10;
let err = validator.validate(next).unwrap_err();
unpack_enum!(BlockHeaderSyncError::InvalidBlockHeight { expected, actual } = err);
assert_eq!(actual, 10);
assert_eq!(expected, 3);
}
}
}
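The refactored validator accumulates headers internally (`validate` pushes into `valid_headers`) and hands them over in one batch via `take_valid_headers`, which drains the buffer so it can be reused for the next batch. A simplified, self-contained sketch of that accumulate-then-drain pattern (hypothetical `u64` heights standing in for `ChainHeader` values):

```rust
// Simplified model of the validator's accumulate-then-drain pattern.
struct Validator {
    valid_headers: Vec<u64>, // stand-in for Vec<ChainHeader>
}

impl Validator {
    // Accepts only the next serial height, mirroring the InvalidBlockHeight check.
    fn validate(&mut self, height: u64) -> Result<(), String> {
        let expected = self.valid_headers.last().map(|h| h + 1).unwrap_or(1);
        if height != expected {
            return Err(format!("Expected header height {} got {}", expected, height));
        }
        self.valid_headers.push(height);
        Ok(())
    }

    /// Drains and returns all headers validated so far, leaving the buffer empty.
    fn take_valid_headers(&mut self) -> Vec<u64> {
        self.valid_headers.drain(..).collect()
    }
}

fn main() {
    let mut v = Validator { valid_headers: Vec::new() };
    v.validate(1).unwrap();
    v.validate(2).unwrap();
    assert!(v.validate(10).is_err()); // non-serial height is rejected
    let taken = v.take_valid_headers();
    assert!(v.valid_headers.is_empty()); // drained, ready for the next batch
    println!("took {} headers", taken.len());
}
```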
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/accumulated_data.rs
@@ -287,7 +287,7 @@ impl Display for BlockHeaderAccumulatedData {
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ChainHeader {
pub header: BlockHeader,
pub accumulated_data: BlockHeaderAccumulatedData,
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
@@ -156,6 +156,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(rewind_to_height(height: u64) -> Vec<Arc<ChainBlock>>, "rewind_to_height");

make_async_fn!(rewind_to_hash(hash: BlockHash) -> Vec<Arc<ChainBlock>>, "rewind_to_hash");

//---------------------------------- Headers --------------------------------------------//
make_async_fn!(fetch_header(height: u64) -> Option<BlockHeader>, "fetch_header");

26 changes: 26 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_database.rs
@@ -971,6 +971,17 @@ where B: BlockchainBackend
rewind_to_height(&mut *db, height)
}

/// Rewind the blockchain state to the block hash making the block at that hash the new tip.
/// Returns the removed blocks.
///
/// The operation will fail if
/// * The block hash does not exist
/// * The block hash is before the horizon block height determined by the pruning horizon
pub fn rewind_to_hash(&self, hash: BlockHash) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let mut db = self.db_write_access()?;
rewind_to_hash(&mut *db, hash)
}

pub fn fetch_horizon_data(&self) -> Result<Option<HorizonData>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_horizon_data()
@@ -1537,6 +1548,21 @@ fn rewind_to_height<T: BlockchainBackend>(
Ok(removed_blocks)
}

fn rewind_to_hash<T: BlockchainBackend>(
db: &mut T,
block_hash: BlockHash,
) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError>
{
let block_hash_hex = block_hash.to_hex();
let target_header =
fetch_header_by_block_hash(&*db, block_hash)?.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockHeader".to_string(),
field: "block_hash".to_string(),
value: block_hash_hex,
})?;
rewind_to_height(db, target_header.height)
}

// Checks whether we should add the block as an orphan. If it is the case, the orphan block is added and the chain
// is reorganised if necessary.
fn handle_possible_reorg<T: BlockchainBackend>(
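The new `rewind_to_hash` is a thin wrapper: it resolves the hash to a header height (failing with `ValueNotFound` if the hash is unknown) and then delegates to the existing `rewind_to_height`. A hypothetical in-memory sketch of that delegation, with hex strings standing in for block hashes and a plain `Vec` standing in for the LMDB-backed store:

```rust
// In-memory model: index in `blocks` is the block height, value is the hash.
struct Chain {
    blocks: Vec<String>,
}

impl Chain {
    // Removes and returns every block above `height`, making it the new tip.
    fn rewind_to_height(&mut self, height: usize) -> Vec<String> {
        self.blocks.split_off(height + 1)
    }

    // Resolves the hash to a height, then delegates — mirroring the PR's approach.
    fn rewind_to_hash(&mut self, hash: &str) -> Result<Vec<String>, String> {
        let height = self
            .blocks
            .iter()
            .position(|h| h == hash)
            .ok_or_else(|| format!("BlockHeader not found: block_hash = {}", hash))?;
        Ok(self.rewind_to_height(height))
    }
}

fn main() {
    let mut chain = Chain {
        blocks: vec!["a".into(), "b".into(), "c".into(), "d".into()],
    };
    let removed = chain.rewind_to_hash("b").unwrap();
    assert_eq!(chain.blocks.last().unwrap(), "b"); // "b" is the new tip
    println!("removed {} blocks", removed.len());
}
```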
30 changes: 29 additions & 1 deletion integration_tests/features/Sync.feature
@@ -32,9 +32,37 @@ Feature: Block Sync
Then NODE1 should have 11 peers
Then NODE2 should have 11 peers

@critical @reorg
Scenario: Full block sync with small reorg
Given I have a base node NODE1 connected to all seed nodes
Given I have a base node NODE2 connected to node NODE1
When I mine 5 blocks on NODE1
Then all nodes are at height 5
Given I stop NODE2
Then I mine 5 blocks on NODE1
Given I stop NODE1
And I start NODE2
Then I mine 7 blocks on NODE2
When I start NODE1
Then all nodes are on the same chain at height 12

@critical @reorg @long-running
Scenario: Full block sync with large reorg
Given I have a base node NODE1 connected to all seed nodes
Given I have a base node NODE2 connected to node NODE1
When I mine 5 blocks on NODE1
Then all nodes are at height 5
Given I stop NODE2
Then I mine 1001 blocks on NODE1
Given I stop NODE1
And I start NODE2
Then I mine 1500 blocks on NODE2
When I start NODE1
Then all nodes are on the same chain at height 1505

@critical
Scenario: Pruned mode
#TODO: Merge steps into single lines
# TODO: Merge steps into single lines
Given I have a base node NODE1 connected to all seed nodes
When I mine a block on NODE1 with coinbase CB1
When I mine a block on NODE1 with coinbase CB2