
Drain blocks on peer disconnect #8553

Merged: 4 commits, Apr 7, 2021
client/network/src/protocol.rs (3 additions, 1 deletion)
@@ -526,7 +526,9 @@ impl<B: BlockT> Protocol<B> {
 		}

 		if let Some(_peer_data) = self.peers.remove(&peer) {
-			self.sync.peer_disconnected(&peer);
+			if let Some(sync::OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) {
+				self.pending_messages.push_back(CustomMessageOutcome::BlockImport(origin, blocks));
+			}
 			Ok(())
 		} else {
 			Err(())
client/network/src/protocol/sync.rs (47 additions, 7 deletions)
@@ -738,10 +738,19 @@ impl<B: BlockT> ChainSync<B> {
 		// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the
 		// common number, the peer best number is higher than our best queued and the common
 		// number is smaller than the last finalized block number, we should do an ancestor
-		// search to find a better common block.
+		// search to find a better common block. If the queue is full we wait till all blocks are
+		// imported though.
 		if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
 			&& best_queued < peer.best_number && peer.common_number < last_finalized
+			&& queue.len() <= MAJOR_SYNC_BLOCKS.into()
 		{
+			trace!(
+				target: "sync",
+				"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
+				id,
+				peer.common_number,
+				best_queued,
+			);
 			let current = std::cmp::min(peer.best_number, best_queued);
 			peer.state = PeerSyncState::AncestorSearch {
 				current,
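The new `queue.len() <= MAJOR_SYNC_BLOCKS.into()` clause makes the ancestor search wait while the import queue is still busy. The standalone sketch below walks through the whole guard with made-up numbers; the constant values and the helper name are placeholders for illustration, not Substrate's actual definitions.

// Worked example of the ancestor-search guard above. The constant values and
// the helper name are illustrative only.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u64 = 128;
const MAJOR_SYNC_BLOCKS: u64 = 5;

fn start_ancestor_search(
    best_queued: u64,
    common_number: u64,
    peer_best: u64,
    last_finalized: u64,
    queue_len: u64,
) -> bool {
    best_queued.saturating_sub(common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS
        && best_queued < peer_best
        && common_number < last_finalized
        // New in this PR: hold off while the import queue is still full.
        && queue_len <= MAJOR_SYNC_BLOCKS
}

fn main() {
    // Common block far behind us, peer ahead of us, queue almost empty: search.
    assert!(start_ancestor_search(10_000, 500, 12_000, 9_000, 3));
    // Same situation, but the import queue is still full: wait for imports first.
    assert!(!start_ancestor_search(10_000, 500, 12_000, 9_000, 500));
    println!("guard behaves as expected");
}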
@@ -804,7 +813,7 @@ impl<B: BlockT> ChainSync<B> {
 		response: BlockResponse<B>
 	) -> Result<OnBlockData<B>, BadPeer> {
 		self.downloaded_blocks += response.blocks.len();
-		let mut new_blocks: Vec<IncomingBlock<B>> =
+		let new_blocks: Vec<IncomingBlock<B>> =
 			if let Some(peer) = self.peers.get_mut(who) {
 				let mut blocks = response.blocks;
 				if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
@@ -970,6 +979,13 @@ impl<B: BlockT> ChainSync<B> {
 			return Err(BadPeer(who.clone(), rep::NOT_REQUESTED));
 		};

+		Ok(self.validate_and_queue_blocks(new_blocks))
+	}
+
+	fn validate_and_queue_blocks(
+		&mut self,
+		mut new_blocks: Vec<IncomingBlock<B>>,
+	) -> OnBlockData<B> {
 		let orig_len = new_blocks.len();
 		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
 		if new_blocks.len() != orig_len {
@@ -992,10 +1008,8 @@
 			);
 			self.on_block_queued(h, n)
 		}
-
 		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
-
-		Ok(OnBlockData::Import(origin, new_blocks))
+		OnBlockData::Import(origin, new_blocks)
 	}

 	/// Handle a response from the remote to a justification request that we made.
@@ -1353,7 +1367,7 @@ impl<B: BlockT> ChainSync<B> {
 				PreValidateBlockAnnounce::Failure { who, disconnect }
 			}
 			Err(e) => {
-				error!(
+				debug!(
 					target: "sync",
 					"💔 Block announcement validation of block {:?} errored: {}",
 					hash,
@@ -1542,11 +1556,34 @@ impl<B: BlockT> ChainSync<B> {
 	}

 	/// Call when a peer has disconnected.
-	pub fn peer_disconnected(&mut self, who: &PeerId) {
+	/// Cancelled obsolete block requests may result in some blocks being ready for
+	/// import, so this function checks for such blocks and returns them.
+	pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
 		self.blocks.clear_peer_download(who);
 		self.peers.remove(who);
 		self.extra_justifications.peer_disconnected(who);
 		self.pending_requests.set_all();
+		let blocks: Vec<_> = self.blocks
+			.drain(self.best_queued_number + One::one())
+			.into_iter()
+			.map(|block_data| {
+				let justifications =
+					legacy_justification_mapping(block_data.block.justification);
+				IncomingBlock {
+					hash: block_data.block.hash,
+					header: block_data.block.header,
+					body: block_data.block.body,
+					justifications,
+					origin: block_data.origin,
+					allow_missing_state: true,
+					import_existing: false,
+				}
+			}).collect();
+		if !blocks.is_empty() {
+			Some(self.validate_and_queue_blocks(blocks))
+		} else {
+			None
+		}
 	}

 	/// Restart the sync process. This will reset all pending block requests and return an iterator
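To see the intended effect of the new `Option<OnBlockData<B>>` return value in isolation, here is a minimal, self-contained toy model of the drain-on-disconnect idea. `ToySync`, `RangeState`, and the other names below are simplified stand-ins, not Substrate's real `ChainSync`/`BlockCollection` API; the only assumption carried over is that draining stops at the first range that is still marked as downloading.

// Toy model of draining ready blocks when a peer disconnects. All names are
// simplified stand-ins for illustration, not Substrate's actual types.
use std::collections::BTreeMap;

type PeerId = u32;
type Number = u64;

#[derive(Debug, PartialEq)]
struct Block(Number);

enum RangeState {
    // A request for this range is in flight with the given peer.
    Downloading(PeerId),
    // The range was fully delivered and waits to be drained for import.
    Complete(Vec<Block>),
}

#[derive(Default)]
struct ToySync {
    // Block ranges keyed by their starting number.
    ranges: BTreeMap<Number, RangeState>,
    best_queued_number: Number,
}

impl ToySync {
    // Walk the ranges in ascending order, collecting contiguous complete ranges
    // and stopping at the first gap or still-downloading range.
    fn drain(&mut self, from: Number) -> Vec<Block> {
        let mut drained = Vec::new();
        let mut taken = Vec::new();
        let mut prev = from;
        for (start, state) in &mut self.ranges {
            match state {
                RangeState::Complete(blocks) if *start <= prev => {
                    prev = *start + blocks.len() as Number;
                    drained.append(blocks);
                    taken.push(*start);
                }
                _ => break,
            }
        }
        for start in taken {
            self.ranges.remove(&start);
        }
        drained
    }

    // The behaviour this PR adds: cancelling the disconnected peer's request may
    // unblock blocks that other peers already delivered, so drain and return them.
    fn peer_disconnected(&mut self, who: PeerId) -> Option<Vec<Block>> {
        self.ranges.retain(|_, s| !matches!(s, RangeState::Downloading(p) if *p == who));
        let blocks = self.drain(self.best_queued_number + 1);
        (!blocks.is_empty()).then(|| blocks)
    }
}

fn main() {
    let mut sync = ToySync { best_queued_number: 10, ..Default::default() };
    // A stale request to peer 1 for an obsolete range sits in front of a range
    // that peer 2 already delivered in full.
    sync.ranges.insert(5, RangeState::Downloading(1));
    sync.ranges.insert(11, RangeState::Complete((11..=13).map(Block).collect()));

    // Peer 1 disconnects: its stale request is cleared and blocks 11..=13 drain.
    let ready = sync.peer_disconnected(1).expect("blocks became ready for import");
    assert_eq!(ready, vec![Block(11), Block(12), Block(13)]);
    println!("ready for import: {:?}", ready.iter().map(|b| b.0).collect::<Vec<_>>());
}

Returning the ready blocks instead of importing them inside the sync state machine keeps the import step in `protocol.rs`, which, as the first hunk shows, simply pushes a `CustomMessageOutcome::BlockImport` for them.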
@@ -2349,6 +2386,9 @@ mod test {
 			.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());
 	}

+	// "Wait" for the queue to clear
+	sync.queue_blocks.clear();
+
 	// Let peer2 announce that it finished syncing
 	send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);
