Skip to content

Commit

Permalink
fix: edge cases causing bans during header/block sync (#3661)
Browse files Browse the repository at this point in the history
Description
---
Fixes the following edge cases:
- base node reorg while streaming blocks (including the reorged block) sends an error message to the peer to let it know to resolve reorg using header sync.
- lock add_block during header/block sync, sync protocols assume chain does not change while syncing
- check the local chain metadata at the time the base node kicked into header sync to decide to ban 

Motivation and Context
---
Fixes these erroneous ban conditions that were discovered during the recent stress test
1. Peer claimed an accumulated difficulty of 7632339559469659380317024 but validated difficulty was 7632339559469659380317024 <= local: 7632357293467858576437000.
2.  Banned sync peer because Peer claimed an accumulated difficulty of 7880400444975439614055394 but validated difficulty was 0 <= local: 7880400444975439
3. Peer sent hash for block header we do not have.


How Has This Been Tested?
---
Not explicitly tested, simple test syncing base node to tip
  • Loading branch information
sdbondi authored Jan 4, 2022
1 parent 576a00c commit 95af1cf
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 85 deletions.
7 changes: 6 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tari_core::{
state_machine_service::{initializer::BaseNodeStateMachineInitializer, states::HorizonSyncConfig},
BaseNodeStateMachineConfig,
BlockSyncConfig,
LocalNodeCommsInterface,
StateMachineHandle,
},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, BlockchainDatabase},
Expand Down Expand Up @@ -211,6 +212,7 @@ where B: BlockchainBackend + 'static
config: &GlobalConfig,
) -> UnspawnedCommsNode {
let dht = handles.expect_handle::<Dht>();
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let builder = RpcServer::builder();
let builder = match config.rpc_max_simultaneous_sessions {
Some(limit) => builder.with_maximum_simultaneous_sessions(limit),
Expand All @@ -228,7 +230,10 @@ where B: BlockchainBackend + 'static
// Add your RPC services here ‍🏴‍☠️️☮️🌊
let rpc_server = rpc_server
.add_service(dht.rpc_service())
.add_service(base_node::create_base_node_sync_rpc_service(db.clone()))
.add_service(base_node::create_base_node_sync_rpc_service(
db.clone(),
base_node_service,
))
.add_service(mempool::create_mempool_rpc_service(
handles.expect_handle::<MempoolHandle>(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ where T: BlockchainBackend + 'static
) -> Result<(), CommsInterfaceError> {
let NewBlock { block_hash } = new_block;

if self.blockchain_db.inner().is_add_block_disabled() {
info!(
target: LOG_TARGET,
"Ignoring block message ({}) because add_block is locked",
block_hash.to_hex()
);
return Ok(());
}

// Only a single block request can complete at a time.
// As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for
// the same full block. The first request that succeeds will stop the node from requesting the block from any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ use crate::{
comms_interface::LocalNodeCommsInterface,
state_machine_service::{
states,
states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus},
states::{
BaseNodeState,
HeaderSyncState,
HorizonSyncConfig,
StateEvent,
StateInfo,
StatusInfo,
SyncPeerConfig,
SyncStatus,
},
},
sync::{BlockSyncConfig, SyncValidators},
},
Expand Down Expand Up @@ -137,22 +146,51 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
/// Describe the Finite State Machine for the base node. This function describes _every possible_ state
/// transition for the node given its current state and an event that gets triggered.
pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState {
let db = self.db.inner();
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()),
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue | NetworkSilence) => Listening(s.into()),
(
Listening(_),
FallenBehind(Lagging {
local: local_metadata,
sync_peers,
..
}),
) => {
db.set_disable_add_block_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peer)) => HorizonStateSync(peer.into()),
(DecideNextSync(s), Continue) => Listening(s.into()),
(DecideNextSync(s), Continue) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},

(DecideNextSync(_), ProceedToBlockSync(peer)) => BlockSync(peer.into()),
(BlockSync(s), BlocksSynchronized) => Listening(s.into()),
(BlockSync(s), BlockSyncFailed) => Waiting(s.into()),
(BlockSync(s), BlocksSynchronized) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(BlockSync(s), BlockSyncFailed) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},

