Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add AttesterCache for attestation production #2478

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9cd5d42
Add first compiling draft
paulhauner Jul 26, 2021
a920f28
Refactor to remove enum
paulhauner Jul 26, 2021
dde2880
Fix lint, ensure cache is primed
paulhauner Jul 26, 2021
7d90b26
Add max len to cache
paulhauner Jul 26, 2021
2d313e9
Add some initial comments
paulhauner Jul 26, 2021
7688e1a
Change cache key to decision root
paulhauner Jul 26, 2021
1207587
Add comments
paulhauner Jul 26, 2021
1dbe12c
Remove Vec from cache
paulhauner Jul 26, 2021
64e26a9
Fix bug with cache key, add comments
paulhauner Jul 26, 2021
0444d89
Handle uninitialized cache, add comments
paulhauner Jul 26, 2021
095f359
Add comments
paulhauner Jul 26, 2021
f538365
Tidy beacon chain
paulhauner Jul 26, 2021
66f9aa7
Apply restriction to HTTP api
paulhauner Jul 26, 2021
6fd2299
Tidy use of Option, add comments
paulhauner Jul 26, 2021
5ed7f42
Tidy comments
paulhauner Jul 26, 2021
e9ec567
Fix HTTP error
paulhauner Jul 26, 2021
d22ece5
Add explicit error for ancient slot
paulhauner Jul 26, 2021
f511137
Remove restriction from API
paulhauner Jul 26, 2021
b501963
Remove unused variable
paulhauner Jul 27, 2021
0d0c972
Tidy, add comments
paulhauner Jul 27, 2021
0697059
Move state reads into AttesterCache
paulhauner Jul 27, 2021
2fe2eaa
Add comments for cache size
paulhauner Jul 27, 2021
2906646
Add warn log for cache miss
paulhauner Jul 27, 2021
dd375a0
Tidy comments
paulhauner Jul 27, 2021
5bedecf
Add metrics
paulhauner Jul 27, 2021
0cdbe3a
Apply suggestions from code review
paulhauner Jul 28, 2021
625a69e
Partially address review comments
paulhauner Jul 28, 2021
8cb488e
Clone the head state
paulhauner Jul 28, 2021
cefab04
Fix state root, load target
paulhauner Jul 28, 2021
9e23360
Revert "Fix state root, load target"
paulhauner Jul 28, 2021
e7c3008
Revert "Clone the head state"
paulhauner Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
380 changes: 380 additions & 0 deletions beacon_node/beacon_chain/src/attester_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,380 @@
//! This module provides the `AttesterCache`, a cache designed for reducing state-reads when
//! validators produce `AttestationData`.
//!
//! This cache is required *as well as* the `ShufflingCache` since the `ShufflingCache` does not
//! provide any information about the `state.current_justified_checkpoint`. It is not trivial to add
//! the justified checkpoint to the `ShufflingCache` since that cache keyed by shuffling decision
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
//! root, which is not suitable for the justified checkpoint. Whilst we can know the shuffling for
//! epoch `n` during `n - 1`, we *cannot* know the justified checkpoint. Instead, we *must* perform
//! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards
//! and penalties can be computed and the `state.current_justified_checkpoint` can be updated.

use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use parking_lot::RwLock;
use state_processing::state_advance::{partial_state_advance, Error as StateAdvanceError};
use std::collections::HashMap;
use std::ops::Range;
use types::{
beacon_state::{
compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count,
},
BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch,
Slot,
};

type JustifiedCheckpoint = Checkpoint;
type CommitteeLength = usize;
type CommitteeIndex = u64;
type CacheHashMap = HashMap<AttesterCacheKey, AttesterCacheValue>;

/// The maximum number of `AttesterCacheValues` to be kept in memory.
///
/// Each `AttesterCacheValues` is very small (~16 bytes) and the cache will generally be kept small
/// by pruning on finality.
///
/// The value provided here is much larger than will be used during ideal network conditions,
/// however we make it large since the values are so small.
const MAX_CACHE_LEN: usize = 1_024;

#[derive(Debug)]
pub enum Error {
BeaconState(BeaconStateError),
// Boxed to avoid an infinite-size recursion issue.
BeaconChain(Box<BeaconChainError>),
MissingBeaconState(Hash256),
FailedToTransitionState(StateAdvanceError),
CannotAttestToFutureState {
state_slot: Slot,
request_slot: Slot,
},
/// Indicates a cache inconsistency.
WrongEpoch {
request_epoch: Epoch,
epoch: Epoch,
},
InvalidCommitteeIndex {
committee_index: u64,
},
/// Indicates an inconsistency with the beacon state committees.
InverseRange {
range: Range<usize>,
},
}

impl From<BeaconStateError> for Error {
fn from(e: BeaconStateError) -> Self {
Error::BeaconState(e)
}
}

