diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index bbb87b5255d10..1ae1d48cba3c5 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -526,7 +526,9 @@ impl Protocol { } if let Some(_peer_data) = self.peers.remove(&peer) { - self.sync.peer_disconnected(&peer); + if let Some(sync::OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) { + self.pending_messages.push_back(CustomMessageOutcome::BlockImport(origin, blocks)); + } Ok(()) } else { Err(()) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index dd682bf348b03..6e07bd4c96971 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -738,10 +738,19 @@ impl ChainSync { // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the // common number, the peer best number is higher than our best queued and the common // number is smaller than the last finalized block number, we should do an ancestor - // search to find a better common block. + // search to find a better common block. If the queue is full we wait till all blocks are + // imported though. if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && best_queued < peer.best_number && peer.common_number < last_finalized + && queue.len() <= MAJOR_SYNC_BLOCKS.into() { + trace!( + target: "sync", + "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.", + id, + peer.common_number, + best_queued, + ); let current = std::cmp::min(peer.best_number, best_queued); peer.state = PeerSyncState::AncestorSearch { current, @@ -804,7 +813,7 @@ impl ChainSync { response: BlockResponse ) -> Result, BadPeer> { self.downloaded_blocks += response.blocks.len(); - let mut new_blocks: Vec> = + let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(who) { let mut blocks = response.blocks; if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) { @@ -970,6 +979,13 @@ impl ChainSync { return Err(BadPeer(who.clone(), rep::NOT_REQUESTED)); }; + Ok(self.validate_and_queue_blocks(new_blocks)) + } + + fn validate_and_queue_blocks( + &mut self, + mut new_blocks: Vec>, + ) -> OnBlockData { let orig_len = new_blocks.len(); new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); if new_blocks.len() != orig_len { @@ -992,10 +1008,8 @@ impl ChainSync { ); self.on_block_queued(h, n) } - self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); - - Ok(OnBlockData::Import(origin, new_blocks)) + OnBlockData::Import(origin, new_blocks) } /// Handle a response from the remote to a justification request that we made. @@ -1353,7 +1367,7 @@ impl ChainSync { PreValidateBlockAnnounce::Failure { who, disconnect } } Err(e) => { - error!( + debug!( target: "sync", "💔 Block announcement validation of block {:?} errored: {}", hash, @@ -1542,11 +1556,34 @@ impl ChainSync { } /// Call when a peer has disconnected. - pub fn peer_disconnected(&mut self, who: &PeerId) { + /// Canceled obsolete block request may result in some blocks being ready for + /// import, so this functions checks for such blocks and returns them. + pub fn peer_disconnected(&mut self, who: &PeerId) -> Option> { self.blocks.clear_peer_download(who); self.peers.remove(who); self.extra_justifications.peer_disconnected(who); self.pending_requests.set_all(); + let blocks: Vec<_> = self.blocks + .drain(self.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + let justifications = + legacy_justification_mapping(block_data.block.justification); + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + justifications, + origin: block_data.origin, + allow_missing_state: true, + import_existing: false, + } + }).collect(); + if !blocks.is_empty() { + Some(self.validate_and_queue_blocks(blocks)) + } else { + None + } } /// Restart the sync process. This will reset all pending block requests and return an iterator @@ -2349,6 +2386,9 @@ mod test { .for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap()); } + // "Wait" for the queue to clear + sync.queue_blocks.clear(); + // Let peer2 announce that it finished syncing send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);