(Waiting(s), Continue) => Listening(s.into()),
(_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::base_node::{
state_machine_service::states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
HorizonStateSync,
Listening,
ListeningInfo,
Expand All @@ -44,7 +44,7 @@ use crate::base_node::{
#[derive(Debug)]
pub enum BaseNodeState {
Starting(Starting),
HeaderSync(HeaderSync),
HeaderSync(HeaderSyncState),
DecideNextSync(DecideNextSync),
HorizonStateSync(HorizonStateSync),
BlockSync(BlockSync),
Expand Down Expand Up @@ -86,7 +86,11 @@ impl<E: std::error::Error> From<E> for StateEvent {
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, Vec<SyncPeer>),
Lagging {
local: ChainMetadata,
network: ChainMetadata,
sync_peers: Vec<SyncPeer>,
},
UpToDate,
}

Expand All @@ -104,12 +108,14 @@ impl Display for SyncStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
use SyncStatus::*;
match self {
Lagging(m, v) => write!(
Lagging {
network, sync_peers, ..
} => write!(
f,
"Lagging behind {} peers (#{}, Difficulty: {})",
v.len(),
m.height_of_longest_chain(),
m.accumulated_difficulty(),
sync_peers.len(),
network.height_of_longest_chain(),
network.accumulated_difficulty(),
),
UpToDate => f.write_str("UpToDate"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;

use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo},
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
Expand All @@ -36,14 +37,15 @@ use crate::{

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug, Default)]
pub struct HeaderSync {
#[derive(Clone, Debug)]
pub struct HeaderSyncState {
sync_peers: Vec<SyncPeer>,
is_synced: bool,
local_metadata: ChainMetadata,
}

impl HeaderSync {
pub fn new(mut sync_peers: Vec<SyncPeer>) -> Self {
impl HeaderSyncState {
pub fn new(mut sync_peers: Vec<SyncPeer>, local_metadata: ChainMetadata) -> Self {
// Sort by latency lowest to highest
sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) {
(None, None) => Ordering::Equal,
Expand All @@ -55,6 +57,7 @@ impl HeaderSync {
Self {
sync_peers,
is_synced: false,
local_metadata,
}
}

Expand All @@ -77,6 +80,7 @@ impl HeaderSync {
shared.connectivity.clone(),
&self.sync_peers,
shared.randomx_factory.clone(),
&self.local_metadata,
);

let status_event_sender = shared.status_event_sender.clone();
Expand Down Expand Up @@ -141,14 +145,3 @@ impl HeaderSync {
}
}
}

impl From<Listening> for HeaderSync {
fn from(_: Listening) -> Self {
Default::default()
}
}
impl From<Vec<SyncPeer>> for HeaderSync {
fn from(peers: Vec<SyncPeer>) -> Self {
Self::new(peers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
StateEvent,
StateEvent::FatalError,
StateInfo,
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Listening {
if self.is_synced &&
best_metadata.height_of_longest_chain() == local.height_of_longest_chain() + 1 &&
time_since_better_block
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(30))
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(60))
.unwrap_or(true)
{
if time_since_better_block.is_none() {
Expand All @@ -217,7 +217,7 @@ impl Listening {
peer_metadata_list
};

let local = match shared.db.get_chain_metadata().await {
let local_metadata = match shared.db.get_chain_metadata().await {
Ok(m) => m,
Err(e) => {
return FatalError(format!("Could not get local blockchain metadata. {}", e));
Expand All @@ -227,7 +227,7 @@ impl Listening {

let sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
&local,
&local_metadata,
best_metadata,
sync_peers,
);
Expand Down Expand Up @@ -266,8 +266,8 @@ impl From<Waiting> for Listening {
}
}

impl From<HeaderSync> for Listening {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for Listening {
fn from(sync: HeaderSyncState) -> Self {
Self {
is_synced: sync.is_synced(),
}
Expand Down Expand Up @@ -356,12 +356,15 @@ fn determine_sync_mode(
return UpToDate;
};

let sync_peers = sync_peers.into_iter().cloned().collect();
debug!(
target: LOG_TARGET,
"Lagging (local height = {}, network height = {})", local_tip_height, network_tip_height
);
Lagging(network.clone(), sync_peers)
Lagging {
local: local.clone(),
network: network.clone(),
sync_peers: sync_peers.into_iter().cloned().collect(),
}
} else {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -497,28 +500,28 @@ mod test {

let network = ChainMetadata::new(0, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(100, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(0, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(100, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(99, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) mod helpers;
pub use helpers::SyncPeerConfig;

mod header_sync;
pub use header_sync::HeaderSync;
pub use header_sync::HeaderSyncState;

mod sync_decide;
pub use sync_decide::DecideNextSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use log::*;
use crate::{
base_node::{
state_machine_service::{
states::{HeaderSync, StateEvent},
states::{HeaderSyncState, StateEvent},
BaseNodeStateMachine,
},
sync::SyncPeer,
Expand Down Expand Up @@ -118,8 +118,8 @@ fn find_best_latency<'a, I: IntoIterator<Item = &'a SyncPeer>>(iter: I) -> Optio
.cloned()
}

impl From<HeaderSync> for DecideNextSync {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for DecideNextSync {
fn from(sync: HeaderSyncState) -> Self {
Self {
sync_peers: sync.into_sync_peers(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use log::info;
use tokio::time::sleep;

use crate::base_node::state_machine_service::states::{BlockSync, HeaderSync, HorizonStateSync, StateEvent};
use crate::base_node::state_machine_service::states::{BlockSync, HeaderSyncState, HorizonStateSync, StateEvent};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::waiting";

Expand Down Expand Up @@ -68,8 +68,8 @@ impl From<BlockSync> for Waiting {
}
}

impl From<HeaderSync> for Waiting {
fn from(_: HeaderSync) -> Self {
impl From<HeaderSyncState> for Waiting {
fn from(_: HeaderSyncState) -> Self {
Default::default()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.fetch_chain_header_by_block_hash(block.hash.clone())
.await?
.ok_or_else(|| {
BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into())
BlockSyncError::ProtocolViolation(format!(
"Peer sent hash ({}) for block header we do not have",
block.hash.to_hex()
))
})?;

let current_height = header.height();
Expand Down
Loading

0 comments on commit 95af1cf

Please sign in to comment.