Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Avoid a duplicate block request when syncing from a fork #11094

Merged
merged 11 commits into from
Jun 24, 2022
10 changes: 10 additions & 0 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ pub enum BlockImportStatus<N: std::fmt::Debug + PartialEq> {
ImportedUnknown(N, ImportedAux, Option<Origin>),
}

impl<N: std::fmt::Debug + PartialEq> BlockImportStatus<N> {
/// Returns the imported block number.
pub fn number(&self) -> &N {
match self {
BlockImportStatus::ImportedKnown(n, _) |
BlockImportStatus::ImportedUnknown(n, _, _) => n,
}
}
}

/// Block import error.
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
Expand Down
94 changes: 73 additions & 21 deletions client/network/sync/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ pub struct BlockData<B: BlockT> {
/// State of a contiguous range of blocks tracked by the block collection.
enum BlockRangeState<B: BlockT> {
	/// The range is being downloaded; `len` is the number of blocks it covers.
	/// NOTE(review): `downloading` appears to count active download requests for
	/// this range — confirm against `clear_peer_download`.
	Downloading { len: NumberFor<B>, downloading: u32 },
	/// All blocks of the range have been downloaded and are held here.
	Complete(Vec<BlockData<B>>),
	/// The range's blocks were handed out by `ready_blocks` for import; kept (with
	/// only its length) so the range is not re-requested until `clear_queued` runs.
	Queued { len: NumberFor<B> },
}

impl<B: BlockT> BlockRangeState<B> {
	/// Number of blocks covered by this range, regardless of its state.
	pub fn len(&self) -> NumberFor<B> {
		match self {
			// Both variants record the range length explicitly.
			Self::Downloading { len, .. } | Self::Queued { len } => *len,
			// A completed range derives its length from the stored blocks.
			Self::Complete(blocks) => (blocks.len() as u32).into(),
		}
	}
}
Expand Down Expand Up @@ -170,29 +172,51 @@ impl<B: BlockT> BlockCollection<B> {
}

/// Get a valid chain of blocks ordered in descending order and ready for importing into
/// blockchain.
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
/// the blockchain.
pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut ready = Vec::new();

let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
BlockRangeState::Complete(blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
if *start > prev {
break
}
let len = match range_data {
BlockRangeState::Complete(blocks) => {
let len = (blocks.len() as u32).into();
prev = *start + len;
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
ready.append(blocks);
len
},
BlockRangeState::Queued { .. } => continue,
_ => break,
};
*range_data = BlockRangeState::Queued { len };
}

trace!(target: "sync", "{} blocks ready for import", ready.len());
bkchr marked this conversation as resolved.
Show resolved Hide resolved
ready
}

pub fn clear_queued(&mut self, from: NumberFor<B>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation of this function could delete queued data that we have not yet imported. You always set prev = start + len, so if more data was downloaded after the last call to ready_blocks, it would be deleted as well.

I think we should change the clear_queued to take the hash of the first block that was returned by ready_blocks.

Internally we would add a HashMap<Block::Hash, usize> where the second parameter is the number of entries we need to delete in self.blocks. We would insert the value there in ready_blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense. Since we need to know the start number of the range along with the number of entries, instead of a HashMap<Block::Hash, usize> I went with a HashMap<Block::Hash, (NumberFor<Block>, NumberFor<Block>)>.

let mut ranges = Vec::new();

let mut prev = from;
for (start, range_data) in &self.blocks {
match range_data {
BlockRangeState::Queued { len } if *start <= prev => {
prev = *start + *len;
ranges.push(*start);
},
_ => break,
}
}

for r in ranges {
self.blocks.remove(&r);
for r in &ranges {
self.blocks.remove(r);
}
trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from);
drained
trace!(target: "sync", "Cleared {} block ranges from {:?}", ranges.len(), from);
}

pub fn clear_peer_download(&mut self, who: &PeerId) {
Expand Down Expand Up @@ -268,14 +292,14 @@ mod test {

bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.ready_blocks(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121..151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());

assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11..41));
assert_eq!(
bc.drain(1),
bc.ready_blocks(1),
blocks[1..11]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
Expand All @@ -285,16 +309,16 @@ mod test {
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0.clone());

let drained = bc.drain(12);
let ready = bc.ready_blocks(12);
assert_eq!(
drained[..30],
ready[..30],
blocks[11..41]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[30..],
ready[30..],
blocks[41..81]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
Expand All @@ -308,17 +332,17 @@ mod test {
bc.clear_peer_download(&peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1.clone());

assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(bc.ready_blocks(80), vec![]);
let ready = bc.ready_blocks(81);
assert_eq!(
drained[..40],
ready[..40],
blocks[81..121]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[40..],
ready[40..],
blocks[121..150]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
Expand All @@ -344,4 +368,32 @@ mod test {
Some(100 + 128..100 + 128 + 128)
);
}

#[test]
fn no_duplicate_requests_on_fork() {
	let mut collection = BlockCollection::new();
	assert!(is_empty(&collection));

	let peer = PeerId::random();
	let blocks = generate_blocks(10);

	// Ask for blocks 40..45 (count = 5, peer_best = 50, common = 39,
	// max_parallel = 0, max_ahead = 200).
	assert_eq!(collection.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(40..45));

	// The peer responds to the `40..45` request.
	collection.clear_peer_download(&peer);
	collection.insert(40, blocks[..5].to_vec(), peer.clone());

	// Our "node" started on a fork, with its current best = 47, which is > common.
	let expected = blocks[..5]
		.iter()
		.map(|b| BlockData { block: b.clone(), origin: Some(peer.clone()) })
		.collect::<Vec<_>>();
	assert_eq!(collection.ready_blocks(48), expected);

	// The already-queued range must not be requested a second time:
	// the next request starts right after it.
	assert_eq!(collection.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(45..50));
}
}
Loading