From ef3e455fdef0637f0578c842c61ef0825a5bc3a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 12:32:27 +0100 Subject: [PATCH 01/15] Update common block in sync after importing blocks of a peer This updates the sync code to update the common block of a peer after we have imported blocks from this peer. This fixes a bug that occurred when we were connected to one or more nodes that were doing a full sync, just like our node. Nodes doing a full sync will not announce new blocks, as we don't send import notifications during a full sync. The problem was that we were connected to some peer that reported some low number as its best block, and we tried to sync these blocks. But, as we did not update the common block of this peer, we would sync these blocks over and over again, as if caught in a time warp. The solution to this problem is to increase the common number as we import blocks from this peer. --- client/network/src/protocol/sync.rs | 136 +++++++++++++----- .../src/protocol/sync/extra_requests.rs | 5 +- client/network/test/src/block_import.rs | 2 +- client/network/test/src/lib.rs | 2 + client/network/test/src/sync.rs | 32 ++++- .../consensus/common/src/import_queue.rs | 4 +- 6 files changed, 136 insertions(+), 45 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 1ff8d37afeca9..46fd2ef021562 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -211,6 +211,8 @@ pub struct ChainSync { /// All the data we have about a Peer that we are trying to sync with #[derive(Debug, Clone)] pub struct PeerSync { + /// Peer id of this peer. + pub peer_id: PeerId, /// The common number is the block number that is a common point of /// ancestry for both our chains (as far as we know). pub common_number: NumberFor, @@ -223,6 +225,22 @@ pub struct PeerSync { pub state: PeerSyncState, } +impl PeerSync { + /// Update the `common_number` iff `new_common > common_number`. 
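+ /// This ensures that the common number only ever moves forward, for example after we have imported blocks that this peer sent us.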
+ fn update_common_number(&mut self, new_common: NumberFor) { + if self.common_number < new_common { + trace!( + target: "sync", + "Updating peer {} common number from={} => to={}.", + self.peer_id, + self.common_number, + new_common, + ); + self.common_number = new_common; + } + } +} + /// The sync status of a peer we are trying to sync with #[derive(Debug)] pub struct PeerInfo { @@ -264,11 +282,7 @@ pub enum PeerSyncState { impl PeerSyncState { pub fn is_available(&self) -> bool { - if let PeerSyncState::Available = self { - true - } else { - false - } + matches!(self, Self::Available) } } @@ -512,7 +526,8 @@ impl ChainSync { self.best_queued_hash, self.best_queued_number ); - self.peers.insert(who, PeerSync { + self.peers.insert(who.clone(), PeerSync { + peer_id: who, common_number: self.best_queued_number, best_hash, best_number, @@ -525,6 +540,7 @@ impl ChainSync { if self.best_queued_number.is_zero() { debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number); self.peers.insert(who.clone(), PeerSync { + peer_id: who.clone(), common_number: Zero::zero(), best_hash, best_number, @@ -536,14 +552,16 @@ impl ChainSync { let common_best = std::cmp::min(self.best_queued_number, best_number); - debug!(target:"sync", + debug!( + target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", best_hash, best_number ); self.pending_requests.add(&who); - self.peers.insert(who, PeerSync { + self.peers.insert(who.clone(), PeerSync { + peer_id: who, common_number: Zero::zero(), best_hash, best_number, @@ -557,8 +575,14 @@ impl ChainSync { Ok(Some(ancestry_request::(common_best))) } Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { - debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number); + debug!( + target: "sync", + "New peer with known best hash {} ({}).", + best_hash, + best_number, + ); self.peers.insert(who.clone(), PeerSync { + peer_id: who.clone(), common_number: best_number, best_hash, best_number, @@ -991,7 +1015,11 @@ impl ChainSync { } match result { - Ok(BlockImportResult::ImportedKnown(_number)) => {} + Ok(BlockImportResult::ImportedKnown(number, who)) => { + if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) { + peer.update_common_number(number); + } + } Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => { if aux.clear_justification_requests { trace!( @@ -1004,38 +1032,61 @@ impl ChainSync { } if aux.needs_justification { - trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + trace!( + target: "sync", + "Block imported but requires justification {}: {:?}", + number, + hash, + ); self.request_justification(&hash, number); } if aux.bad_justification { - if let Some(peer) = who { + if let Some(ref peer) = who { info!("๐Ÿ’” Sent block with bad justification to import"); - output.push(Err(BadPeer(peer, rep::BAD_JUSTIFICATION))); + output.push(Err(BadPeer(peer.clone(), rep::BAD_JUSTIFICATION))); } } if number > self.best_imported_number { self.best_imported_number = number; } + + if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) { + peer.update_common_number(number); + } }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { - warn!("๐Ÿ’” Peer sent block with incomplete header to import"); + warn!( + target: "sync", + "๐Ÿ’” Peer sent block with incomplete header to import", + ); output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); output.extend(self.restart()); } 
}, Err(BlockImportError::VerificationFailed(who, e)) => { if let Some(peer) = who { - warn!("๐Ÿ’” Verification failed for block {:?} received from peer: {}, {:?}", hash, peer, e); + warn!( + target: "sync", + "๐Ÿ’” Verification failed for block {:?} received from peer: {}, {:?}", + hash, + peer, + e, + ); output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); output.extend(self.restart()); } }, Err(BlockImportError::BadBlock(who)) => { if let Some(peer) = who { - info!("๐Ÿ’” Block {:?} received from peer {} has been blacklisted", hash, peer); + info!( + target: "sync", + "๐Ÿ’” Block {:?} received from peer {} has been blacklisted", + hash, + peer, + ); output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); } }, @@ -1074,7 +1125,11 @@ impl ChainSync { }); if let Err(err) = r { - warn!(target: "sync", "๐Ÿ’” Error cleaning up pending extra justification data requests: {:?}", err); + warn!( + target: "sync", + "๐Ÿ’” Error cleaning up pending extra justification data requests: {:?}", + err, + ); } } @@ -1279,6 +1334,12 @@ impl ChainSync { &mut self, pre_validation_result: PreValidateBlockAnnounce, ) -> PollBlockAnnounceValidation { + trace!( + target: "sync", + "Finished block announce validation: {:?}", + pre_validation_result, + ); + let (announce, is_best, who) = match pre_validation_result { PreValidateBlockAnnounce::Nothing { is_best, who, announce } => { self.peer_block_announce_validation_finished(&who); @@ -1316,6 +1377,7 @@ impl ChainSync { } if let PeerSyncState::AncestorSearch {..} = peer.state { + trace!(target: "sync", "Peer state is ancestor search."); return PollBlockAnnounceValidation::Nothing { is_best, who, header } } @@ -1323,11 +1385,11 @@ impl ChainSync { // is either one further ahead or it's the one they just announced, if we know about it. if is_best { if known && self.best_queued_number >= number { - peer.common_number = number + peer.update_common_number(number); } else if header.parent_hash() == &self.best_queued_hash || known_parent && self.best_queued_number >= number { - peer.common_number = number - One::one(); + peer.update_common_number(number - One::one()); } } self.pending_requests.add(&who); @@ -1367,6 +1429,7 @@ impl ChainSync { .peers.insert(who.clone()); } + trace!(target: "sync", "Announce validation result is nothing"); PollBlockAnnounceValidation::Nothing { is_best, who, header } } @@ -1750,7 +1813,7 @@ mod test { use substrate_test_runtime_client::{ runtime::{Block, Hash, Header}, ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, - BlockBuilderExt, + BlockBuilderExt, TestClient, }; use futures::{future::poll_fn, executor::block_on}; @@ -1963,6 +2026,13 @@ mod test { request } + /// Build and import a new best block. + fn build_block(client: &mut Arc) -> Block { + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).unwrap(); + block + } + /// This test is a regression test as observed on a real network. /// /// The node is connected to multiple peers. 
Both of these peers are having a best block (1) that @@ -1990,14 +2060,6 @@ mod test { let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); - let mut client2 = client.clone(); - let mut build_block = || { - let block = client2.new_block(Default::default()).unwrap().build().unwrap().block; - client2.import(BlockOrigin::Own, block.clone()).unwrap(); - - block - }; - let mut client2 = client.clone(); let mut build_block_at = |at, import| { let mut block_builder = client2.new_block_at(&BlockId::Hash(at), Default::default(), false) @@ -2014,9 +2076,9 @@ mod test { block }; - let block1 = build_block(); - let block2 = build_block(); - let block3 = build_block(); + let block1 = build_block(&mut client); + let block2 = build_block(&mut client); + let block3 = build_block(&mut client); let block3_fork = build_block_at(block2.hash(), false); // Add two peers which are on block 1. @@ -2041,24 +2103,24 @@ mod test { // Let peer2 announce block 4 and check that sync wants to get the block. send_block_announce(block4.header().clone(), &peer_id2, &mut sync); - let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2); + let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 1, &peer_id2); // Peer1 announces the same block, but as the common block is still `1`, sync will request // block 2 again. send_block_announce(block4.header().clone(), &peer_id1, &mut sync); - let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1); + let request2 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id1); - let response = create_block_response(vec![block4.clone(), block3_fork.clone()]); + let response = create_block_response(vec![block4.clone()]); let res = sync.on_block_data(&peer_id2, Some(request), response).unwrap(); // We should not yet import the blocks, because there is still an open request for fetching // block `2` which blocks the import. assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); - let request3 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id2); + let request3 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id2); - let response = create_block_response(vec![block2.clone()]); + let response = create_block_response(vec![block3_fork.clone(), block2.clone()]); let res = sync.on_block_data(&peer_id1, Some(request2), response).unwrap(); assert!( matches!( @@ -2068,7 +2130,7 @@ mod test { ) ); - let response = create_block_response(vec![block2.clone()]); + let response = create_block_response(vec![block3_fork.clone(), block2.clone()]); let res = sync.on_block_data(&peer_id2, Some(request3), response).unwrap(); // Nothing to import assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); diff --git a/client/network/src/protocol/sync/extra_requests.rs b/client/network/src/protocol/sync/extra_requests.rs index 7a7198aa7a0b6..84ad308c61eda 100644 --- a/client/network/src/protocol/sync/extra_requests.rs +++ b/client/network/src/protocol/sync/extra_requests.rs @@ -545,6 +545,7 @@ mod tests { impl Arbitrary for ArbitraryPeerSync { fn arbitrary(g: &mut G) -> Self { let ps = PeerSync { + peer_id: PeerId::random(), common_number: g.gen(), best_hash: Hash::random(), best_number: g.gen(), @@ -561,10 +562,10 @@ mod tests { fn arbitrary(g: &mut G) -> Self { let mut peers = HashMap::with_capacity(g.size()); for _ in 0 .. 
g.size() { - peers.insert(PeerId::random(), ArbitraryPeerSync::arbitrary(g).0); + let ps = ArbitraryPeerSync::arbitrary(g).0; + peers.insert(ps.peer_id.clone(), ps); } ArbitraryPeers(peers) } } - } diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index a5d0600abefea..5f9064d410e09 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -76,7 +76,7 @@ fn import_single_good_known_block_is_ignored() { block, &mut PassThroughVerifier::new(true) ) { - Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {} + Ok(BlockImportResult::ImportedKnown(ref n, _)) if *n == number => {} _ => panic!() } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index a70ecb4fb0484..428d8390b3658 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -696,6 +696,8 @@ pub trait TestNetFactory: Sized { metrics_registry: None, }).unwrap(); + trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id()); + self.mut_peers(|peers| { for peer in peers.iter_mut() { peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone()); diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 9a488ae4fa49c..448d288c0cc9f 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -721,7 +721,6 @@ impl BlockAnnounceValidator for NewBestBlockAnnounceValidator { #[test] fn sync_blocks_when_block_announce_validator_says_it_is_new_best() { sp_tracing::try_init_simple(); - log::trace!(target: "sync", "Test"); let mut net = TestNet::with_fork_choice(ForkChoiceStrategy::Custom(false)); net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); @@ -763,7 +762,6 @@ impl BlockAnnounceValidator for DeferredBlockAnnounceValidator { #[test] fn wait_until_deferred_block_announce_validation_is_ready() { sp_tracing::try_init_simple(); - log::trace!(target: "sync", "Test"); let mut net = TestNet::with_fork_choice(ForkChoiceStrategy::Custom(false)); net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(FullPeerConfig { @@ -785,7 +783,6 @@ fn wait_until_deferred_block_announce_validation_is_ready() { #[test] fn sync_to_tip_requires_that_sync_protocol_is_informed_about_best_block() { sp_tracing::try_init_simple(); - log::trace!(target: "sync", "Test"); let mut net = TestNet::new(1); // Produce some blocks @@ -814,3 +811,32 @@ fn sync_to_tip_requires_that_sync_protocol_is_informed_about_best_block() { // However peer 1 should still not have the block. 
assert!(!net.peer(1).has_block(&block_hash)); } + +#[test] +fn lol() { + sp_tracing::try_init_simple(); + + let mut net = TestNet::new(3); + + let block_hash = net.peer(0).push_blocks_at_without_informing_sync( + BlockId::Number(0), + 10_000, + false, + ); + + net.peer(1).push_blocks_at_without_informing_sync( + BlockId::Number(0), + 5_000, + false, + ); + + net.block_until_connected(); + net.block_until_idle(); + + assert!(!net.peer(2).has_block(&block_hash)); + + net.peer(0).network_service().new_best_block_imported(block_hash, 3); + while !net.peer(2).has_block(&block_hash) { + net.block_until_idle(); + } +} diff --git a/primitives/consensus/common/src/import_queue.rs b/primitives/consensus/common/src/import_queue.rs index b32ca0133d995..713c59b07a54a 100644 --- a/primitives/consensus/common/src/import_queue.rs +++ b/primitives/consensus/common/src/import_queue.rs @@ -138,7 +138,7 @@ pub trait Link: Send { #[derive(Debug, PartialEq)] pub enum BlockImportResult { /// Imported known block. - ImportedKnown(N), + ImportedKnown(N, Option), /// Imported unknown block. ImportedUnknown(N, ImportedAux, Option), } @@ -204,7 +204,7 @@ pub(crate) fn import_single_block_metered, Transaction match import { Ok(ImportResult::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedKnown(number)) + Ok(BlockImportResult::ImportedKnown(number, peer.clone())) }, Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())), Ok(ImportResult::MissingState) => { From 609768f6ee9f6733ba47867af77fdd3c5ceb441b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 12:43:59 +0100 Subject: [PATCH 02/15] Test --- client/network/test/src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 448d288c0cc9f..b7fe4b51deead 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -835,8 +835,8 @@ fn lol() { assert!(!net.peer(2).has_block(&block_hash)); - net.peer(0).network_service().new_best_block_imported(block_hash, 3); - while !net.peer(2).has_block(&block_hash) { + net.peer(0).network_service().new_best_block_imported(block_hash, 10_000); + while !net.peer(2).has_block(&block_hash) && !net.peer(1).has_block(&block_hash) { net.block_until_idle(); } } From be4bd7d283dab79189a7d42d7e6ef6fe91cdffa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 12:46:36 +0100 Subject: [PATCH 03/15] Test name.. 
--- client/network/test/src/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index b7fe4b51deead..8e467898ebae0 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -813,7 +813,7 @@ fn sync_to_tip_requires_that_sync_protocol_is_informed_about_best_block() { } #[test] -fn lol() { +fn make_sure_we_sync_to_tip_without_informing_sync_about_new_best_block() { sp_tracing::try_init_simple(); let mut net = TestNet::new(3); From d5c9afab076fcda2561dcc98d8b0f7e7894f350b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 15:51:44 +0100 Subject: [PATCH 04/15] Fix test --- client/network/src/protocol/sync.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 46fd2ef021562..d02360aacb143 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -2103,24 +2103,24 @@ mod test { // Let peer2 announce block 4 and check that sync wants to get the block. send_block_announce(block4.header().clone(), &peer_id2, &mut sync); - let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 1, &peer_id2); + let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2); // Peer1 announces the same block, but as the common block is still `1`, sync will request // block 2 again. send_block_announce(block4.header().clone(), &peer_id1, &mut sync); - let request2 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id1); + let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1); - let response = create_block_response(vec![block4.clone()]); + let response = create_block_response(vec![block4.clone(), block3_fork.clone()]); let res = sync.on_block_data(&peer_id2, Some(request), response).unwrap(); // We should not yet import the blocks, because there is still an open request for fetching // block `2` which blocks the import. 
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); - let request3 = get_block_request(&mut sync, FromBlock::Number(3), 2, &peer_id2); + let request3 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id2); - let response = create_block_response(vec![block3_fork.clone(), block2.clone()]); + let response = create_block_response(vec![block2.clone()]); let res = sync.on_block_data(&peer_id1, Some(request2), response).unwrap(); assert!( matches!( @@ -2130,7 +2130,7 @@ mod test { ) ); - let response = create_block_response(vec![block3_fork.clone(), block2.clone()]); + let response = create_block_response(vec![block2.clone()]); let res = sync.on_block_data(&peer_id2, Some(request3), response).unwrap(); // Nothing to import assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); From b0ceb60b0ffd951d0e370fd03816e4876a560739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 17:04:01 +0100 Subject: [PATCH 05/15] Cleanup some code and write some new regression test --- client/network/src/protocol/sync.rs | 154 ++++++++++++++++++++-------- 1 file changed, 110 insertions(+), 44 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index d02360aacb143..76e14f8c2fae8 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -537,27 +537,34 @@ impl ChainSync { } // If we are at genesis, just start downloading. - if self.best_queued_number.is_zero() { - debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number); - self.peers.insert(who.clone(), PeerSync { - peer_id: who.clone(), - common_number: Zero::zero(), + let (state, req) = if self.best_queued_number.is_zero() { + debug!( + target:"sync", + "New peer with best hash {} ({}).", best_hash, best_number, - state: PeerSyncState::Available, - }); - self.pending_requests.add(&who); - return Ok(None) - } + ); - let common_best = std::cmp::min(self.best_queued_number, best_number); + (PeerSyncState::Available, None) + } else { + let common_best = std::cmp::min(self.best_queued_number, best_number); - debug!( - target:"sync", - "New peer with unknown best hash {} ({}), searching for common ancestor.", - best_hash, - best_number - ); + debug!( + target:"sync", + "New peer with unknown best hash {} ({}), searching for common ancestor.", + best_hash, + best_number + ); + + ( + PeerSyncState::AncestorSearch { + current: common_best, + start: self.best_queued_number, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }, + Some(ancestry_request::(common_best)) + ) + }; self.pending_requests.add(&who); self.peers.insert(who.clone(), PeerSync { @@ -565,14 +572,10 @@ impl ChainSync { common_number: Zero::zero(), best_hash, best_number, - state: PeerSyncState::AncestorSearch { - current: common_best, - start: self.best_queued_number, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }, + state, }); - Ok(Some(ancestry_request::(common_best))) + Ok(req) } Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { debug!( @@ -1607,36 +1610,34 @@ fn peer_block_request( id, peer.common_number, finalized, peer.best_number, best_num, ); } - if let Some(range) = blocks.needed_blocks( + let range = blocks.needed_blocks( id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number, max_parallel_downloads, MAX_DOWNLOAD_AHEAD, - ) { - // The end is not part of the range. 
- let last = range.end.saturating_sub(One::one()); - - let from = if peer.best_number == last { - message::FromBlock::Hash(peer.best_hash) - } else { - message::FromBlock::Number(last) - }; + )?; - let request = message::generic::BlockRequest { - id: 0, - fields: attrs.clone(), - from, - to: None, - direction: message::Direction::Descending, - max: Some((range.end - range.start).saturated_into::()) - }; + // The end is not part of the range. + let last = range.end.saturating_sub(One::one()); - Some((range, request)) + let from = if peer.best_number == last { + message::FromBlock::Hash(peer.best_hash) } else { - None - } + message::FromBlock::Number(last) + }; + + let request = message::generic::BlockRequest { + id: 0, + fields: attrs.clone(), + from, + to: None, + direction: message::Direction::Descending, + max: Some((range.end - range.start).saturated_into::()) + }; + + Some((range, request)) } /// Get pending fork sync targets for a peer. @@ -2135,4 +2136,69 @@ mod test { // Nothing to import assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); } + + #[test] + fn lol() { + sp_tracing::try_init_simple(); + + let blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + (0..MAX_DOWNLOAD_AHEAD * 2).map(|_| build_block(&mut client)).collect::>() + }; + + let mut client = Arc::new(TestClientBuilder::new().build()); + let info = client.info(); + + let mut sync = ChainSync::new( + Roles::AUTHORITY, + client.clone(), + &info, + Box::new(DefaultBlockAnnounceValidator), + 5, + ); + + let peer_id1 = PeerId::random(); + let peer_id2 = PeerId::random(); + + let best_block = blocks.last().unwrap().clone(); + // Connect the node we will sync from + sync.new_peer(peer_id1.clone(), best_block.hash(), *best_block.header().number()).unwrap(); + sync.new_peer(peer_id2.clone(), info.best_hash, 0).unwrap(); + + let mut best_block_num = 0; + while best_block_num < MAX_DOWNLOAD_AHEAD { + let request = get_block_request( + &mut sync, + FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), + MAX_BLOCKS_TO_REQUEST as u32, + &peer_id1, + ); + + let from = if let FromBlock::Number(from) = request.from { + from + } else { + panic!("We don't expect a hash!"); + }; + + let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec(); + resp_blocks.reverse(); + + let response = create_block_response(resp_blocks); + + let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + assert!( + matches!( + res, + OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST, + ), + ); + + best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + } + + // Let peer2 announce that it finished syncing + send_block_announce(best_block.header().clone(), &peer_id2, &mut sync); + + panic!("{:?}", sync.block_requests().collect::>()); + } } From 976a765377295db7cf3e98db63c74703d5cee66d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 22:23:22 +0100 Subject: [PATCH 06/15] Implement the ancestor search --- client/network/src/protocol/sync.rs | 117 +++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 20 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 76e14f8c2fae8..b933c9755ec6f 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -67,6 +67,10 @@ const MAX_IMPORTING_BLOCKS: usize = 2048; /// Maximum blocks to download ahead of any gap. 
const MAX_DOWNLOAD_AHEAD: u32 = 2048; +/// Maximum blocks to look backwards. The gap is the difference between the highest block and the +/// common block of a node. +const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2; + /// Maximum number of concurrent block announce validations. /// /// If the queue reaches the maximum, we drop any new block @@ -714,7 +718,20 @@ impl ChainSync { return None } - if let Some((range, req)) = peer_block_request( + // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the + // common number, but the peer best number is higher than our best queued, we should + // do an ancestor search to find a better common block. + if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() + && best_queued < peer.best_number + { + let current = std::cmp::min(peer.best_number, best_queued); + peer.state = PeerSyncState::AncestorSearch { + current, + start: best_queued, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }; + Some((id, ancestry_request::(current))) + } else if let Some((range, req)) = peer_block_request( id, peer, blocks, @@ -822,15 +839,29 @@ impl ChainSync { PeerSyncState::AncestorSearch { current, start, state } => { let matching_hash = match (blocks.get(0), self.client.hash(*current)) { (Some(block), Ok(maybe_our_block_hash)) => { - trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", current, block.hash, who); + trace!( + target: "sync", + "Got ancestry block #{} ({}) from peer {}", + current, + block.hash, + who, + ); maybe_our_block_hash.filter(|x| x == &block.hash) }, (None, _) => { - debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); + debug!( + target: "sync", + "Invalid response when searching for ancestor from {}", + who, + ); return Err(BadPeer(who.clone(), rep::UNKNOWN_ANCESTOR)) }, (_, Err(e)) => { - info!("โŒ Error answering legitimate blockchain query: {:?}", e); + info!( + target: "sync", + "โŒ Error answering legitimate blockchain query: {:?}", + e, + ); return Err(BadPeer(who.clone(), rep::BLOCKCHAIN_READ_ERROR)) } }; @@ -1551,13 +1582,13 @@ pub enum AncestorSearchState { fn handle_ancestor_search_state( state: &AncestorSearchState, curr_block_num: NumberFor, - block_hash_match: bool + block_hash_match: bool, ) -> Option<(AncestorSearchState, NumberFor)> { let two = >::one() + >::one(); - match state { + match dbg!(state) { AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { - let next_distance_to_tip = *next_distance_to_tip; - if block_hash_match && next_distance_to_tip == One::one() { + let next_distance_to_tip = dbg!(*next_distance_to_tip); + if dbg!(block_hash_match) && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary search. return None; } @@ -1602,8 +1633,7 @@ fn peer_block_request( if best_num >= peer.best_number { // Will be downloaded as alternative fork instead. return None; - } - if peer.common_number < finalized { + } else if peer.common_number < finalized { trace!( target: "sync", "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}", @@ -2012,7 +2042,7 @@ mod test { /// Get a block request from `sync` and check that is matches the expected request. 
fn get_block_request( sync: &mut ChainSync, - from: message::FromBlock, + from: FromBlock, max: u32, peer: &PeerId, ) -> BlockRequest { @@ -2137,8 +2167,23 @@ mod test { assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); } + fn unwrap_from_block_number(from: FromBlock) -> u64 { + if let FromBlock::Number(from) = from { + from + } else { + panic!("Expected a number!"); + } + } + + /// A regression test for a behavior we have seen on a live network. + /// + /// The scenario is that the node is doing a full resync and is connected to some node that is + /// doing a major sync as well. This other node that is doing a major sync will finish before + /// our node and send a block announcement message, but we don't have seen any block announcement + /// from this node in its sync process. Meaning our common number didn't change. It is now expected + /// that we start an ancestor search to find the common number. #[test] - fn lol() { + fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { sp_tracing::try_init_simple(); let blocks = { @@ -2174,31 +2219,63 @@ mod test { &peer_id1, ); - let from = if let FromBlock::Number(from) = request.from { - from - } else { - panic!("We don't expect a hash!"); - }; + let from = unwrap_from_block_number(request.from.clone()); let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec(); resp_blocks.reverse(); - let response = create_block_response(resp_blocks); + let response = create_block_response(resp_blocks.clone()); let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); assert!( matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST, + OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST ), ); best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + + resp_blocks.into_iter().rev().for_each(|b| client.import(BlockOrigin::Own, b).unwrap()); } // Let peer2 announce that it finished syncing send_block_announce(best_block.header().clone(), &peer_id2, &mut sync); - panic!("{:?}", sync.block_requests().collect::>()); + let (peer1_req, peer2_req) = sync.block_requests().fold((None, None), |res, req| { + if req.0 == &peer_id1 { + (Some(req.1), res.1) + } else if req.0 == &peer_id2 { + (res.0, Some(req.1)) + } else { + panic!("Unexpected req: {:?}", req) + } + }); + + // We should now do an ancestor search to find the correct common block. + let peer2_req = peer2_req.unwrap(); + assert_eq!(Some(1), peer2_req.max); + assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from); + + let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]); + let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); + assert!( + matches!( + res, + OnBlockData::Import(_, blocks) if blocks.is_empty() + ), + ); + + let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from); + + // As we are on the same chain, we should directly continue with requesting blocks from + // peer 2 as well. 
+ get_block_request( + &mut sync, + FromBlock::Number(peer1_from + MAX_BLOCKS_TO_REQUEST as u64), + MAX_BLOCKS_TO_REQUEST as u32, + &peer_id2, + ); } + } From 2c665a83ee1da3d698724a4f891316c4b737e45c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 15 Dec 2020 23:03:30 +0100 Subject: [PATCH 07/15] Check that the common number is smaller than the last finalized block --- client/network/src/protocol/sync.rs | 11 +++++++---- client/network/test/src/sync.rs | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index b933c9755ec6f..7bf93fba14642 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -719,10 +719,11 @@ impl ChainSync { } // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the - // common number, but the peer best number is higher than our best queued, we should - // do an ancestor search to find a better common block. + // common number, the peer best number is higher than our best queued and the common + // number is smaller than the last finalized block number, we should do an ancestor + // search to find a better common block. if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() - && best_queued < peer.best_number + && best_queued < peer.best_number && peer.common_number < last_finalized { let current = std::cmp::min(peer.best_number, best_queued); peer.state = PeerSyncState::AncestorSearch { @@ -2236,7 +2237,9 @@ mod test { best_block_num += MAX_BLOCKS_TO_REQUEST as u32; - resp_blocks.into_iter().rev().for_each(|b| client.import(BlockOrigin::Own, b).unwrap()); + resp_blocks.into_iter() + .rev() + .for_each(|b| client.import_as_final(BlockOrigin::Own, b).unwrap()); } // Let peer2 announce that it finished syncing diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 8e467898ebae0..f0b140d6711e5 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -702,7 +702,7 @@ fn can_sync_to_peers_with_wrong_common_block() { net.block_until_sync(); - assert!(net.peer(1).client().header(&BlockId::Hash(final_hash)).unwrap().is_some()); + assert!(net.peer(1).has_block(&final_hash)); } /// Returns `is_new_best = true` for each validated announcement. 
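To make the condition introduced by the two preceding patches easier to follow in isolation, here is a rough, self-contained sketch; the function name, the plain `u32` block numbers and the `main` harness are illustrative only, and the real `ChainSync` code does additional bookkeeping (peer state, pending requests) around this check:

/// Illustrative only: decide whether sync should fall back to a fresh ancestor search
/// for a peer instead of requesting more blocks from it.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = 2048 / 2; // MAX_DOWNLOAD_AHEAD / 2 in the patch

fn needs_ancestor_search(
    best_queued: u32,    // our best queued block number
    common_number: u32,  // the common block we currently assume with the peer
    peer_best: u32,      // the peer's reported best block number
    last_finalized: u32, // our last finalized block number
) -> bool {
    // The assumed common block lags too far behind our best queued block, the peer
    // claims to be ahead of us, and the common block is below our last finalized
    // block, so searching backwards can still improve it.
    best_queued.saturating_sub(common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS
        && best_queued < peer_best
        && common_number < last_finalized
}

fn main() {
    // A peer whose common number lags far behind triggers a new ancestor search ...
    assert!(needs_ancestor_search(4096, 512, 4097, 2048));
    // ... while a peer with an almost up-to-date common number does not.
    assert!(!needs_ancestor_search(4096, 4000, 4097, 2048));
}
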
From c7689da23598fce26f7e4da932efea59c8347507 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 16 Dec 2020 10:30:55 +0100 Subject: [PATCH 08/15] Update client/network/src/protocol/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrรฉ Silva <123550+andresilva@users.noreply.github.com> --- client/network/src/protocol/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 7bf93fba14642..ef5606b29311d 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1586,7 +1586,7 @@ fn handle_ancestor_search_state( block_hash_match: bool, ) -> Option<(AncestorSearchState, NumberFor)> { let two = >::one() + >::one(); - match dbg!(state) { + match state { AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { let next_distance_to_tip = dbg!(*next_distance_to_tip); if dbg!(block_hash_match) && next_distance_to_tip == One::one() { From b5420b4276795b18047e502198fa444cef9611ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 16 Dec 2020 10:31:05 +0100 Subject: [PATCH 09/15] Update client/network/src/protocol/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrรฉ Silva <123550+andresilva@users.noreply.github.com> --- client/network/src/protocol/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index ef5606b29311d..e2172ce17f636 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1588,7 +1588,7 @@ fn handle_ancestor_search_state( let two = >::one() + >::one(); match state { AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { - let next_distance_to_tip = dbg!(*next_distance_to_tip); + let next_distance_to_tip = *next_distance_to_tip; if dbg!(block_hash_match) && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary search. return None; From 0838c38af7447fbcd704c291d0de2a00ee8a9d43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 16 Dec 2020 10:31:16 +0100 Subject: [PATCH 10/15] Update client/network/src/protocol/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrรฉ Silva <123550+andresilva@users.noreply.github.com> --- client/network/src/protocol/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index e2172ce17f636..3b800d9a6a6c1 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1589,7 +1589,7 @@ fn handle_ancestor_search_state( match state { AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { let next_distance_to_tip = *next_distance_to_tip; - if dbg!(block_hash_match) && next_distance_to_tip == One::one() { + if block_hash_match && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary search. 
return None; } From 8567844365ea1e9401f5b8bd29e6eaf949aad0f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 17 Dec 2020 15:09:11 +0100 Subject: [PATCH 11/15] Change the way we build the status messages --- client/network/src/protocol.rs | 54 +++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 41326b6d82a07..5a64d1e12b6b3 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -307,27 +307,35 @@ struct BlockAnnouncesHandshake { } impl BlockAnnouncesHandshake { - fn build(protocol_config: &ProtocolConfig, chain: &Arc>) -> Self { - let info = chain.info(); + fn build( + protocol_config: &ProtocolConfig, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, + ) -> Self { BlockAnnouncesHandshake { - genesis_hash: info.genesis_hash, + genesis_hash, roles: protocol_config.roles, - best_number: info.best_number, - best_hash: info.best_hash, + best_number, + best_hash, } } } /// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol. -fn build_status_message(protocol_config: &ProtocolConfig, chain: &Arc>) -> Vec { - let info = chain.info(); +fn build_status_message( + protocol_config: &ProtocolConfig, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, +) -> Vec { let status = message::generic::Status { version: CURRENT_VERSION, min_supported_version: MIN_VERSION, - genesis_hash: info.genesis_hash, + genesis_hash, roles: protocol_config.roles.into(), - best_number: info.best_number, - best_hash: info.best_hash, + best_number, + best_hash, chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible }; @@ -400,12 +408,22 @@ impl Protocol { let behaviour = { let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode(); + + let best_number = info.best_number; + let best_hash = info.best_hash; + let genesis_hash = info.genesis_hash; + + let block_announces_handshake = BlockAnnouncesHandshake::::build( + &config, + best_number, + best_hash, + genesis_hash, + ).encode(); GenericProto::new( local_peer_id, protocol_id.clone(), versions, - build_status_message(&config, &chain), + build_status_message::(&config, best_number, best_hash, genesis_hash), peerset, // As documented in `GenericProto`, the first protocol in the list is always the // one carrying the handshake reported in the `CustomProtocolOpen` event. @@ -528,13 +546,21 @@ impl Protocol { /// Inform sync about new best imported block. pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor) { + trace!(target: "sync", "New best block imported {:?}/{}", hash, number); + self.sync.update_chain_info(&hash, number); + self.behaviour.set_legacy_handshake_message( - build_status_message(&self.config, &self.context_data.chain), + build_status_message::(&self.config, number, hash, self.genesis_hash), ); self.behaviour.set_notif_protocol_handshake( &self.block_announces_protocol, - BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() + BlockAnnouncesHandshake::::build( + &self.config, + number, + hash, + self.genesis_hash, + ).encode() ); } From 0bd40b8e08f72cfa9154b51838140a516925fddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 17 Dec 2020 17:24:53 +0100 Subject: [PATCH 12/15] Start some new test... 
--- client/network/src/protocol/sync.rs | 79 ++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 3b800d9a6a6c1..1608b8fac18a9 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1845,7 +1845,7 @@ mod test { use substrate_test_runtime_client::{ runtime::{Block, Hash, Header}, ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, - BlockBuilderExt, TestClient, + BlockBuilderExt, TestClient, ClientExt, }; use futures::{future::poll_fn, executor::block_on}; @@ -2059,8 +2059,21 @@ mod test { } /// Build and import a new best block. - fn build_block(client: &mut Arc) -> Block { - let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + fn build_block(client: &mut Arc, at: Option, fork: bool) -> Block { + let at = at.unwrap_or_else(|| client.info().best_hash); + + let mut block_builder = client.new_block_at( + &BlockId::Hash(at), + Default::default(), + false, + ).unwrap(); + + if fork { + block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap(); + } + + let block = block_builder.build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).unwrap(); block } @@ -2108,9 +2121,9 @@ mod test { block }; - let block1 = build_block(&mut client); - let block2 = build_block(&mut client); - let block3 = build_block(&mut client); + let block1 = build_block(&mut client, None, false); + let block2 = build_block(&mut client, None, false); + let block3 = build_block(&mut client, None, false); let block3_fork = build_block_at(block2.hash(), false); // Add two peers which are on block 1. @@ -2189,7 +2202,7 @@ mod test { let blocks = { let mut client = Arc::new(TestClientBuilder::new().build()); - (0..MAX_DOWNLOAD_AHEAD * 2).map(|_| build_block(&mut client)).collect::>() + (0..MAX_DOWNLOAD_AHEAD * 2).map(|_| build_block(&mut client, None, false)).collect::>() }; let mut client = Arc::new(TestClientBuilder::new().build()); @@ -2281,4 +2294,56 @@ mod test { ); } + #[test] + fn can_sync_huge_fork() { + sp_tracing::try_init_simple(); + + let mut client = Arc::new(TestClientBuilder::new().build()); + let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + let fork_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2] + .into_iter() + .inspect(|b| client.import(BlockOrigin::Own, (*b).clone()).unwrap()) + .cloned() + .collect::>(); + + fork_blocks.into_iter().chain( + (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1) + .map(|_| build_block(&mut client, None, true)) + ).collect::>() + }; + + let info = client.info(); + + let mut sync = ChainSync::new( + Roles::AUTHORITY, + client.clone(), + &info, + Box::new(DefaultBlockAnnounceValidator), + 5, + ); + + let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2].clone(); + client.finalize_block(BlockId::Hash(finalized_block.hash()), Some(Vec::new())).unwrap(); + sync.update_chain_info(&info.best_hash, info.best_number); + + let peer_id1 = PeerId::random(); + + let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); + // Connect the node we will sync from + sync.new_peer(peer_id1.clone(), common_block.hash(), *common_block.header().number()).unwrap(); + + send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut 
sync); + + get_block_request( + &mut sync, + FromBlock::Number(*common_block.header().number() as u64), + 1, + &peer_id1, + ); + } } From 041842a62dc470316dc4cd8e4ee0eca9ae5ed285 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 18 Dec 2020 00:59:08 +0100 Subject: [PATCH 13/15] Finish test --- client/network/src/protocol/sync.rs | 121 ++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 6 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 1608b8fac18a9..712c130028839 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -333,6 +333,18 @@ pub enum OnBlockData { Request(PeerId, BlockRequest) } +impl OnBlockData { + /// Returns `self` as request. + #[cfg(test)] + fn into_request(self) -> Option<(PeerId, BlockRequest)> { + if let Self::Request(peer, req) = self { + Some((peer, req)) + } else { + None + } + } +} + /// Result of [`ChainSync::poll_block_announce_validation`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum PollBlockAnnounceValidation { @@ -881,17 +893,23 @@ impl ChainSync { trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); return Err(BadPeer(who.clone(), rep::GENESIS_MISMATCH)) } - if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) { + if let Some((next_state, next_num)) = + handle_ancestor_search_state(state, *current, matching_hash.is_some()) + { peer.state = PeerSyncState::AncestorSearch { current: next_num, start: *start, state: next_state, }; - return Ok(OnBlockData::Request(who.clone(), ancestry_request::(next_num))) + return Ok( + OnBlockData::Request(who.clone(), ancestry_request::(next_num)) + ) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown to us and // add it to sync targets if necessary. - trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", + trace!( + target: "sync", + "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", self.best_queued_hash, self.best_queued_number, peer.best_hash, @@ -902,7 +920,12 @@ impl ChainSync { if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number { - trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); + trace!( + target: "sync", + "Added fork target {} for {}", + peer.best_hash, + who, + ); self.fork_targets .entry(peer.best_hash.clone()) .or_insert_with(|| ForkTarget { @@ -2048,6 +2071,9 @@ mod test { peer: &PeerId, ) -> BlockRequest { let requests = sync.block_requests().collect::>(); + + log::trace!(target: "sync", "Requests: {:?}", requests); + assert_eq!(1, requests.len()); assert_eq!(peer, requests[0].0); @@ -2294,6 +2320,14 @@ mod test { ); } + /// A test that ensures that we can sync a huge fork. + /// + /// The following scenario: + /// A peer connects to us and we both have the common block 512. The last finalized is 2048. + /// Our best block is 4096. The peer send us a block announcement with 4097 from a fork. + /// + /// We will first do an ancestor search to find the common block. 
After that we start to sync + /// the fork and finish it ;) #[test] fn can_sync_huge_fork() { sp_tracing::try_init_simple(); @@ -2327,7 +2361,7 @@ mod test { 5, ); - let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2].clone(); + let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); client.finalize_block(BlockId::Hash(finalized_block.hash()), Some(Vec::new())).unwrap(); sync.update_chain_info(&info.best_hash, info.best_number); @@ -2339,9 +2373,84 @@ mod test { send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync); + let mut request = get_block_request( + &mut sync, + FromBlock::Number(info.best_number), + 1, + &peer_id1, + ); + + // Do the ancestor search + loop { + let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + request = match on_block_data.into_request() { + Some(req) => req.1, + // We found the ancenstor + None => break, + }; + + log::trace!(target: "sync", "Request: {:?}", request); + } + + // Now request and import the fork. + let mut best_block_num = finalized_block.header().number().clone() as u32; + while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 { + let request = get_block_request( + &mut sync, + FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), + MAX_BLOCKS_TO_REQUEST as u32, + &peer_id1, + ); + + let from = unwrap_from_block_number(request.from.clone()); + + let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec(); + resp_blocks.reverse(); + + let response = create_block_response(resp_blocks.clone()); + + let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + assert!( + matches!( + res, + OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST + ), + ); + + best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + + let _ = sync.on_blocks_processed( + MAX_BLOCKS_TO_REQUEST as usize, + MAX_BLOCKS_TO_REQUEST as usize, + resp_blocks.iter() + .rev() + .map(|b| + ( + Ok( + BlockImportResult::ImportedUnknown( + b.header().number().clone(), + Default::default(), + Some(peer_id1.clone()), + ) + ), + b.hash(), + ) + ) + .collect() + ); + + resp_blocks.into_iter() + .rev() + .for_each(|b| client.import(BlockOrigin::Own, b).unwrap()); + } + + // Request the tip get_block_request( &mut sync, - FromBlock::Number(*common_block.header().number() as u64), + FromBlock::Hash(fork_blocks.last().unwrap().hash()), 1, &peer_id1, ); From 5e87ed15d7eda99ee72d9eedd1e9512f56cbe2fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 18 Dec 2020 01:05:32 +0100 Subject: [PATCH 14/15] Rename test --- client/network/test/src/sync.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index f0b140d6711e5..e04ef060f08c0 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -812,8 +812,10 @@ fn sync_to_tip_requires_that_sync_protocol_is_informed_about_best_block() { assert!(!net.peer(1).has_block(&block_hash)); } +/// Ensures that if we as a syncing node sync to the tip while we are connected to another peer +/// that is currently also doing a major sync. 
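+/// In the end, the other peers should still manage to sync to the tip.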
#[test] -fn make_sure_we_sync_to_tip_without_informing_sync_about_new_best_block() { +fn sync_to_tip_when_we_sync_together_with_multiple_peers() { sp_tracing::try_init_simple(); let mut net = TestNet::new(3); From 0b4730f6132d25b674f9d286722b4131206e120d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 18 Dec 2020 11:19:57 +0100 Subject: [PATCH 15/15] Update client/network/src/protocol.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrรฉ Silva <123550+andresilva@users.noreply.github.com> --- client/network/src/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 5a64d1e12b6b3..1a67aec57abb2 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -546,7 +546,7 @@ impl Protocol { /// Inform sync about new best imported block. pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor) { - trace!(target: "sync", "New best block imported {:?}/{}", hash, number); + trace!(target: "sync", "New best block imported {:?}/#{}", hash, number); self.sync.update_chain_info(&hash, number);
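
Taken together, the behavioural core of this series is small: a peer's common number may only ever move forward, and it now also moves forward when we import blocks that this peer sent us. The following self-contained sketch (simplified types and names, not the real `ChainSync`/`PeerSync` API) illustrates that invariant:

// Minimal sketch with simplified types; `PeerSync` here is a stand-in for the real
// struct in client/network/src/protocol/sync.rs.
struct PeerSync {
    common_number: u64,
}

impl PeerSync {
    /// Mirror of `update_common_number` in the patch: only ever raise the value.
    fn update_common_number(&mut self, new_common: u64) {
        if self.common_number < new_common {
            self.common_number = new_common;
        }
    }
}

fn main() {
    let mut peer = PeerSync { common_number: 1 };

    // Importing blocks 2..=4 that this peer sent us advances the common number ...
    for imported in 2..=4u64 {
        peer.update_common_number(imported);
    }
    assert_eq!(peer.common_number, 4);

    // ... and stale information can never move it backwards again, which is what
    // previously made us request the same blocks from the peer over and over.
    peer.update_common_number(2);
    assert_eq!(peer.common_number, 4);
}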