Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Drain blocks on peer disconnect (#8553)
Browse files Browse the repository at this point in the history
* Drain blocks on peer disconnect

* Finish comment

* Fixed test

* Update client/network/src/protocol/sync.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
arkpar and bkchr authored Apr 7, 2021
1 parent 7bb59a6 commit f296656
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
4 changes: 3 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,9 @@ impl<B: BlockT> Protocol<B> {
}

if let Some(_peer_data) = self.peers.remove(&peer) {
self.sync.peer_disconnected(&peer);
if let Some(sync::OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) {
self.pending_messages.push_back(CustomMessageOutcome::BlockImport(origin, blocks));
}
Ok(())
} else {
Err(())
Expand Down
54 changes: 47 additions & 7 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,10 +738,19 @@ impl<B: BlockT> ChainSync<B> {
// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the
// common number, the peer best number is higher than our best queued and the common
// number is smaller than the last finalized block number, we should do an ancestor
// search to find a better common block.
// search to find a better common block. If the queue is full we wait till all blocks are
// imported though.
if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
&& best_queued < peer.best_number && peer.common_number < last_finalized
&& queue.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: "sync",
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
id,
peer.common_number,
best_queued,
);
let current = std::cmp::min(peer.best_number, best_queued);
peer.state = PeerSyncState::AncestorSearch {
current,
Expand Down Expand Up @@ -804,7 +813,7 @@ impl<B: BlockT> ChainSync<B> {
response: BlockResponse<B>
) -> Result<OnBlockData<B>, BadPeer> {
self.downloaded_blocks += response.blocks.len();
let mut new_blocks: Vec<IncomingBlock<B>> =
let new_blocks: Vec<IncomingBlock<B>> =
if let Some(peer) = self.peers.get_mut(who) {
let mut blocks = response.blocks;
if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
Expand Down Expand Up @@ -970,6 +979,13 @@ impl<B: BlockT> ChainSync<B> {
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED));
};

Ok(self.validate_and_queue_blocks(new_blocks))
}

fn validate_and_queue_blocks(
&mut self,
mut new_blocks: Vec<IncomingBlock<B>>,
) -> OnBlockData<B> {
let orig_len = new_blocks.len();
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
if new_blocks.len() != orig_len {
Expand All @@ -992,10 +1008,8 @@ impl<B: BlockT> ChainSync<B> {
);
self.on_block_queued(h, n)
}

self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));

Ok(OnBlockData::Import(origin, new_blocks))
OnBlockData::Import(origin, new_blocks)
}

/// Handle a response from the remote to a justification request that we made.
Expand Down Expand Up @@ -1353,7 +1367,7 @@ impl<B: BlockT> ChainSync<B> {
PreValidateBlockAnnounce::Failure { who, disconnect }
}
Err(e) => {
error!(
debug!(
target: "sync",
"💔 Block announcement validation of block {:?} errored: {}",
hash,
Expand Down Expand Up @@ -1542,11 +1556,34 @@ impl<B: BlockT> ChainSync<B> {
}

/// Call when a peer has disconnected.
pub fn peer_disconnected(&mut self, who: &PeerId) {
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
self.blocks.clear_peer_download(who);
self.peers.remove(who);
self.extra_justifications.peer_disconnected(who);
self.pending_requests.set_all();
let blocks: Vec<_> = self.blocks
.drain(self.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
let justifications =
legacy_justification_mapping(block_data.block.justification);
IncomingBlock {
hash: block_data.block.hash,
header: block_data.block.header,
body: block_data.block.body,
justifications,
origin: block_data.origin,
allow_missing_state: true,
import_existing: false,
}
}).collect();
if !blocks.is_empty() {
Some(self.validate_and_queue_blocks(blocks))
} else {
None
}
}

/// Restart the sync process. This will reset all pending block requests and return an iterator
Expand Down Expand Up @@ -2349,6 +2386,9 @@ mod test {
.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());
}

// "Wait" for the queue to clear
sync.queue_blocks.clear();

// Let peer2 announce that it finished syncing
send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);

Expand Down

0 comments on commit f296656

Please sign in to comment.