From c13dd3b2bf3f7cebb4de884fe8e4dea4e542e858 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Thu, 1 Apr 2021 00:02:31 +0300 Subject: [PATCH] Fixed sync skipping some block announcements (#8459) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed sync missing some block announcements * Apply suggestions from code review Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> --- client/network/src/protocol/message.rs | 20 +++++++ client/network/src/protocol/sync.rs | 67 ++++++++++++---------- client/network/test/src/lib.rs | 11 +++- client/network/test/src/sync.rs | 77 ++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 31 deletions(-) diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index 01e9a5d7215af..7564804400fb3 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -144,6 +144,26 @@ pub struct RemoteReadResponse { pub proof: StorageProof, } +/// Announcement summary used for debug logging. +#[derive(Debug)] +pub struct AnnouncementSummary { + block_hash: H::Hash, + number: H::Number, + parent_hash: H::Hash, + state: Option, +} + +impl generic::BlockAnnounce { + pub fn summary(&self) -> AnnouncementSummary { + AnnouncementSummary { + block_hash: self.header.hash(), + number: *self.header.number(), + parent_hash: self.header.parent_hash().clone(), + state: self.state, + } + } +} + /// Generic types. pub mod generic { use bitflags::bitflags; diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 37f9a451b67d3..ee7943ba64697 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -505,9 +505,10 @@ impl ChainSync { } } - /// Number of active sync requests. + /// Number of active forks requests. This includes + /// requests that are pending or could be issued right away. pub fn num_sync_requests(&self) -> usize { - self.fork_targets.len() + self.fork_targets.values().filter(|f| f.number <= self.best_queued_number).count() } /// Number of downloaded blocks. @@ -1421,23 +1422,36 @@ impl ChainSync { &mut self, pre_validation_result: PreValidateBlockAnnounce, ) -> PollBlockAnnounceValidation { - trace!( - target: "sync", - "Finished block announce validation: {:?}", - pre_validation_result, - ); - let (announce, is_best, who) = match pre_validation_result { PreValidateBlockAnnounce::Failure { who, disconnect } => { + debug!( + target: "sync", + "Failed announce validation: {:?}, disconnect: {}", + who, + disconnect, + ); return PollBlockAnnounceValidation::Failure { who, disconnect } }, PreValidateBlockAnnounce::Process { announce, is_new_best, who } => { (announce, is_new_best, who) }, - PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => - return PollBlockAnnounceValidation::Skip, + PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => { + debug!( + target: "sync", + "Ignored announce validation", + ); + return PollBlockAnnounceValidation::Skip + }, }; + trace!( + target: "sync", + "Finished block announce validation: from {:?}: {:?}. local_best={}", + who, + announce.summary(), + is_best, + ); + let number = *announce.header.number(); let hash = announce.header.hash(); let parent_status = self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); @@ -1508,25 +1522,22 @@ impl ChainSync { return PollBlockAnnounceValidation::ImportHeader { is_best, announce, who } } - if number <= self.best_queued_number { - trace!( - target: "sync", - "Added sync target for block announced from {}: {} {:?}", - who, - hash, - announce.header, - ); - self.fork_targets - .entry(hash.clone()) - .or_insert_with(|| ForkTarget { - number, - parent_hash: Some(*announce.header.parent_hash()), - peers: Default::default(), - }) - .peers.insert(who.clone()); - } + trace!( + target: "sync", + "Added sync target for block announced from {}: {} {:?}", + who, + hash, + announce.summary(), + ); + self.fork_targets + .entry(hash.clone()) + .or_insert_with(|| ForkTarget { + number, + parent_hash: Some(*announce.header.parent_hash()), + peers: Default::default(), + }) + .peers.insert(who.clone()); - trace!(target: "sync", "Announce validation result is nothing"); PollBlockAnnounceValidation::Nothing { is_best, who, announce } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 1c237f94700c3..e37c1cad19c81 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -449,6 +449,11 @@ impl Peer { &self.network.service() } + /// Get a reference to the network worker. + pub fn network(&self) -> &NetworkWorker::Hash> { + &self.network + } + /// Test helper to compare the blockchain state of multiple (networked) /// clients. pub fn blockchain_canon_equals(&self, other: &Self) -> bool { @@ -958,12 +963,12 @@ pub trait TestNetFactory: Sized { /// Polls the testnet. Processes all the pending actions. fn poll(&mut self, cx: &mut FutureContext) { self.mut_peers(|peers| { - for peer in peers { - trace!(target: "sync", "-- Polling {}", peer.id()); + for (i, peer) in peers.into_iter().enumerate() { + trace!(target: "sync", "-- Polling {}: {}", i, peer.id()); if let Poll::Ready(()) = peer.network.poll_unpin(cx) { panic!("NetworkWorker has terminated unexpectedly.") } - trace!(target: "sync", "-- Polling complete {}", peer.id()); + trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id()); // We poll `imported_blocks_stream`. while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) { diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 953639dcc0e22..553a769ec14a4 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -740,6 +740,27 @@ impl BlockAnnounceValidator for NewBestBlockAnnounceValidator { } } +/// Returns `Validation::Failure` for specified block number +struct FailingBlockAnnounceValidator(u64); + +impl BlockAnnounceValidator for FailingBlockAnnounceValidator { + fn validate( + &mut self, + header: &Header, + _: &[u8], + ) -> Pin>> + Send>> { + let number = *header.number(); + let target_number = self.0; + async move { Ok( + if number == target_number { + Validation::Failure { disconnect: false } + } else { + Validation::Success { is_new_best: true } + } + ) }.boxed() + } +} + #[test] fn sync_blocks_when_block_announce_validator_says_it_is_new_best() { sp_tracing::try_init_simple(); @@ -1010,3 +1031,59 @@ fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() { Poll::Ready(()) })); } + +#[test] +fn syncs_all_forks_from_single_peer() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(2); + net.peer(0).push_blocks(10, false); + net.peer(1).push_blocks(10, false); + + // poll until the two nodes connect, otherwise announcing the block will not work + net.block_until_connected(); + + // Peer 0 produces new blocks and announces. + let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true); + + // Wait till peer 1 starts downloading + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).network().best_seen_block() != Some(12) { + return Poll::Pending + } + Poll::Ready(()) + })); + + // Peer 0 produces and announces another fork + let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false); + + net.block_until_sync(); + + // Peer 1 should have both branches, + assert!(net.peer(1).client().header(&BlockId::Hash(branch1)).unwrap().is_some()); + assert!(net.peer(1).client().header(&BlockId::Hash(branch2)).unwrap().is_some()); +} + +#[test] +fn syncs_after_missing_announcement() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(0); + net.add_full_peer_with_config(Default::default()); + // Set peer 1 to ignore announcement + net.add_full_peer_with_config(FullPeerConfig { + block_announce_validator: Some(Box::new(FailingBlockAnnounceValidator(11))), + ..Default::default() + }); + net.peer(0).push_blocks(10, false); + net.peer(1).push_blocks(10, false); + + net.block_until_connected(); + + // Peer 0 produces a new block and announces. Peer 1 ignores announcement. + net.peer(0).push_blocks_at(BlockId::Number(10), 1, false); + // Peer 0 produces another block and announces. + let final_block = net.peer(0).push_blocks_at(BlockId::Number(11), 1, false); + net.peer(1).push_blocks_at(BlockId::Number(10), 1, true); + net.block_until_sync(); + assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some()); +}