Skip to content

Commit

Permalink
Use the forwards iterator more often (#2376)
Browse files Browse the repository at this point in the history
## Issue Addressed

NA

## Primary Change

When investigating memory usage, I noticed that retrieving a block from an early slot (e.g., slot 900) would cause a sharp increase in the memory footprint (from 400mb to 800mb+) which seemed to be ever-lasting.

After some investigation, I found that the reverse iteration from the head back to that slot was the likely culprit. To counter this, I've switched the `BeaconChain::block_root_at_slot` to use the forwards iterator, instead of the reverse one.

I also noticed that the networking stack is using `BeaconChain::root_at_slot` to check if a peer is relevant (`check_peer_relevance`). Perhaps the steep, seemingly-random-but-consistent increases in memory usage are caused by the use of this function.

Using the forwards iterator with the HTTP API alleviated the sharp increases in memory usage. It also made the response much faster (before it felt like to took 1-2s, now it feels instant).

## Additional Changes

In the process I also noticed that we have two functions for getting block roots:

- `BeaconChain::block_root_at_slot`: returns `None` for a skip slot.
- `BeaconChain::root_at_slot`: returns the previous root for a skip slot.

I unified these two functions into `block_root_at_slot` and added the `WhenSlotSkipped` enum. Now, the caller must be explicit about the skip-slot behaviour when requesting a root. 

Additionally, I replaced `vec![]` with `Vec::with_capacity` in `store::chunked_vector::range_query`. I stumbled across this whilst debugging and made this modification to see what effect it would have (not much). It seems like a decent change to keep around, but I'm not concerned either way.

Also, `BeaconChain::get_ancestor_block_root` is unused, so I got rid of it :wastebasket:.

## Additional Info

I haven't also done the same for state roots here. Whilst it's possible and a good idea, it's more work since the fwds iterators are presently block-roots-specific.

Whilst there's a few places a reverse iteration of state roots could be triggered (e.g., attestation production, HTTP API), they're no where near as common as the `check_peer_relevance` call. As such, I think we should get this PR merged first, then come back for the state root iters. I made an issue here #2377.
  • Loading branch information
paulhauner committed May 31, 2021
1 parent 263f46b commit bcaf401
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 65 deletions.
182 changes: 142 additions & 40 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
use itertools::Itertools;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use slasher::Slasher;
Expand Down Expand Up @@ -85,6 +86,18 @@ pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero();
pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero();

/// Defines the behaviour when a block/block-root for a skipped slot is requested.
pub enum WhenSlotSkipped {
/// If the slot is a skip slot, return `None`.
///
/// This is how the HTTP API behaves.
None,
/// If the slot it a skip slot, return the previous non-skipped block.
///
/// This is generally how the specification behaves.
Prev,
}

/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
/// Processing this chain segment finished successfully.
Expand Down Expand Up @@ -442,18 +455,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(|e| e.into())))
}

/// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`.
pub fn get_ancestor_block_root(
&self,
block_root: Hash256,
slot: Slot,
) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| {
iter.find(|(_, ancestor_slot)| *ancestor_slot == slot)
.map(|(ancestor_block_root, _)| ancestor_block_root)
})
}

/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
///
Expand Down Expand Up @@ -489,17 +490,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
///
/// ## Errors
///
/// May return a database error.
pub fn block_at_slot(
&self,
slot: Slot,
request_slot: Slot,
skips: WhenSlotSkipped,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
let root = process_results(self.rev_iter_block_roots()?, |mut iter| {
iter.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root)
})?;
let root = self.block_root_at_slot(request_slot, skips)?;

if let Some(block_root) = root {
Ok(self.store.get_item(&block_root)?)
Expand All @@ -521,21 +522,132 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
/// Returns `Ok(None)` if the given `Slot` was skipped.
///
/// ## Notes
///
/// - Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
pub fn block_root_at_slot(
&self,
request_slot: Slot,
skips: WhenSlotSkipped,
) -> Result<Option<Hash256>, Error> {
match skips {
WhenSlotSkipped::None => self.block_root_at_slot_skips_none(request_slot),
WhenSlotSkipped::Prev => self.block_root_at_slot_skips_prev(request_slot),
}
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
///
/// ## Notes
///
/// - Returns `Ok(None)` if the given `Slot` was skipped.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
///
/// ## Errors
///
/// May return a database error.
pub fn block_root_at_slot(&self, slot: Slot) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots()?, |mut iter| {
let root_opt = iter
.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root);
if let (Some(root), Some((prev_root, _))) = (root_opt, iter.next()) {
return (prev_root != root).then(|| root);
fn block_root_at_slot_skips_none(&self, request_slot: Slot) -> Result<Option<Hash256>, Error> {
if request_slot > self.slot()? {
return Ok(None);
} else if request_slot == self.spec.genesis_slot {
return Ok(Some(self.genesis_block_root));
}

let prev_slot = request_slot.saturating_sub(1_u64);

// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Option<Hash256>> = self.with_head(|head| {
let state = &head.beacon_state;

// Try find the root for the `request_slot`.
let request_root_opt = match state.slot.cmp(&request_slot) {
// It's always a skip slot if the head is less than the request slot, return early.
Ordering::Less => return Ok(Some(None)),
// The request slot is the head slot.
Ordering::Equal => Some(head.beacon_block_root),
// Try find the request slot in the state.
Ordering::Greater => state.get_block_root(request_slot).ok().copied(),
};

if let Some(request_root) = request_root_opt {
if let Ok(prev_root) = state.get_block_root(prev_slot) {
return Ok(Some((*prev_root != request_root).then(|| request_root)));
}
}
root_opt
})

// Fast lookup is not possible.
Ok::<_, Error>(None)
})?;
if let Some(root_opt) = fast_lookup {
return Ok(root_opt);
}

