Skip to content

Commit

Permalink
[core] Fix handling of large chain reorgs in header sync
Browse files Browse the repository at this point in the history
Changes header sync to continue downloading headers until a higher
accumulated PoW than the current tip is acheived.

Added cucumber tests for sync with large reorg
  • Loading branch information
sdbondi committed Mar 22, 2021
1 parent 95499a7 commit baa411f
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 164 deletions.
2 changes: 2 additions & 0 deletions base_layer/core/src/base_node/sync/header_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ pub enum BlockHeaderSyncError {
ChainSplitNotFound(NodeId),
#[error("Node could not find any other node with which to sync. Silence.")]
NetworkSilence,
#[error("Invalid protocol response: {0}")]
InvalidProtocolResponse(String),
}
3 changes: 0 additions & 3 deletions base_layer/core/src/base_node/sync/header_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(test)]
mod test;

mod error;
pub use error::BlockHeaderSyncError;

Expand Down
298 changes: 218 additions & 80 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs

Large diffs are not rendered by default.

21 changes: 0 additions & 21 deletions base_layer/core/src/base_node/sync/header_sync/test.rs

This file was deleted.

63 changes: 35 additions & 28 deletions base_layer/core/src/base_node/sync/header_sync/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{
BlockchainBackend,
ChainHeader,
ChainStorageError,
Optional,
TargetDifficulties,
},
common::rolling_vec::RollingVec,
Expand Down Expand Up @@ -64,6 +63,7 @@ struct State {
timestamps: RollingVec<EpochTime>,
target_difficulties: TargetDifficulties,
previous_accum: BlockHeaderAccumulatedData,
valid_headers: Vec<ChainHeader>,
}

impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
Expand Down Expand Up @@ -105,23 +105,21 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
timestamps,
target_difficulties,
previous_accum,
// One large allocation is usually better even if it is not always used.
valid_headers: Vec::with_capacity(1000),
});

Ok(())
}

pub fn validate_and_calculate_metadata(
&mut self,
header: BlockHeader,
) -> Result<ChainHeader, BlockHeaderSyncError>
{
let expected_height = self.state().current_height + 1;
pub fn validate(&mut self, header: BlockHeader) -> Result<(), BlockHeaderSyncError> {
let state = self.state();
let expected_height = state.current_height + 1;
if header.height != expected_height {
return Err(BlockHeaderSyncError::InvalidBlockHeight(expected_height, header.height));
}
check_timestamp_ftl(&header, &self.consensus_rules)?;

let state = self.state();
check_header_timestamp_greater_than_median(&header, &state.timestamps)?;

let constants = self.consensus_rules.consensus_constants(header.height);
Expand Down Expand Up @@ -154,29 +152,38 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
// Add a "more recent" datapoint onto the target difficulty
state.target_difficulties.add_back(&header, target_difficulty);
state.previous_accum = metadata.clone();

Ok(ChainHeader {
state.valid_headers.push(ChainHeader {
header,
accumulated_data: metadata,
})
});

Ok(())
}

pub async fn check_stronger_chain(&mut self, their_header: &ChainHeader) -> Result<(), BlockHeaderSyncError> {
// Compare their header to ours at the same height, or if we don't have a header at that height, our current tip
// header
let our_header = match self
.db
.fetch_header_and_accumulated_data(their_header.height())
.await
.optional()?
{
Some(h) => ChainHeader {
header: h.0,
accumulated_data: h.1,
},
None => self.db.fetch_tip_header().await?,
};
/// Drains and returns all the headers that were validated.
///
/// ## Panics
///
/// Panics if initialize_state was not called prior to calling this function
pub fn take_valid_headers(&mut self) -> Vec<ChainHeader> {
self.state_mut().valid_headers.drain(..).collect::<Vec<_>>()
}

/// Returns a slice containing the current valid headers
///
/// ## Panics
///
/// Panics if initialize_state was not called prior to calling this function
pub fn valid_headers(&self) -> &[ChainHeader] {
&self.state().valid_headers
}

pub fn check_stronger_chain(
&self,
our_header: &ChainHeader,
their_header: &ChainHeader,
) -> Result<(), BlockHeaderSyncError>
{
debug!(
target: LOG_TARGET,
"Comparing PoW on remote header #{} and local header #{}",
Expand All @@ -189,8 +196,8 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
.chain_strength_comparer()
.compare(&our_header, their_header)
{
Ordering::Less => Ok(()),
Ordering::Equal | Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain),
Ordering::Less | Ordering::Equal => Ok(()),
Ordering::Greater => Err(BlockHeaderSyncError::WeakerChain),
}
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/accumulated_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl Display for BlockHeaderAccumulatedData {
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ChainHeader {
pub header: BlockHeader,
pub accumulated_data: BlockHeaderAccumulatedData,
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(rewind_to_height(height: u64) -> Vec<Arc<ChainBlock>>, "rewind_to_height");

make_async_fn!(rewind_to_hash(hash: BlockHash) -> Vec<Arc<ChainBlock>>, "rewind_to_hash");

//---------------------------------- Headers --------------------------------------------//
make_async_fn!(fetch_header(height: u64) -> Option<BlockHeader>, "fetch_header");

Expand Down
26 changes: 26 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,17 @@ where B: BlockchainBackend
rewind_to_height(&mut *db, height)
}

/// Rewind the blockchain state to the block hash making the block at that hash the new tip.
/// Returns the removed blocks.
///
/// The operation will fail if
/// * The block hash does not exist
/// * The block hash is before the horizon block height determined by the pruning horizon
pub fn rewind_to_hash(&self, hash: BlockHash) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let mut db = self.db_write_access()?;
rewind_to_hash(&mut *db, hash)
}

pub fn fetch_horizon_data(&self) -> Result<Option<HorizonData>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_horizon_data()
Expand Down Expand Up @@ -1496,6 +1507,21 @@ fn rewind_to_height<T: BlockchainBackend>(db: &mut T, height: u64) -> Result<Vec
Ok(removed_blocks)
}

fn rewind_to_hash<T: BlockchainBackend>(
db: &mut T,
block_hash: BlockHash,
) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError>
{
let block_hash_hex = block_hash.to_hex();
let target_header =
fetch_header_by_block_hash(&*db, block_hash)?.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockHeader".to_string(),
field: "block_hash".to_string(),
value: block_hash_hex,
})?;
rewind_to_height(db, target_header.height)
}