impl From<BeaconChainError> for Error {
fn from(e: BeaconChainError) -> Self {
Error::BeaconChain(Box::new(e))
}
}

/// Stores the minimal amount of data required compute the committee length for any committee at any
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
/// slot in a given `epoch`.
struct CommitteeLengths {
/// The `epoch` to which the lengths pertain.
epoch: Epoch,
/// The length of the shuffling in `self.epoch`.
active_validator_indices_len: usize,
}

impl CommitteeLengths {
/// Instantiate `Self` using `state.current_epoch()`.
fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> {
let active_validator_indices_len = if let Ok(committee_cache) =
state.committee_cache(RelativeEpoch::Current)
{
committee_cache.active_validator_indices().len()
} else {
// Building the cache like this avoids taking a mutable reference to `BeaconState`.
let committee_cache = state.initialize_committee_cache(state.current_epoch(), spec)?;
committee_cache.active_validator_indices().len()
};

Ok(Self {
epoch: state.current_epoch(),
active_validator_indices_len,
})
}

/// Get the length of the committee at the given `slot` and `committee_index`.
fn get<T: EthSpec>(
&self,
slot: Slot,
committee_index: CommitteeIndex,
spec: &ChainSpec,
) -> Result<CommitteeLength, Error> {
let slots_per_epoch = T::slots_per_epoch();
let request_epoch = slot.epoch(slots_per_epoch);

// Sanity check.
if request_epoch != self.epoch {
return Err(Error::WrongEpoch {
request_epoch,
epoch: self.epoch,
});
}

let slots_per_epoch = slots_per_epoch as usize;
let committees_per_slot =
T::get_committee_count_per_slot(self.active_validator_indices_len, spec)?;
let index_in_epoch = compute_committee_index_in_epoch(
slot,
slots_per_epoch,
committees_per_slot,
committee_index as usize,
);
let range = compute_committee_range_in_epoch(
epoch_committee_count(committees_per_slot, slots_per_epoch),
index_in_epoch,
self.active_validator_indices_len,
)
.ok_or(Error::InvalidCommitteeIndex { committee_index })?;

range
.end
.checked_sub(range.start)
.ok_or(Error::InverseRange { range })
}
}

/// Provides the following information for some epoch:
///
/// - The `state.current_justified_checkpoint` value.
/// - The committee lengths for all indices and slots.
///
/// These values are used during attestation production.
pub struct AttesterCacheValue {
current_justified_checkpoint: Checkpoint,
committee_lengths: CommitteeLengths,
}

impl AttesterCacheValue {
/// Instantiate `Self` using `state.current_epoch()`.
pub fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> {
let current_justified_checkpoint = state.current_justified_checkpoint();
let committee_lengths = CommitteeLengths::new(state, spec)?;
Ok(Self {
current_justified_checkpoint,
committee_lengths,
})
}

/// Get the justified checkpoint and committee length for some `slot` and `committee_index`.
fn get<T: EthSpec>(
&self,
slot: Slot,
committee_index: CommitteeIndex,
spec: &ChainSpec,
) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
self.committee_lengths
.get::<T>(slot, committee_index, spec)
.map(|committee_length| (self.current_justified_checkpoint, committee_length))
}
}

/// The `AttesterCacheKey` is fundamentally the same thing as the proposer shuffling decision root,
/// however here we use it as an identity for both of the following values:
///
/// 1. The `state.current_justified_checkpoint`.
/// 2. The attester shuffling.
///
/// This struct relies upon the premise that the `state.current_justified_checkpoint` in epoch `n`
/// is determined by the root of the latest block in epoch `n - 1`. Notably, this is identical to
/// how the proposer shuffling is keyed in `BeaconProposerCache`.
///
/// It is also safe, but not maximally efficient, to key the attester shuffling with the same
/// strategy. For better shuffling keying strategies, see the `ShufflingCache`.
#[derive(PartialEq, Hash, Clone, Copy)]
pub struct AttesterCacheKey {
/// The epoch from which the justified checkpoint should be observed.
///
/// Attestations which use `self.epoch` as `target.epoch` should use this key.
epoch: Epoch,
/// The root of the block at the last slot of `self.epoch - 1`.
decision_root: Hash256,
}

impl AttesterCacheKey {
/// Instantiate `Self` to key `state.current_epoch()`.
///
/// The `latest_block_root` should be the latest block that has been applied to `state`. This
/// parameter is required since the state does not store the block root for any block with the
/// same slot as `slot.slot()`.
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
///
/// ## Errors
///
/// May error if `epoch` is out of the range of `state.block_roots`.
pub fn new<T: EthSpec>(
epoch: Epoch,
state: &BeaconState<T>,
latest_block_root: Hash256,
) -> Result<Self, Error> {
let slots_per_epoch = T::slots_per_epoch();
let decision_slot = epoch.start_slot(slots_per_epoch).saturating_sub(1_u64);

let decision_root = if decision_slot.epoch(slots_per_epoch) == epoch {
// This scenario is only possible during the genesis epoch. In this scenario, all-zeros
// is used as an alias to the genesis block.
Hash256::zero()
} else if epoch > state.current_epoch() {
// If the requested epoch is higher than the current epoch, the latest block will always
// be the decision root.
latest_block_root
} else {
*state.get_block_root(decision_slot)?
};

Ok(Self {
epoch,
decision_root,
})
}
}

