Skip to content

Commit 2dee0e3

Browse files
committedOct 25, 2023
Ban bad block-sync peers
- Added a check to ban a misbahaving peer after block sync when not supplying any or all of the blocks corresponding to the accumulated difficulty they claimed they had. - Added a check in the RPC block-sync server method to not try and supply blocks past its own end of chain best block. - Added integration level unit tests for block sync.
1 parent 3e1ec1f commit 2dee0e3

File tree

8 files changed

+266
-52
lines changed

8 files changed

+266
-52
lines changed
 

‎base_layer/core/src/base_node/sync/block_sync/error.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub enum BlockSyncError {
6969
SyncRoundFailed,
7070
#[error("Could not find peer info")]
7171
PeerNotFound,
72+
#[error("Peer did not supply all the blocks they claimed they had: {0}")]
73+
PeerDidNotSupplyAllClaimedBlocks(String),
7274
}
7375

7476
impl BlockSyncError {
@@ -93,6 +95,7 @@ impl BlockSyncError {
9395
BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError",
9496
BlockSyncError::SyncRoundFailed => "SyncRoundFailed",
9597
BlockSyncError::PeerNotFound => "PeerNotFound",
98+
BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) => "PeerDidNotSupplyAllClaimedBlocks",
9699
}
97100
}
98101
}
@@ -102,8 +105,6 @@ impl BlockSyncError {
102105
match self {
103106
// no ban
104107
BlockSyncError::AsyncTaskFailed(_) |
105-
BlockSyncError::RpcError(_) |
106-
BlockSyncError::RpcRequestError(_) |
107108
BlockSyncError::ChainStorageError(_) |
108109
BlockSyncError::ConnectivityError(_) |
109110
BlockSyncError::NoMoreSyncPeers(_) |
@@ -113,7 +114,10 @@ impl BlockSyncError {
113114
BlockSyncError::SyncRoundFailed => None,
114115

115116
// short ban
116-
err @ BlockSyncError::MaxLatencyExceeded { .. } => Some(BanReason {
117+
err @ BlockSyncError::MaxLatencyExceeded { .. } |
118+
err @ BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) |
119+
err @ BlockSyncError::RpcError(_) |
120+
err @ BlockSyncError::RpcRequestError(_) => Some(BanReason {
117121
reason: format!("{}", err),
118122
ban_duration: short_ban,
119123
}),

‎base_layer/core/src/base_node/sync/block_sync/synchronizer.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
128128
warn!(target: LOG_TARGET, "{} ({})", err, sync_round);
129129
continue;
130130
},
131-
Err(err) => return Err(err),
131+
Err(err) => {
132+
return Err(err);
133+
},
132134
}
133135
}
134136
}
@@ -407,6 +409,15 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
407409
last_sync_timer = Instant::now();
408410
}
409411

412+
let accumulated_difficulty = self.db.get_chain_metadata().await?.accumulated_difficulty();
413+
if accumulated_difficulty < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
414+
return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
415+
"Their claimed difficulty: {}, our local difficulty after block sync: {}",
416+
sync_peer.claimed_chain_metadata().accumulated_difficulty(),
417+
accumulated_difficulty
418+
)));
419+
}
420+
410421
if let Some(block) = current_block {
411422
self.hooks.call_on_complete_hooks(block, best_height);
412423
}

‎base_layer/core/src/base_node/sync/rpc/service.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
117117
.start_hash
118118
.try_into()
119119
.map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?;
120+
if db.fetch_block_by_hash(hash, true).await.is_err() {
121+
return Err(RpcStatus::not_found("Requested start block sync hash was not found"));
122+
}
120123
let start_header = db
121124
.fetch_header_by_block_hash(hash)
122125
.await
@@ -134,13 +137,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
134137
)));
135138
}
136139

137-
if start_height > metadata.height_of_longest_chain() {
138-
return Ok(Streaming::empty());
139-
}
140140
let hash = message
141141
.end_hash
142142
.try_into()
143143
.map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?;
144+
if db.fetch_block_by_hash(hash, true).await.is_err() {
145+
return Err(RpcStatus::not_found("Requested end block sync hash was not found"));
146+
}
144147
let end_header = db
145148
.fetch_header_by_block_hash(hash)
146149
.await

‎base_layer/core/src/common/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub type ConfidentialOutputHasher = DomainSeparatedConsensusHasher<ConfidentialO
4242