// Checks whether we should add the block as an orphan. If it is the case, the orphan block is added and the chain
// is reorganised if necessary.
fn handle_possible_reorg<T: BlockchainBackend>(
Expand Down
31 changes: 29 additions & 2 deletions integration_tests/features/Sync.feature
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,37 @@ Feature: Block Sync
Then NODE1 should have 11 peers
Then NODE2 should have 11 peers

@critical @reorg
Scenario: Full block sync with small reorg
Given I have a base node NODE1 connected to all seed nodes
Given I have a base node NODE2 connected to node NODE1
When I mine 5 blocks on NODE1
Then all nodes are at height 5
Given I stop NODE2
Then I mine 5 blocks on NODE1
Given I stop NODE1
And I start NODE2
Then I mine 7 blocks on NODE2
When I start NODE1
Then all nodes are on the same chain at height 12

@critical @reorg @long-running
Scenario: Full block sync with large reorg
Given I have a base node NODE1 connected to all seed nodes
Given I have a base node NODE2 connected to node NODE1
When I mine 5 blocks on NODE1
Then all nodes are at height 5
Given I stop NODE2
Then I mine 1001 blocks on NODE1
Given I stop NODE1
And I start NODE2
Then I mine 1500 blocks on NODE2
When I start NODE1
Then all nodes are on the same chain at height 1505

@critical
Scenario: Pruned mode
#TODO: Merge steps into single lines
# TODO: Merge steps into single lines
Given I have a base node NODE1 connected to all seed nodes
When I mine a block on NODE1 with coinbase CB1
When I mine a block on NODE1 with coinbase CB2
Expand Down Expand Up @@ -72,4 +100,3 @@ Feature: Block Sync
And I mine 6 blocks on PNODE2
When I start NODE1
Then all nodes are at height 20

44 changes: 31 additions & 13 deletions integration_tests/features/support/steps.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const MergeMiningProxyProcess = require('../../helpers/mergeMiningProxyProcess')
const WalletProcess = require('../../helpers/walletProcess');
const expect = require('chai').expect;
const {waitFor, getTransactionOutputHash, sleep, consoleLogTransactionDetails, consoleLogBalance,
consoleLogCoinbaseDetails} = require('../../helpers/util');
consoleLogCoinbaseDetails, withTimeout} = require('../../helpers/util');
const TransactionBuilder = require('../../helpers/transactionBuilder');
let lastResult;

Expand All @@ -29,7 +29,7 @@ Given(/I have a base node (.*) connected to all seed nodes/, {timeout: 20*1000},
miner.setPeerSeeds([this.seedAddresses()]);
await miner.startNew();
this.addNode(name, miner);
});
});

