Skip to content

Commit

Permalink
Fix ordering of the diff queue
Browse files Browse the repository at this point in the history
When diffs are not needed, append the blocks to the futures ordered.
  • Loading branch information
ii-cruz authored and hrxi committed Sep 19, 2024
1 parent 8352415 commit d076dd2
Showing 1 changed file with 62 additions and 70 deletions.
132 changes: 62 additions & 70 deletions consensus/src/sync/live/diff_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashSet,
future::Future,
future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -65,53 +65,62 @@ pub enum QueuedDiff<N: Network> {
PeerIncompleteState(N::PeerId),
}

async fn augment_block<N, F, R>(block: QueuedBlock<N>, mut get_diff: F) -> Result<QueuedDiff<N>, ()>
where
N: Network,
F: FnMut(&BlockAndSource<N>) -> R,
R: Future<Output = Result<TrieDiff, ()>>,
{
async fn get_multiple_diffs<N, F, R>(
blocks: &[BlockAndSource<N>],
mut get_diff: F,
) -> Result<Vec<TrieDiff>, ()>
where
N: Network,
F: FnMut(&BlockAndSource<N>) -> R,
R: Future<Output = Result<TrieDiff, ()>>,
{
// This is just a fancy way to collect all the diffs simultaneously.
let diffs: FuturesUnordered<_> = blocks
.iter()
.enumerate()
.map(|(i, block)| {
// Get each diff.
let diff = get_diff(block);
async move {
// Annotate it with its index.
diff.await.map(|d| (i, d))
}
})
.collect();
impl<N: Network> QueuedDiff<N> {
fn from_block_no_diff(block: QueuedBlock<N>) -> Self {
match block {
QueuedBlock::Head(block) => QueuedDiff::Head(block, None),
QueuedBlock::Buffered(blocks) => {
QueuedDiff::Buffered(blocks.into_iter().map(|block| (block, None)).collect())
}
QueuedBlock::Missing(blocks) => {
QueuedDiff::Missing(blocks.into_iter().map(|block| (block, None)).collect())
}
QueuedBlock::TooFarAhead(peer_id) => QueuedDiff::TooFarAhead(peer_id),
QueuedBlock::TooFarBehind(peer_id) => QueuedDiff::TooFarBehind(peer_id),
}
}
}

// Collect all diffs, returning an error if any failed.
let mut diffs = diffs.try_collect::<Vec<_>>().await?;
async fn get_multiple_diffs<N: Network>(
blocks: &[BlockAndSource<N>],
mut get_diff: impl FnMut(&BlockAndSource<N>) -> BoxFuture<'static, Result<TrieDiff, ()>>,
) -> Result<Vec<TrieDiff>, ()> {
// This is just a fancy way to collect all the diffs simultaneously.
let diffs: FuturesUnordered<_> = blocks
.iter()
.enumerate()
.map(|(i, block)| {
// Get each diff.
let diff = get_diff(block);
async move {
// Annotate it with its index.
diff.await.map(|d| (i, d))
}
})
.collect();

// Sort the diffs by index again.
diffs.sort_unstable_by_key(|&(i, _)| i);
assert_eq!(blocks.len(), diffs.len());
// Collect all diffs, returning an error if any failed.
let mut diffs = diffs.try_collect::<Vec<_>>().await?;

// Strip index before returning.
Ok(diffs.into_iter().map(|(_, diff)| diff).collect())
}
// Sort the diffs by index again.
diffs.sort_unstable_by_key(|&(i, _)| i);
assert_eq!(blocks.len(), diffs.len());

// Strip index before returning.
Ok(diffs.into_iter().map(|(_, diff)| diff).collect())
}

async fn augment_block<N: Network>(
block: QueuedBlock<N>,
mut get_diff: impl FnMut(&BlockAndSource<N>) -> BoxFuture<'static, Result<TrieDiff, ()>>,
) -> Result<QueuedDiff<N>, ()> {
Ok(match block {
QueuedBlock::Head(block) => {
let diff = get_diff(&block).await?;
QueuedDiff::Head(block, Some(diff))
}
QueuedBlock::Buffered(blocks) => {
let diffs = get_multiple_diffs::<N, F, R>(&blocks[..], get_diff).await?;
let diffs = get_multiple_diffs::<N>(&blocks[..], get_diff).await?;
QueuedDiff::Buffered(
blocks
.into_iter()
Expand All @@ -120,7 +129,7 @@ where
)
}
QueuedBlock::Missing(blocks) => {
let diffs = get_multiple_diffs::<N, F, R>(&blocks[..], get_diff).await?;
let diffs = get_multiple_diffs::<N>(&blocks[..], get_diff).await?;
QueuedDiff::Missing(
blocks
.into_iter()
Expand Down Expand Up @@ -213,49 +222,32 @@ impl<N: Network> Stream for DiffQueue<N> {
type Item = QueuedDiff<N>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut block_queue_done = false;

// Receive blocks from BlockQueue.
loop {
match self.block_queue.poll_next_unpin(cx) {
Poll::Ready(Some(block)) => {
if !self.diff_needed {
return Poll::Ready(Some(match block {
QueuedBlock::Head(block) => QueuedDiff::Head(block, None),
QueuedBlock::Buffered(blocks) => QueuedDiff::Buffered(
blocks.into_iter().map(|block| (block, None)).collect(),
),
QueuedBlock::Missing(blocks) => QueuedDiff::Missing(
blocks.into_iter().map(|block| (block, None)).collect(),
),
QueuedBlock::TooFarAhead(peer_id) => QueuedDiff::TooFarAhead(peer_id),
QueuedBlock::TooFarBehind(peer_id) => QueuedDiff::TooFarBehind(peer_id),
}));
}

while let Poll::Ready(block) = self.block_queue.poll_next_unpin(cx) {
match (block, self.diff_needed) {
(Some(block), true) => {
let get_diff = self.diff_request_component.request_diff();
self.diffs
.push_back(Box::pin(augment_block(block, get_diff)));
}
Poll::Ready(None) => {
block_queue_done = true;
break;
(Some(block), false) => {
self.diffs.push_back(Box::pin(future::ready(Ok(
QueuedDiff::from_block_no_diff(block),
))));
}
Poll::Pending => break,
// The block queue only ends when something bad happens.
// Thus we immediately quit and do not wait for any pending diffs.
(None, ..) => return Poll::Ready(None),
}
}

// Check for blocks augmented with diffs.
loop {
match self.diffs.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(diff))) => return Poll::Ready(Some(diff)),
Poll::Ready(Some(Err(()))) => {
while let Poll::Ready(Some(diff)) = self.diffs.poll_next_unpin(cx) {
match diff {
Ok(diff) => return Poll::Ready(Some(diff)),
Err(()) => {
error!("couldn't fetch diff");
continue;
}
Poll::Ready(None) if block_queue_done => return Poll::Ready(None),
Poll::Ready(None) => break,
Poll::Pending => break,
}
}

Expand Down

0 comments on commit d076dd2

Please sign in to comment.