impl Eq for AttesterCacheKey {}
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved

/// Provides a cache for the justified checkpoint and committee length when producing an
/// attestation.
///
/// See the module-level documentation for more information.
#[derive(Default)]
pub struct AttesterCache {
cache: RwLock<CacheHashMap>,
}

impl AttesterCache {
/// Get the justified checkpoint and committee length for the `slot` and `committee_index` in
/// the state identified by the cache `key`.
pub fn get<T: EthSpec>(
&self,
key: &AttesterCacheKey,
slot: Slot,
committee_index: CommitteeIndex,
spec: &ChainSpec,
) -> Result<Option<(JustifiedCheckpoint, CommitteeLength)>, Error> {
self.cache
.read()
.get(key)
.map(|cache_item| cache_item.get::<T>(slot, committee_index, spec))
.transpose()
}

/// Cache the `state.current_epoch()` values if they are not already present in the state.
pub fn maybe_cache_state<T: EthSpec>(
&self,
state: &BeaconState<T>,
latest_block_root: Hash256,
spec: &ChainSpec,
) -> Result<(), Error> {
let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?;
let mut cache = self.cache.write();
if !cache.contains_key(&key) {
let cache_item = AttesterCacheValue::new(state, spec)?;
Self::insert_respecting_max_len(&mut cache, key, cache_item);
}
Ok(())
}

/// Read the state identified by `state_root` from the database, advance it to the required
/// slot, use it to prime the cache and return the values for the provided `slot` and
/// `committee_index`.
///
/// ## Notes
///
/// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call
/// before running this function as `Self::get` only takes a read-lock and is therefore less
/// likely to create head contention.
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
pub fn load_and_cache_state<T: BeaconChainTypes>(
&self,
state_root: Hash256,
key: AttesterCacheKey,
slot: Slot,
committee_index: CommitteeIndex,
chain: &BeaconChain<T>,
) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
let spec = &chain.spec;
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let epoch = slot.epoch(slots_per_epoch);

// Take a write-lock on the cache before starting the state read.
//
// Whilst holding the write-lock during the state read will create contention, it prevents
// the scenario where multiple requests from separate threads cause duplicate state reads.
let mut cache = self.cache.write();

// Try the cache to see if someone has already primed it between the time the function was
// called and when the cache write-lock was obtained. This avoids performing duplicate state
// reads.
if let Some(value) = cache
.get(&key)
.map(|cache_item| cache_item.get::<T::EthSpec>(slot, committee_index, spec))
.transpose()?
{
return Ok(value);
}

let mut state: BeaconState<T::EthSpec> = chain
.get_state(&state_root, None)?
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
.ok_or(Error::MissingBeaconState(state_root))?;

if state.slot() > slot {
// This indicates an internal inconsistency.
return Err(Error::CannotAttestToFutureState {
state_slot: state.slot(),
request_slot: slot,
});
} else if state.current_epoch() < epoch {
// Only perform a "partial" state advance since we do not require the state roots to be
// accurate.
partial_state_advance(
&mut state,
Some(state_root),
epoch.start_slot(slots_per_epoch),
spec,
)
.map_err(Error::FailedToTransitionState)?;
state.build_committee_cache(RelativeEpoch::Current, spec)?;
}

let cache_item = AttesterCacheValue::new(&state, spec)?;
let value = cache_item.get::<T::EthSpec>(slot, committee_index, spec)?;
Self::insert_respecting_max_len(&mut cache, key, cache_item);
Ok(value)
}

/// Insert a value to `cache`, ensuring it does not exceed the maximum length.
///
/// If the cache is already full, the item with the lowest epoch will be removed.
fn insert_respecting_max_len(
cache: &mut CacheHashMap,
key: AttesterCacheKey,
value: AttesterCacheValue,
) {
if cache.len() >= MAX_CACHE_LEN {
while let Some(oldest) = cache
.iter()
.map(|(key, _)| key)
.min_by_key(|key| key.epoch)
// Only return values whilst the cache is full.
.filter(|_| cache.len() >= MAX_CACHE_LEN)
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
.copied()
{
cache.remove(&oldest);
}
}

cache.insert(key, value);
}

/// Remove all entries where the `key.epoch` is lower than the given `epoch`.
///
/// Generally, the provided `epoch` should be the finalized epoch.
pub fn prune_below(&self, epoch: Epoch) {
self.cache.write().retain(|target, _| target.epoch >= epoch);
}
}
Loading