Skip to content

Commit

Permalink
fix(download): body download range (paradigmxyz#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored and literallymarvellous committed Feb 5, 2023
1 parent 0cff916 commit 3b54dc9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
63 changes: 31 additions & 32 deletions crates/net/downloaders/src/bodies/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ where
};

let limit = self.download_range.end.saturating_sub(start_at).min(self.request_limit);
if limit == 0 {
return Ok(None)
}

self.query_headers(start_at..self.download_range.end, limit)
}

Expand All @@ -107,7 +103,7 @@ where
range: Range<BlockNumber>,
max_non_empty: u64,
) -> DownloadResult<Option<Vec<SealedHeader>>> {
if range.start >= self.download_range.end {
if range.is_empty() || max_non_empty == 0 {
return Ok(None)
}

Expand All @@ -128,15 +124,15 @@ where
// 1. Current block number is in range
// 2. The number of non empty headers is less than maximum
// 3. The total number of headers is less than the stream batch size
while current_block_num < range.end &&
while range.contains(&current_block_num) &&
non_empty_headers < max_non_empty &&
headers.len() < self.stream_batch_size
{
// Find the block hash
// Find the block hash.
let (number, hash) = canonical_cursor
.seek_exact(current_block_num)?
.ok_or(DownloadError::MissingHeader { block_number: current_block_num })?;
// Find the block number
// Find the block header.
let (_, header) = header_cursor
.seek_exact((number, hash).into())?
.ok_or(DownloadError::MissingHeader { block_number: number })?;
Expand Down Expand Up @@ -281,7 +277,8 @@ where
(Some(first), Some(last)) => first.block_number()..last.block_number() + 1,
_ => Range::default(),
};
let mut request_range = Range::default();

let mut requests = Vec::<RangeInclusive<BlockNumber>>::default();
for num in range.start..=latest_queued {
// Check if block has been downloaded or is currently in progress
if queued_bodies_range.contains(&num) ||
Expand All @@ -291,29 +288,31 @@ where
continue
}

if range.is_empty() {
request_range.start = num;
} else if request_range.end + 1 == num {
request_range.end = num;
} else {
let headers = self
.query_headers(
request_range.start..request_range.end + 1, // exclusive
request_range.clone().count() as u64,
)?
.ok_or(DownloadError::MissingHeader {
block_number: request_range.start,
})?;

// Dispatch contiguous request.
self.in_progress_queue.push_new_request(
Arc::clone(&self.client),
Arc::clone(&self.consensus),
headers,
);
// Clear the current request range
request_range = Range::default();
}
match requests.last().map(|range| *range.end()) {
// Extend the last range if contiguous
Some(range_end) if range_end + 1 == num => {
let range = requests.pop().unwrap();
requests.push(*range.start()..=num);
}
// Push the new request range
Some(_) | None => requests.push(num..=num),
};
}

for range in requests {
let headers = self
.query_headers(
*range.start()..*range.end() + 1, //
range.clone().count() as u64,
)?
.ok_or(DownloadError::MissingHeader { block_number: *range.start() })?;

// Dispatch contiguous request.
self.in_progress_queue.push_new_request(
Arc::clone(&self.client),
Arc::clone(&self.consensus),
headers,
);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl StageError {
matches!(
self,
StageError::Database(_) |
StageError::Download(_) |
StageError::DatabaseIntegrity(_) |
StageError::StageProgress(_) |
StageError::Fatal(_)
Expand Down

0 comments on commit 3b54dc9

Please sign in to comment.