4343
/// The reason for a peer being banned
4444
#[cfg(feature = "base_node")]
45-
#[derive(Clone)]
45+
#[derive(Clone, Debug)]
4646
pub struct BanReason {
4747
/// The reason for the ban
4848
pub reason: String,

‎base_layer/core/tests/helpers/sync.rs

+53-33
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tari_comms::peer_manager::NodeId;
2828
use tari_core::{
2929
base_node::{
3030
chain_metadata_service::PeerChainMetadata,
31-
state_machine_service::states::{HeaderSyncState, StateEvent, StatusInfo},
31+
state_machine_service::states::{BlockSync, HeaderSyncState, StateEvent, StatusInfo},
3232
sync::SyncPeer,
3333
BaseNodeStateMachine,
3434
BaseNodeStateMachineConfig,
@@ -76,6 +76,21 @@ pub async fn sync_headers_execute(
7676
header_sync.next_event(state_machine).await
7777
}
7878

79+
pub fn initialize_sync_blocks(peer_node_interfaces: &NodeInterfaces) -> BlockSync {
80+
BlockSync::from(vec![SyncPeer::from(PeerChainMetadata::new(
81+
peer_node_interfaces.node_identity.node_id().clone(),
82+
peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap(),
83+
None,
84+
))])
85+
}
86+
87+
pub async fn sync_blocks_execute(
88+
state_machine: &mut BaseNodeStateMachine<TempDatabase>,
89+
block_sync: &mut BlockSync,
90+
) -> StateEvent {
91+
block_sync.next_event(state_machine).await
92+
}
93+
7994
pub async fn create_network_with_local_and_peer_nodes() -> (
8095
BaseNodeStateMachine<TempDatabase>,
8196
NodeInterfaces,
@@ -140,6 +155,7 @@ pub async fn create_network_with_local_and_peer_nodes() -> (
140155
#[allow(dead_code)]
141156
#[derive(Debug)]
142157
pub enum WhatToDelete {
158+
BlocksAndHeaders,
143159
Blocks,
144160
Headers,
145161
}
@@ -149,44 +165,48 @@ pub fn delete_some_blocks_and_headers(
149165
blocks_with_anchor: &[ChainBlock],
150166
instruction: WhatToDelete,
151167
node: &NodeInterfaces,
152-
set_best_block: Option<bool>,
153168
) {
154169
if blocks_with_anchor.is_empty() || blocks_with_anchor.len() < 2 {
155170
panic!("blocks must have at least 2 elements");
156171
}
157-
let set_best_block = set_best_block.unwrap_or(false);
158172
let mut blocks: Vec<_> = blocks_with_anchor.to_vec();
159-
blocks.reverse();
160-
for i in 0..blocks.len() - 1 {
161-
let mut txn = DbTransaction::new();
162-
match instruction {
163-
WhatToDelete::Blocks => {
164-
txn.delete_block(*blocks[i].hash());
165-
txn.delete_orphan(*blocks[i].hash());
166-
if set_best_block {
167-
txn.set_best_block(
168-
blocks[i + 1].height(),
169-
blocks[i + 1].accumulated_data().hash,
170-
blocks[i + 1].accumulated_data().total_accumulated_difficulty,
171-
*node.blockchain_db.get_chain_metadata().unwrap().best_block(),
172-
blocks[i + 1].to_chain_header().timestamp(),
173-
);
173+
match instruction {
174+
WhatToDelete::BlocksAndHeaders => {
175+
node.blockchain_db.rewind_to_height(blocks[0].height()).unwrap();
176+
for block in &blocks[1..] {
177+
if node.blockchain_db.block_exists(*block.hash()).unwrap() {
178+
let mut txn = DbTransaction::new();
179+
txn.delete_orphan(*block.hash());
180+
node.blockchain_db.write(txn).unwrap();
174181
}
175-
},
176-
WhatToDelete::Headers => {
177-
txn.delete_header(blocks[i].height());
178-
},
179-
}
180-
node.blockchain_db.write(txn).unwrap();
181-
// Note: Something is funny here... the block is deleted but the block exists in the db. This should be
182-
// investigated and fixed as it will enhance the tests. If we uncomment the following assertion, the
183-
// tests depending on this function will fail.
184-
// match instruction {
185-
// WhatToDelete::Blocks => {
186-
// assert!(!node.blockchain_db.block_exists(*blocks[i].hash()).unwrap());
187-
// }
188-
// WhatToDelete::Headers => {}
189-
// }
182+
assert!(!node.blockchain_db.block_exists(*block.hash()).unwrap());
183+
}
184+
},
185+
WhatToDelete::Blocks => {
186+
let headers = blocks.iter().map(|b| b.to_chain_header()).collect::<Vec<_>>();
187+
node.blockchain_db.rewind_to_height(blocks[0].height()).unwrap();
188+
for block in &blocks[1..] {
189+
if node.blockchain_db.block_exists(*block.hash()).unwrap() {
190+
let mut txn = DbTransaction::new();
191+
txn.delete_orphan(*block.hash());
192+
node.blockchain_db.write(txn).unwrap();
193+
}
194+
assert!(!node.blockchain_db.block_exists(*block.hash()).unwrap());
195+
}
196+
node.blockchain_db.insert_valid_headers(headers[1..].to_vec()).unwrap();
197+
// Note: This seems funny, as inserting the headers back into the db will cause this test to fail
198+
// for block in &blocks[1..] {
199+
// assert!(!node.blockchain_db.block_exists(*block.hash()).unwrap());
200+
// }
201+
},
202+
WhatToDelete::Headers => {
203+
blocks.reverse();
204+
for block in blocks.iter().take(blocks.len() - 1) {
205+
let mut txn = DbTransaction::new();
206+
txn.delete_header(block.height());
207+
node.blockchain_db.write(txn).unwrap();
208+
}
209+
},
190210
}
191211
}
192212

+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2022. The Tari Project
2+
//
3+
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4+
// following conditions are met:
5+
//
6+
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7+
// disclaimer.
8+
//
9+
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10+
// following disclaimer in the documentation and/or other materials provided with the distribution.
11+
//
12+
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13+
// products derived from this software without specific prior written permission.
14+
//
15+
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16+
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17+
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18+
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19+
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20+
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21+
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22+
23+
use tari_core::base_node::state_machine_service::states::StateEvent;
24+
25+
use crate::helpers::{sync, sync::WhatToDelete};
26+
27+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
28+
async fn test_block_sync_happy_path() {
29+
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`
30+
31+
// Create the network with Alice node and Bob node
32+
let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) =
33+
sync::create_network_with_local_and_peer_nodes().await;
34+
35+
// Add some block to Bob's chain
36+
let _bob_blocks =
37+
sync::create_and_add_some_blocks(&bob_node, &initial_block, 5, &consensus_manager, &key_manager, &[3; 5]).await;
38+
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5);
39+
40+
// Alice attempts header sync
41+
let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node);
42+
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await;
43+
match event.clone() {
44+
StateEvent::HeadersSynchronized(..) => {
45+
// Good, headers are synced
46+
},
47+
_ => panic!("Expected HeadersSynchronized event"),
48+
}
49+
50+
// Alice attempts block sync
51+
println!();
52+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 0);
53+
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
54+
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
55+
match event {
56+
StateEvent::BlocksSynchronized => {
57+
// Good, blocks are synced
58+
},
59+
_ => panic!("Expected BlocksSynchronized event"),
60+
}
61+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
62+
63+
// Alice attempts block sync again
64+
println!();
65+
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
66+
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
67+
match event {
68+
StateEvent::BlocksSynchronized => {
69+
// Good, blocks are synced
70+
},
71+
_ => panic!("Expected BlocksSynchronized event"),
72+
}
73+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
74+
}
75+
76+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
77+
async fn test_block_sync_peer_supplies_no_blocks_with_ban() {
78+
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`
79+
80+
// Create the network with Alice node and Bob node
81+
let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) =
82+
sync::create_network_with_local_and_peer_nodes().await;
83+
84+
// Add some block to Bob's chain
85+
let blocks = sync::create_and_add_some_blocks(
86+
&bob_node,
87+
&initial_block,
88+
10,
89+
&consensus_manager,
90+
&key_manager,
91+
&[3; 10],
92+
)
93+
.await;
94+
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10);
95+
// Add blocks to Alice's chain
96+
sync::add_some_existing_blocks(&blocks[1..=5], &alice_node);
97+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
98+
99+
// Alice attempts header sync
100+
let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node);
101+
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await;
102+
match event.clone() {
103+
StateEvent::HeadersSynchronized(..) => {
104+
// Good, headers are synced
105+
},
106+
_ => panic!("Expected HeadersSynchronized event"),
107+
}
108+
109+
// Alice attempts block sync, Bob will not send any blocks and be banned
110+
println!();
111+
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
112+
sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node);
113+
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5);
114+
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
115+
match event {
116+
StateEvent::BlockSyncFailed => {
117+
// Good, Bob is banned.
118+
},
119+
_ => panic!("Expected BlockSyncFailed event"),
120+
}
121+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
122+
123+
// Bob will be banned
124+
assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
125+
}
126+
127+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
128+
async fn test_block_sync_peer_supplies_not_all_blocks_with_ban() {
129+
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`
130+
131+
// Create the network with Alice node and Bob node
132+
let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) =
133+
sync::create_network_with_local_and_peer_nodes().await;
134+
135+
// Add some block to Bob's chain
136+
let blocks = sync::create_and_add_some_blocks(
137+
&bob_node,
138+
&initial_block,
139+
10,
140+
&consensus_manager,
141+
&key_manager,
142+
&[3; 10],
143+
)
144+
.await;
145+
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10);
146+
// Add blocks to Alice's chain
147+
sync::add_some_existing_blocks(&blocks[1..=5], &alice_node);
148+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
149+
150+
// Alice attempts header sync
151+
let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node);
152+
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await;
153+
match event.clone() {
154+
StateEvent::HeadersSynchronized(..) => {
155+
// Good, headers are synced
156+
},
157+
_ => panic!("Expected HeadersSynchronized event"),
158+
}
159+
160+
// Alice attempts block sync, Bob will not send all blocks and be banned
161+
println!();
162+
let mut block_sync = sync::initialize_sync_blocks(&bob_node);
163+
sync::delete_some_blocks_and_headers(&blocks[8..=10], WhatToDelete::Blocks, &bob_node);
164+
assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 8);
165+
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
166+
match event {
167+
StateEvent::BlockSyncFailed => {
168+
// Good, Bob is banned.
169+
},
170+
_ => panic!("Expected BlockSyncFailed event"),
171+
}
172+
assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5);
173+
174+
// Bob will be banned
175+
assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
176+
}

0 commit comments

Comments
 (0)