if let Some(((prev_root, _), (curr_root, curr_slot))) =
process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| {
iter.tuple_windows().next()
})?
{
// Sanity check.
if curr_slot != request_slot {
return Err(Error::InconsistentForwardsIter {
request_slot,
slot: curr_slot,
});
}
return Ok((curr_root != prev_root).then(|| curr_root));
} else {
return Ok(None);
}
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
///
/// ## Notes
///
/// - Returns the root at the previous non-skipped slot if the given `Slot` was skipped.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
///
/// ## Errors
///
/// May return a database error.
fn block_root_at_slot_skips_prev(&self, request_slot: Slot) -> Result<Option<Hash256>, Error> {
if request_slot > self.slot()? {
return Ok(None);
} else if request_slot == self.spec.genesis_slot {
return Ok(Some(self.genesis_block_root));
}

// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Hash256> = self.with_head(|head| {
if head.beacon_block.slot() <= request_slot {
// Return the head root if all slots between the request and the head are skipped.
Ok(Some(head.beacon_block_root))
} else if let Ok(root) = head.beacon_state.get_block_root(request_slot) {
// Return the root if it's easily accessible from the head state.
Ok(Some(*root))
} else {
// Fast lookup is not possible.
Ok::<_, Error>(None)
}
})?;
if let Some(root) = fast_lookup {
return Ok(Some(root));
}

process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
Ok(None)
}
})?
}

/// Returns the block at the given root, if any.
Expand Down Expand Up @@ -825,16 +937,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(map)
}

/// Returns the block canonical root of the current canonical chain at a given slot.
///
/// Returns `None` if the given slot doesn't exist in the chain.
pub fn root_at_slot(&self, target_slot: Slot) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots()?, |mut iter| {
iter.find(|(_, slot)| *slot == target_slot)
.map(|(root, _)| root)
})
}

/// Returns the block canonical root of the current canonical chain at a given slot, starting from the given state.
///
/// Returns `None` if the given slot doesn't exist in the chain.
Expand Down Expand Up @@ -2324,10 +2426,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_head_subscribers() {
if let Ok(Some(current_duty_dependent_root)) =
self.root_at_slot(target_epoch_start_slot - 1)
self.block_root_at_slot(target_epoch_start_slot - 1, WhenSlotSkipped::Prev)
{
if let Ok(Some(previous_duty_dependent_root)) =
self.root_at_slot(prev_target_epoch_start_slot - 1)
if let Ok(Some(previous_duty_dependent_root)) = self
.block_root_at_slot(prev_target_epoch_start_slot - 1, WhenSlotSkipped::Prev)
{
event_handler.register(EventKind::Head(SseHead {
slot: head_slot,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ pub enum BeaconChainError {
state_epoch: Epoch,
shuffling_epoch: Epoch,
},
InconsistentForwardsIter {
request_slot: Slot,
slot: Slot,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
ForkChoiceError, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::ChainConfig;
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/attestation_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ extern crate lazy_static;

use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy},
StateSkipConfig,
StateSkipConfig, WhenSlotSkipped,
};
use store::config::StoreConfig;
use tree_hash::TreeHash;
Expand Down Expand Up @@ -60,7 +60,7 @@ fn produces_attestations() {
};

let block = chain
.block_at_slot(block_slot)
.block_at_slot(block_slot, WhenSlotSkipped::Prev)
.expect("should get block")
.expect("block should not be skipped");
let block_root = block.message.tree_hash_root();
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate lazy_static;
use beacon_chain::{
attestation_verification::Error as AttnError,
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, BeaconChainTypes,
BeaconChain, BeaconChainTypes, WhenSlotSkipped,
};
use int_to_bytes::int_to_bytes32;
use state_processing::{
Expand Down Expand Up @@ -912,7 +912,7 @@ fn attestation_that_skips_epochs() {
let earlier_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch());
let earlier_block = harness
.chain
.block_at_slot(earlier_slot)
.block_at_slot(earlier_slot, WhenSlotSkipped::Prev)
.expect("should not error getting block at slot")
.expect("should find block at slot");

Expand Down
Loading

0 comments on commit bcaf401

Please sign in to comment.