Given(/I have a base node (.*) connected to seed (.*)/, {timeout: 20*1000}, async function (name, seedNode) {
const miner = this.createNode(name);
Expand All @@ -47,8 +47,6 @@ Given(/I have a base node (.*) connected to node (.*)/, {timeout: 20*1000}, asyn
await sleep(1000);
});



Given(/I have a pruned node (.*) connected to node (.*)/, {timeout: 20*1000}, async function (name, node) {
const miner = this.createNode(name, { pruningHorizon: 5});
miner.setPeerSeeds([this.nodes[node].peerAddress()]);
Expand Down Expand Up @@ -162,6 +160,23 @@ Then(/node (.*) is at height (\d+)/, {timeout: 120*1000}, async function (name,
expect(await client.getTipHeight()).to.equal(height);
});

Then('all nodes are on the same chain at height {int}', {timeout: 1200*1000}, async function (height) {
let tipHash = null;
await this.forEachClientAsync(async (client, name) => {
await waitFor(async() => client.getTipHeight(), height, 115*1000);
const currTip = await client.getTipHeader();
expect(currTip.height).to.equal(height);
if (!tipHash) {
tipHash = currTip.hash.toString('hex');
console.log(`Node ${name} is at tip: ${tipHash}`);
} else {
let currTipHash = currTip.hash.toString('hex');
console.log(`Node ${name} is at tip: ${currTipHash} (should be ${tipHash})`);
expect(currTipHash).to.equal(tipHash);
}
})
});

Then('all nodes are at height {int}', {timeout: 1200*1000},async function (height) {
await this.forEachClientAsync(async (client, name) => {
await waitFor(async() => client.getTipHeight(), height, 115*1000);
Expand All @@ -177,7 +192,7 @@ Then('all nodes are at current tip height', {timeout: 1200*1000},async function
await this.forEachClientAsync(async (client, name) => {
await waitFor(async() => client.getTipHeight(), height, 1200*1000);
const currTip = await client.getTipHeight();
console.log(`Node ${name} is at tip: ${currTip} (should be`, height, `)`);
console.log(`Node ${name} is at tip: ${currTip} (expected ${height})`);
expect(currTip).to.equal(height);
})
});
Expand Down Expand Up @@ -314,21 +329,24 @@ Then(/node (.*) is at tip (.*)/, async function (node, name) {
});

When(/I mine a block on (.*) with coinbase (.*)/, {timeout: 600*1000}, async function (name, coinbaseName) {
await this.mineBlock(name, 0, candidate => {
this.addOutput(coinbaseName, candidate.originalTemplate.coinbase);
return candidate;
});
await this.mineBlock(name, 0, candidate => {
this.addOutput(coinbaseName, candidate.originalTemplate.coinbase);
return candidate;
});
this.tipHeight += 1;
});

When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, {timeout: 600*1000}, async function (numBlocks, name, weight) {
When(/I mine (\d+) custom weight blocks on (.*) with weight (\d+)/, {timeout: -1}, async function (numBlocks, name, weight) {
for(let i=0;i<numBlocks;i++) {
await this.mineBlock(name, 17);
// If a block cannot be mined quickly enough (or the process has frozen), timeout.
await withTimeout(60 * 1000, this.mineBlock(name, parseInt(weight)));
}
this.tipHeight += parseInt(numBlocks);
});

When(/I mine (\d+) blocks on (.*)/, {timeout: 600*1000}, async function (numBlocks, name) {
When(/I mine (\d+) blocks on (.*)/, {timeout: -1}, async function (numBlocks, name) {
for(let i=0;i<numBlocks;i++) {
await this.mineBlock(name, 0);
await withTimeout(60 * 1000, this.mineBlock(name, 0));
}
this.tipHeight += parseInt(numBlocks);
});
Expand Down
25 changes: 13 additions & 12 deletions integration_tests/helpers/baseNodeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ class BaseNodeClient {
}

getTipHeader() {
return this.client.listHeaders().sendMessage({from_height: 0, num_headers: 1}).then(header => {
return header;
})
return this.client.listHeaders().sendMessage({from_height: 0, num_headers: 1}).then(headers => {
const header = headers[0];
return Object.assign(header,{
height: +header.height,
});
});
}

getTipHeight() {
return this.client.getTipInfo()
.sendMessage({})
.then(tip => parseInt(tip.metadata.height_of_longest_chain));
}

getPreviousBlockTemplate(height) {
Expand Down Expand Up @@ -122,14 +131,6 @@ class BaseNodeClient {
);
}

getTipHeight() {
return this.client.getTipInfo()
.sendMessage({})
.then(tip => {
return parseInt(tip.metadata.height_of_longest_chain);
});
}

fetchMatchingUtxos(hashes) {
return this.client.fetchMatchingUtxos()
.sendMessage({hashes: hashes})
Expand Down Expand Up @@ -219,7 +220,7 @@ class BaseNodeClient {
async mineBlockWithoutWallet(beforeSubmit, weight, onError) {
let template = await this.getMinedCandidateBlock(weight);
return this.submitTemplate(template, beforeSubmit).then(async () => {
let tip = await this.getTipHeight();
// let tip = await this.getTipHeight();
// console.log("Node is at tip:", tip);
}, err => {
console.log("err submitting block:", err);
Expand Down
Loading

0 comments on commit baa411f

Please sign in to comment.