Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update common block in sync after importing blocks of a peer, please read UPDATE #7733

Merged
merged 16 commits into from
Dec 18, 2020
Merged
136 changes: 99 additions & 37 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ pub struct ChainSync<B: BlockT> {
/// All the data we have about a Peer that we are trying to sync with
#[derive(Debug, Clone)]
pub struct PeerSync<B: BlockT> {
/// Peer id of this peer.
pub peer_id: PeerId,
/// The common number is the block number that is a common point of
/// ancestry for both our chains (as far as we know).
pub common_number: NumberFor<B>,
Expand All @@ -223,6 +225,22 @@ pub struct PeerSync<B: BlockT> {
pub state: PeerSyncState<B>,
}

impl<B: BlockT> PeerSync<B> {
/// Update the `common_number` iff `new_common > common_number`.
fn update_common_number(&mut self, new_common: NumberFor<B>) {
if self.common_number < new_common {
trace!(
target: "sync",
"Updating peer {} common number from={} => to={}.",
self.peer_id,
self.common_number,
new_common,
);
self.common_number = new_common;
}
}
}

/// The sync status of a peer we are trying to sync with
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
Expand Down Expand Up @@ -264,11 +282,7 @@ pub enum PeerSyncState<B: BlockT> {

impl<B: BlockT> PeerSyncState<B> {
pub fn is_available(&self) -> bool {
if let PeerSyncState::Available = self {
true
} else {
false
}
matches!(self, Self::Available)
}
}

Expand Down Expand Up @@ -512,7 +526,8 @@ impl<B: BlockT> ChainSync<B> {
self.best_queued_hash,
self.best_queued_number
);
self.peers.insert(who, PeerSync {
self.peers.insert(who.clone(), PeerSync {
peer_id: who,
common_number: self.best_queued_number,
best_hash,
best_number,
Expand All @@ -525,6 +540,7 @@ impl<B: BlockT> ChainSync<B> {
if self.best_queued_number.is_zero() {
debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
peer_id: who.clone(),
common_number: Zero::zero(),
best_hash,
best_number,
Expand All @@ -536,14 +552,16 @@ impl<B: BlockT> ChainSync<B> {

let common_best = std::cmp::min(self.best_queued_number, best_number);

debug!(target:"sync",
debug!(
target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
best_hash,
best_number
);

self.pending_requests.add(&who);
self.peers.insert(who, PeerSync {
self.peers.insert(who.clone(), PeerSync {
peer_id: who,
common_number: Zero::zero(),
best_hash,
best_number,
Expand All @@ -557,8 +575,14 @@ impl<B: BlockT> ChainSync<B> {
Ok(Some(ancestry_request::<B>(common_best)))
}
Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number);
debug!(
target: "sync",
"New peer with known best hash {} ({}).",
best_hash,
best_number,
);
self.peers.insert(who.clone(), PeerSync {
peer_id: who.clone(),
common_number: best_number,
best_hash,
best_number,
Expand Down Expand Up @@ -991,7 +1015,11 @@ impl<B: BlockT> ChainSync<B> {
}

match result {
Ok(BlockImportResult::ImportedKnown(_number)) => {}
Ok(BlockImportResult::ImportedKnown(number, who)) => {
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
peer.update_common_number(number);
}
}
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
if aux.clear_justification_requests {
trace!(
Expand All @@ -1004,38 +1032,61 @@ impl<B: BlockT> ChainSync<B> {
}

if aux.needs_justification {
trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash);
trace!(
target: "sync",
"Block imported but requires justification {}: {:?}",
number,
hash,
);
self.request_justification(&hash, number);
}

if aux.bad_justification {
if let Some(peer) = who {
if let Some(ref peer) = who {
info!("💔 Sent block with bad justification to import");
output.push(Err(BadPeer(peer, rep::BAD_JUSTIFICATION)));
output.push(Err(BadPeer(peer.clone(), rep::BAD_JUSTIFICATION)));
}
}

if number > self.best_imported_number {
self.best_imported_number = number;
}

if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
peer.update_common_number(number);
}
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
warn!("💔 Peer sent block with incomplete header to import");
warn!(
target: "sync",
"💔 Peer sent block with incomplete header to import",
);
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output.extend(self.restart());
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
warn!("💔 Verification failed for block {:?} received from peer: {}, {:?}", hash, peer, e);
warn!(
target: "sync",
"💔 Verification failed for block {:?} received from peer: {}, {:?}",
hash,
peer,
e,
);
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
output.extend(self.restart());
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("💔 Block {:?} received from peer {} has been blacklisted", hash, peer);
info!(
target: "sync",
"💔 Block {:?} received from peer {} has been blacklisted",
hash,
peer,
);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
}
},
Expand Down Expand Up @@ -1074,7 +1125,11 @@ impl<B: BlockT> ChainSync<B> {
});

if let Err(err) = r {
warn!(target: "sync", "💔 Error cleaning up pending extra justification data requests: {:?}", err);
warn!(
target: "sync",
"💔 Error cleaning up pending extra justification data requests: {:?}",
err,
);
}
}

Expand Down Expand Up @@ -1279,6 +1334,12 @@ impl<B: BlockT> ChainSync<B> {
&mut self,
pre_validation_result: PreValidateBlockAnnounce<B::Header>,
) -> PollBlockAnnounceValidation<B::Header> {
trace!(
target: "sync",
"Finished block announce validation: {:?}",
pre_validation_result,
);

let (announce, is_best, who) = match pre_validation_result {
PreValidateBlockAnnounce::Nothing { is_best, who, announce } => {
self.peer_block_announce_validation_finished(&who);
Expand Down Expand Up @@ -1316,18 +1377,19 @@ impl<B: BlockT> ChainSync<B> {
}

if let PeerSyncState::AncestorSearch {..} = peer.state {
trace!(target: "sync", "Peer state is ancestor search.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message could use more context, as it may be interleaved with other messages in the log. Perhaps would be better to additionally print peer.state here instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the top of the function we already print the peer id and nothing else can be printed in between from syncing that isn't related to this peer. So, we don't need to print anymore here.

return PollBlockAnnounceValidation::Nothing { is_best, who, header }
}

// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if known && self.best_queued_number >= number {
peer.common_number = number
peer.update_common_number(number);
} else if header.parent_hash() == &self.best_queued_hash
|| known_parent && self.best_queued_number >= number
{
peer.common_number = number - One::one();
peer.update_common_number(number - One::one());
}
}
self.pending_requests.add(&who);
Expand Down Expand Up @@ -1367,6 +1429,7 @@ impl<B: BlockT> ChainSync<B> {
.peers.insert(who.clone());
}

trace!(target: "sync", "Announce validation result is nothing");
PollBlockAnnounceValidation::Nothing { is_best, who, header }
}

Expand Down Expand Up @@ -1750,7 +1813,7 @@ mod test {
use substrate_test_runtime_client::{
runtime::{Block, Hash, Header},
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
BlockBuilderExt,
BlockBuilderExt, TestClient,
};
use futures::{future::poll_fn, executor::block_on};

Expand Down Expand Up @@ -1963,6 +2026,13 @@ mod test {
request
}

/// Build and import a new best block.
fn build_block(client: &mut Arc<TestClient>) -> Block {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).unwrap();
block
}

/// This test is a regression test as observed on a real network.
///
/// The node is connected to multiple peers. Both of these peers are having a best block (1) that
Expand Down Expand Up @@ -1990,14 +2060,6 @@ mod test {
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();

let mut client2 = client.clone();
let mut build_block = || {
let block = client2.new_block(Default::default()).unwrap().build().unwrap().block;
client2.import(BlockOrigin::Own, block.clone()).unwrap();

block
};

let mut client2 = client.clone();
let mut build_block_at = |at, import| {
let mut block_builder = client2.new_block_at(&BlockId::Hash(at), Default::default(), false)
Expand All @@ -2014,9 +2076,9 @@ mod test {
block
};

let block1 = build_block();
let block2 = build_block();
let block3 = build_block();
let block1 = build_block(&mut client);
let block2 = build_block(&mut client);
let block3 = build_block(&mut client);
let block3_fork = build_block_at(block2.hash(), false);

// Add two peers which are on block 1.
Expand All @@ -2041,24 +2103,24 @@ mod test {
// Let peer2 announce block 4 and check that sync wants to get the block.
send_block_announce(block4.header().clone(), &peer_id2, &mut sync);

let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2);
let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 1, &peer_id2);

// Peer1 announces the same block, but as the common block is still `1`, sync will request
// block 2 again.
send_block_announce(block4.header().clone(), &peer_id1, &mut sync);

let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1);
let request2 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id1);

let response = create_block_response(vec![block4.clone(), block3_fork.clone()]);
let response = create_block_response(vec![block4.clone()]);
let res = sync.on_block_data(&peer_id2, Some(request), response).unwrap();

// We should not yet import the blocks, because there is still an open request for fetching
// block `2` which blocks the import.
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty()));

let request3 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id2);
let request3 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id2);

let response = create_block_response(vec![block2.clone()]);
let response = create_block_response(vec![block3_fork.clone(), block2.clone()]);
let res = sync.on_block_data(&peer_id1, Some(request2), response).unwrap();
assert!(
matches!(
Expand All @@ -2068,7 +2130,7 @@ mod test {
)
);

let response = create_block_response(vec![block2.clone()]);
let response = create_block_response(vec![block3_fork.clone(), block2.clone()]);
let res = sync.on_block_data(&peer_id2, Some(request3), response).unwrap();
// Nothing to import
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty()));
Expand Down
5 changes: 3 additions & 2 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ mod tests {
impl Arbitrary for ArbitraryPeerSync {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let ps = PeerSync {
peer_id: PeerId::random(),
common_number: g.gen(),
best_hash: Hash::random(),
best_number: g.gen(),
Expand All @@ -561,10 +562,10 @@ mod tests {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let mut peers = HashMap::with_capacity(g.size());
for _ in 0 .. g.size() {
peers.insert(PeerId::random(), ArbitraryPeerSync::arbitrary(g).0);
let ps = ArbitraryPeerSync::arbitrary(g).0;
peers.insert(ps.peer_id.clone(), ps);
}
ArbitraryPeers(peers)
}
}

}
2 changes: 1 addition & 1 deletion client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn import_single_good_known_block_is_ignored() {
block,
&mut PassThroughVerifier::new(true)
) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {}
Ok(BlockImportResult::ImportedKnown(ref n, _)) if *n == number => {}
_ => panic!()
}
}
Expand Down
2 changes: 2 additions & 0 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,8 @@ pub trait TestNetFactory: Sized {
metrics_registry: None,
}).unwrap();

trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());

self.mut_peers(|peers| {
for peer in peers.iter_mut() {
peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone());
Expand Down
Loading