Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(provider): use canonical_chain on range lookups #11332

Merged
merged 6 commits into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 137 additions & 74 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,47 +122,108 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
(start, end)
}

/// Fetches a range of data from both in-memory state and storage.
/// Fetches a range of data from both in-memory state and storage while a predicate is met.
///
/// - `fetch_db_range`: Retrieves a range of items from the database.
/// - `map_block_state_item`: Maps a block number to an item in memory. Stops fetching if `None`
/// is returned.
fn fetch_db_mem_range<T, F, G, P>(
/// Creates a snapshot of the in-memory chain state and database provider to prevent
/// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
/// recent in-memory blocks in case of overlaps.
///
/// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
/// user to retrieve the required items from the database using [`RangeInclusive`].
/// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
/// state, allowing for selection or filtering for the desired data.
fn fetch_db_mem_range_while<T, F, G, P>(
&self,
range: impl RangeBounds<BlockNumber>,
fetch_db_range: F,
map_block_state_item: G,
mut predicate: P,
) -> ProviderResult<Vec<T>>
where
F: FnOnce(RangeInclusive<BlockNumber>, &mut P) -> ProviderResult<Vec<T>>,
G: Fn(BlockNumber, &mut P) -> Option<T>,
F: FnOnce(
&DatabaseProviderRO<N::DB, N::ChainSpec>,
RangeInclusive<BlockNumber>,
&mut P,
) -> ProviderResult<Vec<T>>,
G: Fn(Arc<BlockState>, &mut P) -> Option<T>,
P: FnMut(&T) -> bool,
{
// Each one provides a snapshot at the time of instantiation, but its order matters.
//
// If we acquire first the database provider, it's possible that before the in-memory chain
// snapshot is instantiated, it will flush blocks to disk. This would
// mean that our database provider would not have access to the flushed blocks (since it's
// working under an older view), while the in-memory state may have deleted them
// entirely. Resulting in gaps on the range.
let mut in_memory_chain =
self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
let db_provider = self.database_provider_ro()?;

let (start, end) = self.convert_range_bounds(range, || {
self.canonical_in_memory_state.get_canonical_block_number()
// the first block is the highest one.
in_memory_chain
.first()
.map(|b| b.number())
.unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
});
let mut range = start..=end;
let mut items = Vec::with_capacity((end - start + 1) as usize);

// First, fetch the items from the database
let mut db_items = fetch_db_range(range.clone(), &mut predicate)?;
// Split range into storage_range and in-memory range. If the in-memory range is not
// necessary drop it early.
//
// The last block of `in_memory_chain` is the lowest block number.
let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
Some(lowest_memory_block) if lowest_memory_block <= end => {
let highest_memory_block =
in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

// Database will for a time overlap with in-memory-chain blocks. In
// case of a re-org, it can mean that the database blocks are of a forked chain, and
// so, we should prioritize the in-memory overlapped blocks.
let in_memory_range =
lowest_memory_block.max(start)..=end.min(highest_memory_block);

// If requested range is in the middle of the in-memory range, remove the necessary
// lowest blocks
in_memory_chain.truncate(
in_memory_chain
.len()
.saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
);

let storage_range =
(lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

(Some((in_memory_chain, in_memory_range)), storage_range)
}
_ => {
// Drop the in-memory chain so we don't hold blocks in memory.
drop(in_memory_chain);

(None, Some(start..=end))
}
};

if !db_items.is_empty() {
let mut items = Vec::with_capacity((end - start + 1) as usize);

if let Some(storage_range) = storage_range {
let mut db_items = fetch_db_range(&db_provider, storage_range.clone(), &mut predicate)?;
items.append(&mut db_items);

// Advance the range iterator by the number of items fetched from the database
range.nth(items.len() - 1);
// The predicate was not met, if the number of items differs from the expected. So, we
// return what we have.
if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
return Ok(items)
}
}

// Fetch the remaining items from the in-memory state
for num in range {
// TODO: there might be an update between loop iterations, we
// need to handle that situation.
if let Some(item) = map_block_state_item(num, &mut predicate) {
items.push(item);
} else {
break;
if let Some((in_memory_chain, in_memory_range)) = in_memory {
for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
debug_assert!(num == block.number());
if let Some(item) = map_block_state_item(block, &mut predicate) {
items.push(item);
} else {
break
}
}
}

Expand Down Expand Up @@ -327,14 +388,10 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
}

fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, _| self.database.headers_range(range),
|num, _| {
self.canonical_in_memory_state
.state_by_number(num)
.map(|block_state| block_state.block().block().header.header().clone())
},
|db_provider, range, _| db_provider.headers_range(range),
|block_state, _| Some(block_state.block().block().header.header().clone()),
|_| true,
)
}
Expand All @@ -351,14 +408,10 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<SealedHeader>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, _| self.database.sealed_headers_range(range),
|num, _| {
self.canonical_in_memory_state
.state_by_number(num)
.map(|block_state| block_state.block().block().header.clone())
},
|db_provider, range, _| db_provider.sealed_headers_range(range),
|block_state, _| Some(block_state.block().block().header.clone()),
|_| true,
)
}
Expand All @@ -368,14 +421,11 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
range: impl RangeBounds<BlockNumber>,
predicate: impl FnMut(&SealedHeader) -> bool,
) -> ProviderResult<Vec<SealedHeader>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, predicate| self.database.sealed_headers_while(range, predicate),
|num, predicate| {
self.canonical_in_memory_state
.state_by_number(num)
.map(|block_state| block_state.block().block().header.clone())
.filter(|header| predicate(header))
|db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
|block_state, predicate| {
Some(block_state.block().block().header.clone()).filter(|header| predicate(header))
},
predicate,
)
Expand All @@ -396,14 +446,13 @@ impl<N: ProviderNodeTypes> BlockHashReader for BlockchainProvider2<N> {
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
self.fetch_db_mem_range(
start..=end,
|range, _| self.database.canonical_hashes_range(*range.start(), *range.end()),
|num, _| {
self.canonical_in_memory_state
.state_by_number(num)
.map(|block_state| block_state.hash())
self.fetch_db_mem_range_while(
start..end,
|db_provider, inclusive_range, _| {
db_provider
.canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
},
|block_state, _| Some(block_state.hash()),
|_| true,
)
}
Expand Down Expand Up @@ -595,14 +644,10 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
}

fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, _| self.database.block_range(range),
|num, _| {
self.canonical_in_memory_state
.state_by_number(num)
.map(|block_state| block_state.block().block().clone().unseal())
},
|db_provider, range, _| db_provider.block_range(range),
|block_state, _| Some(block_state.block().block().clone().unseal()),
|_| true,
)
}
Expand All @@ -611,15 +656,13 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, _| self.database.block_with_senders_range(range),
|num, _| {
self.canonical_in_memory_state.state_by_number(num).map(|block_state| {
let block = block_state.block().block().clone();
let senders = block_state.block().senders().clone();
BlockWithSenders { block: block.unseal(), senders }
})
|db_provider, range, _| db_provider.block_with_senders_range(range),
|block_state, _| {
let block = block_state.block().block().clone();
let senders = block_state.block().senders().clone();
Some(BlockWithSenders { block: block.unseal(), senders })
},
|_| true,
)
Expand All @@ -629,15 +672,13 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
self.fetch_db_mem_range(
self.fetch_db_mem_range_while(
range,
|range, _| self.database.sealed_block_with_senders_range(range),
|num, _| {
self.canonical_in_memory_state.state_by_number(num).map(|block_state| {
let block = block_state.block().block().clone();
let senders = block_state.block().senders().clone();
SealedBlockWithSenders { block, senders }
})
|db_provider, range, _| db_provider.sealed_block_with_senders_range(range),
|block_state, _| {
let block = block_state.block().block().clone();
let senders = block_state.block().senders().clone();
Some(SealedBlockWithSenders { block, senders })
},
|_| true,
)
Expand Down Expand Up @@ -2203,6 +2244,28 @@ mod tests {
assert_eq!(retrieved_block, &expected_block.clone().unseal());
}

// Check for partial in-memory ranges
let blocks = provider.block_range(start_block_number + 1..=end_block_number)?;
assert_eq!(blocks.len(), in_memory_blocks.len() - 1);
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
{
assert_eq!(retrieved_block, &expected_block.clone().unseal());
}

let blocks = provider.block_range(start_block_number + 1..=end_block_number - 1)?;
assert_eq!(blocks.len(), in_memory_blocks.len() - 2);
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
{
assert_eq!(retrieved_block, &expected_block.clone().unseal());
}

let blocks = provider.block_range(start_block_number + 1..=end_block_number + 1)?;
assert_eq!(blocks.len(), in_memory_blocks.len() - 1);
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
{
assert_eq!(retrieved_block, &expected_block.clone().unseal());
}

Ok(())
}

Expand Down
Loading