diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs
new file mode 100644
index 00000000000..01662efc135
--- /dev/null
+++ b/beacon_node/beacon_chain/src/attester_cache.rs
@@ -0,0 +1,377 @@
+//! This module provides the `AttesterCache`, a cache designed for reducing state-reads when
+//! validators produce `AttestationData`.
+//!
+//! This cache is required *as well as* the `ShufflingCache` since the `ShufflingCache` does not
+//! provide any information about the `state.current_justified_checkpoint`. It is not trivial to add
+//! the justified checkpoint to the `ShufflingCache` since that cache is keyed by shuffling decision
+//! root, which is not suitable for the justified checkpoint. Whilst we can know the shuffling for
+//! epoch `n` during `n - 1`, we *cannot* know the justified checkpoint. Instead, we *must* perform
+//! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards
+//! and penalties can be computed and the `state.current_justified_checkpoint` can be updated.
+
+use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
+use parking_lot::RwLock;
+use state_processing::state_advance::{partial_state_advance, Error as StateAdvanceError};
+use std::collections::HashMap;
+use std::ops::Range;
+use types::{
+    beacon_state::{
+        compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count,
+    },
+    BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch,
+    Slot,
+};
+
+type JustifiedCheckpoint = Checkpoint;
+type CommitteeLength = usize;
+type CommitteeIndex = u64;
+type CacheHashMap = HashMap<AttesterCacheKey, AttesterCacheValue>;
+
+/// The maximum number of `AttesterCacheValues` to be kept in memory.
+///
+/// Each `AttesterCacheValue` is very small (~16 bytes) and the cache will generally be kept small
+/// by pruning on finality.
+///
+/// The value provided here is much larger than will be used during ideal network conditions,
+/// however we make it large since the values are so small.
+const MAX_CACHE_LEN: usize = 1_024;
+
+#[derive(Debug)]
+pub enum Error {
+    BeaconState(BeaconStateError),
+    // Boxed to avoid an infinite-size recursion issue.
+    BeaconChain(Box<BeaconChainError>),
+    MissingBeaconState(Hash256),
+    FailedToTransitionState(StateAdvanceError),
+    CannotAttestToFutureState {
+        state_slot: Slot,
+        request_slot: Slot,
+    },
+    /// Indicates a cache inconsistency.
+    WrongEpoch {
+        request_epoch: Epoch,
+        epoch: Epoch,
+    },
+    InvalidCommitteeIndex {
+        committee_index: u64,
+    },
+    /// Indicates an inconsistency with the beacon state committees.
+    InverseRange {
+        range: Range<usize>,
+    },
+}
+
+impl From<BeaconStateError> for Error {
+    fn from(e: BeaconStateError) -> Self {
+        Error::BeaconState(e)
+    }
+}
+
+impl From<BeaconChainError> for Error {
+    fn from(e: BeaconChainError) -> Self {
+        Error::BeaconChain(Box::new(e))
+    }
+}
+
+/// Stores the minimal amount of data required to compute the committee length for any committee at
+/// any slot in a given `epoch`.
+struct CommitteeLengths {
+    /// The `epoch` to which the lengths pertain.
+    epoch: Epoch,
+    /// The length of the shuffling in `self.epoch`.
+    active_validator_indices_len: usize,
+}
+
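The reason a single `usize` is sufficient here is that, per the spec's `get_committee_count_per_slot`, the active-validator count alone fixes the committee count for an epoch. A minimal, self-contained sketch of that arithmetic using mainnet constants (this mirrors the spec function, not the Lighthouse `EthSpec` API):

// Sketch: why `active_validator_indices_len` is sufficient. Mirrors the spec's
// `get_committee_count_per_slot` with mainnet constants; not the exact Lighthouse API.
const SLOTS_PER_EPOCH: u64 = 32;
const MAX_COMMITTEES_PER_SLOT: u64 = 64;
const TARGET_COMMITTEE_SIZE: u64 = 128;

fn committees_per_slot(active_validator_count: u64) -> u64 {
    std::cmp::max(
        1,
        std::cmp::min(
            MAX_COMMITTEES_PER_SLOT,
            active_validator_count / SLOTS_PER_EPOCH / TARGET_COMMITTEE_SIZE,
        ),
    )
}

fn main() {
    // ~300k active validators saturates at the 64-committee maximum.
    assert_eq!(committees_per_slot(300_000), 64);
    // A small network still gets at least one committee per slot.
    assert_eq!(committees_per_slot(1_000), 1);
}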
+impl CommitteeLengths {
+    /// Instantiate `Self` using `state.current_epoch()`.
+    fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> {
+        let active_validator_indices_len = if let Ok(committee_cache) =
+            state.committee_cache(RelativeEpoch::Current)
+        {
+            committee_cache.active_validator_indices().len()
+        } else {
+            // Building the cache like this avoids taking a mutable reference to `BeaconState`.
+            let committee_cache = state.initialize_committee_cache(state.current_epoch(), spec)?;
+            committee_cache.active_validator_indices().len()
+        };
+
+        Ok(Self {
+            epoch: state.current_epoch(),
+            active_validator_indices_len,
+        })
+    }
+
+    /// Get the length of the committee at the given `slot` and `committee_index`.
+    fn get<T: EthSpec>(
+        &self,
+        slot: Slot,
+        committee_index: CommitteeIndex,
+        spec: &ChainSpec,
+    ) -> Result<CommitteeLength, Error> {
+        let slots_per_epoch = T::slots_per_epoch();
+        let request_epoch = slot.epoch(slots_per_epoch);
+
+        // Sanity check.
+        if request_epoch != self.epoch {
+            return Err(Error::WrongEpoch {
+                request_epoch,
+                epoch: self.epoch,
+            });
+        }
+
+        let slots_per_epoch = slots_per_epoch as usize;
+        let committees_per_slot =
+            T::get_committee_count_per_slot(self.active_validator_indices_len, spec)?;
+        let index_in_epoch = compute_committee_index_in_epoch(
+            slot,
+            slots_per_epoch,
+            committees_per_slot,
+            committee_index as usize,
+        );
+        let range = compute_committee_range_in_epoch(
+            epoch_committee_count(committees_per_slot, slots_per_epoch),
+            index_in_epoch,
+            self.active_validator_indices_len,
+        )
+        .ok_or(Error::InvalidCommitteeIndex { committee_index })?;
+
+        range
+            .end
+            .checked_sub(range.start)
+            .ok_or(Error::InverseRange { range })
+    }
+}
+
+/// Provides the following information for some epoch:
+///
+/// - The `state.current_justified_checkpoint` value.
+/// - The committee lengths for all indices and slots.
+///
+/// These values are used during attestation production.
+pub struct AttesterCacheValue {
+    current_justified_checkpoint: Checkpoint,
+    committee_lengths: CommitteeLengths,
+}
+
+impl AttesterCacheValue {
+    /// Instantiate `Self` using `state.current_epoch()`.
+    pub fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> {
+        let current_justified_checkpoint = state.current_justified_checkpoint();
+        let committee_lengths = CommitteeLengths::new(state, spec)?;
+        Ok(Self {
+            current_justified_checkpoint,
+            committee_lengths,
+        })
+    }
+
+    /// Get the justified checkpoint and committee length for some `slot` and `committee_index`.
+    fn get<T: EthSpec>(
+        &self,
+        slot: Slot,
+        committee_index: CommitteeIndex,
+        spec: &ChainSpec,
+    ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
+        self.committee_lengths
+            .get::<T>(slot, committee_index, spec)
+            .map(|committee_length| (self.current_justified_checkpoint, committee_length))
+    }
+}
+
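A worked example of the index/range arithmetic composed by `get` above, with hypothetical numbers (32 slots per epoch, 2 committees per slot, a 6,400-validator shuffling):

// Worked example of the committee index/range arithmetic, with made-up numbers.
fn main() {
    let slots_per_epoch = 32_usize;
    let committees_per_slot = 2_usize;
    let shuffling_len = 6_400_usize;
    let epoch_committee_count = committees_per_slot * slots_per_epoch; // 64

    // Slot 10 of the epoch, committee index 1:
    let slot_in_epoch = 10_usize;
    let index_in_epoch = slot_in_epoch * committees_per_slot + 1; // 21

    // The shuffling is sliced evenly across the 64 committees.
    let start = (shuffling_len * index_in_epoch) / epoch_committee_count; // 2,100
    let end = (shuffling_len * (index_in_epoch + 1)) / epoch_committee_count; // 2,200
    assert_eq!(end - start, 100); // the committee length that `get` returns
}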
+/// The `AttesterCacheKey` is fundamentally the same thing as the proposer shuffling decision root,
+/// however here we use it as an identity for both of the following values:
+///
+/// 1. The `state.current_justified_checkpoint`.
+/// 2. The attester shuffling.
+///
+/// This struct relies upon the premise that the `state.current_justified_checkpoint` in epoch `n`
+/// is determined by the root of the latest block in epoch `n - 1`. Notably, this is identical to
+/// how the proposer shuffling is keyed in `BeaconProposerCache`.
+///
+/// It is also safe, but not maximally efficient, to key the attester shuffling with the same
+/// strategy. For better shuffling keying strategies, see the `ShufflingCache`.
+#[derive(Eq, PartialEq, Hash, Clone, Copy)]
+pub struct AttesterCacheKey {
+    /// The epoch from which the justified checkpoint should be observed.
+    ///
+    /// Attestations which use `self.epoch` as `target.epoch` should use this key.
+    epoch: Epoch,
+    /// The root of the block at the last slot of `self.epoch - 1`.
+    decision_root: Hash256,
+}
+
+impl AttesterCacheKey {
+    /// Instantiate `Self` to key `state.current_epoch()`.
+    ///
+    /// The `latest_block_root` should be the latest block that has been applied to `state`. This
+    /// parameter is required since the state does not store the block root for any block with the
+    /// same slot as `state.slot()`.
+    ///
+    /// ## Errors
+    ///
+    /// May error if `epoch` is out of the range of `state.block_roots`.
+    pub fn new<T: EthSpec>(
+        epoch: Epoch,
+        state: &BeaconState<T>,
+        latest_block_root: Hash256,
+    ) -> Result<Self, Error> {
+        let slots_per_epoch = T::slots_per_epoch();
+        let decision_slot = epoch.start_slot(slots_per_epoch).saturating_sub(1_u64);
+
+        let decision_root = if decision_slot.epoch(slots_per_epoch) == epoch {
+            // This scenario is only possible during the genesis epoch. In this scenario, all-zeros
+            // is used as an alias to the genesis block.
+            Hash256::zero()
+        } else if epoch > state.current_epoch() {
+            // If the requested epoch is higher than the current epoch, the latest block will always
+            // be the decision root.
+            latest_block_root
+        } else {
+            *state.get_block_root(decision_slot)?
+        };
+
+        Ok(Self {
+            epoch,
+            decision_root,
+        })
+    }
+}
+
+/// Provides a cache for the justified checkpoint and committee length when producing an
+/// attestation.
+///
+/// See the module-level documentation for more information.
+#[derive(Default)]
+pub struct AttesterCache {
+    cache: RwLock<CacheHashMap>,
+}
+
+impl AttesterCache {
+    /// Get the justified checkpoint and committee length for the `slot` and `committee_index` in
+    /// the state identified by the cache `key`.
+    pub fn get<T: EthSpec>(
+        &self,
+        key: &AttesterCacheKey,
+        slot: Slot,
+        committee_index: CommitteeIndex,
+        spec: &ChainSpec,
+    ) -> Result<Option<(JustifiedCheckpoint, CommitteeLength)>, Error> {
+        self.cache
+            .read()
+            .get(key)
+            .map(|cache_item| cache_item.get::<T>(slot, committee_index, spec))
+            .transpose()
+    }
+
+    /// Cache the `state.current_epoch()` values if they are not already present in the cache.
+    pub fn maybe_cache_state<T: EthSpec>(
+        &self,
+        state: &BeaconState<T>,
+        latest_block_root: Hash256,
+        spec: &ChainSpec,
+    ) -> Result<(), Error> {
+        let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?;
+        let mut cache = self.cache.write();
+        if !cache.contains_key(&key) {
+            let cache_item = AttesterCacheValue::new(state, spec)?;
+            Self::insert_respecting_max_len(&mut cache, key, cache_item);
+        }
+        Ok(())
+    }
+
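A sketch of the decision-slot arithmetic behind `AttesterCacheKey`, using plain integers in place of the `Slot`/`Epoch` newtypes; it shows why the genesis epoch needs the `Hash256::zero()` special case:

// Sketch of the decision-slot arithmetic used by `AttesterCacheKey::new`.
fn decision_slot(epoch: u64, slots_per_epoch: u64) -> u64 {
    // Last slot of the prior epoch, saturating at zero for the genesis epoch.
    (epoch * slots_per_epoch).saturating_sub(1)
}

fn main() {
    let slots_per_epoch = 32;
    // Epoch 2 is keyed by the block at slot 63 (the last slot of epoch 1).
    assert_eq!(decision_slot(2, slots_per_epoch), 63);
    // For epoch 0 the subtraction saturates to slot 0, which is still *inside* epoch 0;
    // this is the case where the key falls back to `Hash256::zero()`.
    assert_eq!(decision_slot(0, slots_per_epoch) / slots_per_epoch, 0);
}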
+    /// Read the state identified by `state_root` from the database, advance it to the required
+    /// slot, use it to prime the cache and return the values for the provided `slot` and
+    /// `committee_index`.
+    ///
+    /// ## Notes
+    ///
+    /// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call
+    /// before running this function as `Self::get` only takes a read-lock and is therefore less
+    /// likely to create contention.
+    pub fn load_and_cache_state<T: BeaconChainTypes>(
+        &self,
+        state_root: Hash256,
+        key: AttesterCacheKey,
+        slot: Slot,
+        committee_index: CommitteeIndex,
+        chain: &BeaconChain<T>,
+    ) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
+        let spec = &chain.spec;
+        let slots_per_epoch = T::EthSpec::slots_per_epoch();
+        let epoch = slot.epoch(slots_per_epoch);
+
+        // Take a write-lock on the cache before starting the state read.
+        //
+        // Whilst holding the write-lock during the state read will create contention, it prevents
+        // the scenario where multiple requests from separate threads cause duplicate state reads.
+        let mut cache = self.cache.write();
+
+        // Try the cache to see if someone has already primed it between the time the function was
+        // called and when the cache write-lock was obtained. This avoids performing duplicate state
+        // reads.
+        if let Some(value) = cache
+            .get(&key)
+            .map(|cache_item| cache_item.get::<T::EthSpec>(slot, committee_index, spec))
+            .transpose()?
+        {
+            return Ok(value);
+        }
+
+        let mut state: BeaconState<T::EthSpec> = chain
+            .get_state(&state_root, None)?
+            .ok_or(Error::MissingBeaconState(state_root))?;
+
+        if state.slot() > slot {
+            // This indicates an internal inconsistency.
+            return Err(Error::CannotAttestToFutureState {
+                state_slot: state.slot(),
+                request_slot: slot,
+            });
+        } else if state.current_epoch() < epoch {
+            // Only perform a "partial" state advance since we do not require the state roots to be
+            // accurate.
+            partial_state_advance(
+                &mut state,
+                Some(state_root),
+                epoch.start_slot(slots_per_epoch),
+                spec,
+            )
+            .map_err(Error::FailedToTransitionState)?;
+            state.build_committee_cache(RelativeEpoch::Current, spec)?;
+        }
+
+        let cache_item = AttesterCacheValue::new(&state, spec)?;
+        let value = cache_item.get::<T::EthSpec>(slot, committee_index, spec)?;
+        Self::insert_respecting_max_len(&mut cache, key, cache_item);
+        Ok(value)
+    }
+
+    /// Insert a value to `cache`, ensuring it does not exceed the maximum length.
+    ///
+    /// If the cache is already full, the item with the lowest epoch will be removed.
+    fn insert_respecting_max_len(
+        cache: &mut CacheHashMap,
+        key: AttesterCacheKey,
+        value: AttesterCacheValue,
+    ) {
+        while cache.len() >= MAX_CACHE_LEN {
+            if let Some(oldest) = cache
+                .iter()
+                .map(|(key, _)| *key)
+                .min_by_key(|key| key.epoch)
+            {
+                cache.remove(&oldest);
+            } else {
+                break;
+            }
+        }
+
+        cache.insert(key, value);
+    }
+
+    /// Remove all entries where the `key.epoch` is lower than the given `epoch`.
+    ///
+    /// Generally, the provided `epoch` should be the finalized epoch.
+    pub fn prune_below(&self, epoch: Epoch) {
+        self.cache.write().retain(|target, _| target.epoch >= epoch);
+    }
+}
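`load_and_cache_state` deliberately re-checks the map after taking the write-lock, so racing threads never perform duplicate state reads. A stripped-down sketch of that pattern with `parking_lot` and hypothetical key/value types (not the Lighthouse API):

// Minimal sketch of the locking pattern in `load_and_cache_state`: take the write-lock
// *before* the expensive work, then re-check the map in case another thread primed it
// while we were waiting on the lock.
use parking_lot::RwLock;
use std::collections::HashMap;

fn get_or_load(cache: &RwLock<HashMap<u64, String>>, key: u64) -> String {
    // Fast path: read-lock only (this is the analogue of `AttesterCache::get`).
    if let Some(v) = cache.read().get(&key) {
        return v.clone();
    }
    // Slow path: the write-lock serialises loaders, so duplicate loads are impossible.
    let mut map = cache.write();
    if let Some(v) = map.get(&key) {
        return v.clone(); // another thread won the race while we waited
    }
    let value = format!("expensive-load-{}", key); // stands in for the state read
    map.insert(key, value.clone());
    value
}

fn main() {
    let cache = RwLock::new(HashMap::new());
    assert_eq!(get_or_load(&cache, 1), "expensive-load-1");
    assert_eq!(get_or_load(&cache, 1), "expensive-load-1"); // cache hit
}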
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 60924f6beb0..da21e264295 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2,6 +2,7 @@ use crate::attestation_verification::{
     Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation,
     VerifiedUnaggregatedAttestation,
 };
+use crate::attester_cache::{AttesterCache, AttesterCacheKey};
 use crate::beacon_proposer_cache::BeaconProposerCache;
 use crate::block_verification::{
     check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
@@ -289,6 +290,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
     pub beacon_proposer_cache: Mutex<BeaconProposerCache>,
     /// Caches a map of `validator_index -> validator_pubkey`.
     pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
+    /// A cache used when producing attestations.
+    pub(crate) attester_cache: Arc<AttesterCache>,
     /// A list of any hard-coded forks that have been disabled.
     pub disabled_forks: Vec<String>,
     /// Sender given to tasks, so that if they encounter a state in which execution cannot
@@ -1217,44 +1220,174 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// validator that is in the committee for `slot` and `index` in the canonical chain.
     ///
     /// Always attests to the canonical chain.
+    ///
+    /// ## Errors
+    ///
+    /// May return an error if the `request_slot` is too far behind the head state.
     pub fn produce_unaggregated_attestation(
         &self,
-        slot: Slot,
-        index: CommitteeIndex,
+        request_slot: Slot,
+        request_index: CommitteeIndex,
     ) -> Result<Attestation<T::EthSpec>, Error> {
-        // Note: we're taking a lock on the head. The work involved here should be trivial enough
-        // that the lock should not be held for long.
-        let head = self
-            .canonical_head
-            .try_read_for(HEAD_LOCK_TIMEOUT)
-            .ok_or(Error::CanonicalHeadLockTimeout)?;
+        let _total_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_SECONDS);
 
-        if slot >= head.beacon_block.slot() {
-            self.produce_unaggregated_attestation_for_block(
-                slot,
-                index,
-                head.beacon_block_root,
-                Cow::Borrowed(&head.beacon_state),
-                head.beacon_state_root(),
-            )
+        let slots_per_epoch = T::EthSpec::slots_per_epoch();
+        let request_epoch = request_slot.epoch(slots_per_epoch);
+
+        /*
+         * Phase 1/2:
+         *
+         * Take a short-lived read-lock on the head and copy the necessary information from it.
+         *
+         * It is important that this first phase is as quick as possible; creating contention for
+         * the head-lock is not desirable.
+         */
+
+        let head_state_slot;
+        let beacon_block_root;
+        let beacon_state_root;
+        let target;
+        let current_epoch_attesting_info: Option<(Checkpoint, usize)>;
+        let attester_cache_key;
+        let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS);
+        if let Some(head) = self.canonical_head.try_read_for(HEAD_LOCK_TIMEOUT) {
+            let head_state = &head.beacon_state;
+            head_state_slot = head_state.slot();
+
+            // There is no value in producing an attestation to a block that is pre-finalization and
+            // it is likely to cause expensive and pointless reads to the freezer database. Exit
+            // early if this is the case.
+            let finalized_slot = head_state
+                .finalized_checkpoint()
+                .epoch
+                .start_slot(slots_per_epoch);
+            if request_slot < finalized_slot {
+                return Err(Error::AttestingToFinalizedSlot {
+                    finalized_slot,
+                    request_slot,
+                });
+            }
+
+            // This function will eventually fail when trying to access a slot which is
+            // out-of-bounds of `state.block_roots`. This explicit error is intended to provide a
+            // clearer message to the user than an ambiguous `SlotOutOfBounds` error.
+            let slots_per_historical_root = T::EthSpec::slots_per_historical_root() as u64;
+            let lowest_permissible_slot =
+                head_state.slot().saturating_sub(slots_per_historical_root);
+            if request_slot < lowest_permissible_slot {
+                return Err(Error::AttestingToAncientSlot {
+                    lowest_permissible_slot,
+                    request_slot,
+                });
+            }
+
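A back-of-the-envelope sketch of the `AttestingToAncientSlot` bound, assuming mainnet's `SLOTS_PER_HISTORICAL_ROOT` of 8,192 and 12-second slots:

// Rough sketch of the bound enforced above: `state.block_roots` only covers the last
// `SLOTS_PER_HISTORICAL_ROOT` slots, so older requests would hit the freezer database.
fn main() {
    let slots_per_historical_root: u64 = 8_192;
    let head_slot: u64 = 100_000;
    let lowest_permissible_slot = head_slot.saturating_sub(slots_per_historical_root);
    assert_eq!(lowest_permissible_slot, 91_808);
    // With 12-second slots that window is roughly 27 hours of history.
    let seconds_covered = slots_per_historical_root * 12;
    assert_eq!(seconds_covered / 3_600, 27);
}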
+            if request_slot >= head_state.slot() {
+                // When attesting to the head slot or later, always use the head of the chain.
+                beacon_block_root = head.beacon_block_root;
+                beacon_state_root = head.beacon_state_root();
+            } else {
+                // Permit attesting to slots *prior* to the current head. This is desirable when
+                // the VC and BN are out-of-sync due to time issues or overloading.
+                beacon_block_root = *head_state.get_block_root(request_slot)?;
+                beacon_state_root = *head_state.get_state_root(request_slot)?;
+            };
+
+            let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch());
+            let target_root = if head_state.slot() <= target_slot {
+                // If the state is earlier than the target slot then the target *must* be the head
+                // block root.
+                beacon_block_root
+            } else {
+                *head_state.get_block_root(target_slot)?
+            };
+            target = Checkpoint {
+                epoch: request_epoch,
+                root: target_root,
+            };
+
+            current_epoch_attesting_info = if head_state.current_epoch() == request_epoch {
+                // When the head state is in the same epoch as the request, all the information
+                // required to attest is available on the head state.
+                Some((
+                    head_state.current_justified_checkpoint(),
+                    head_state
+                        .get_beacon_committee(request_slot, request_index)?
+                        .committee
+                        .len(),
+                ))
+            } else {
+                // If the head state is in a *different* epoch to the request, more work is required
+                // to determine the justified checkpoint and committee length.
+                None
+            };
+
+            // Determine the key for `self.attester_cache`, in case it is required later in this
+            // routine.
+            attester_cache_key =
+                AttesterCacheKey::new(request_epoch, head_state, beacon_block_root)?;
+        } else {
-            // We disallow producing attestations *prior* to the current head since such an
-            // attestation would require loading a `BeaconState` from disk. Loading `BeaconState`
-            // from disk is very resource intensive and proposes a DoS risk from validator clients.
-            //
-            // Although we generally allow validator clients to do things that might harm us (i.e.,
-            // we trust them), sometimes we need to protect the BN from accidental errors which
-            // could cause it significant harm.
-            //
-            // This case is particularity harmful since the HTTP API can effectively call this
-            // function an unlimited amount of times. If `n` validators all happen to call it at
-            // the same time, we're going to load `n` states (and tree hash caches) into memory all
-            // at once. With `n >= 10` we're looking at hundreds of MB or GBs of RAM.
-            Err(Error::AttestingPriorToHead {
-                head_slot: head.beacon_block.slot(),
-                request_slot: slot,
-            })
+            return Err(Error::CanonicalHeadLockTimeout);
         }
+        drop(head_timer);
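A sketch of the target-root selection performed in the phase above; `root_at_slot` is a hypothetical stand-in for `BeaconState::get_block_root`, which resolves an empty slot to the most recent block at or before it:

// Sketch of phase 1's target-root rule: if the head is at or before the epoch's start
// slot, the head block *is* the target; otherwise look the boundary root up in
// `block_roots`. Fixed-size arrays stand in for `Hash256`.
fn target_root(
    head_slot: u64,
    target_slot: u64,
    head_block_root: [u8; 32],
    root_at_slot: impl Fn(u64) -> [u8; 32],
) -> [u8; 32] {
    if head_slot <= target_slot {
        head_block_root
    } else {
        root_at_slot(target_slot)
    }
}

fn main() {
    let head = [1_u8; 32];
    let ancestor = [2_u8; 32];
    // Head already past the epoch boundary: consult `block_roots`.
    assert_eq!(target_root(65, 64, head, |_| ancestor), ancestor);
    // Head at (or before) the boundary: the head block doubles as the target.
    assert_eq!(target_root(64, 64, head, |_| ancestor), head);
}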
+
+        /*
+         * Phase 2/2:
+         *
+         * If the justified checkpoint and committee length from the head are suitable for this
+         * attestation, use them. If not, try the attester cache. If the cache misses, load a state
+         * from disk and prime the cache with it.
+         */
+
+        let cache_timer =
+            metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS);
+        let (justified_checkpoint, committee_len) =
+            if let Some((justified_checkpoint, committee_len)) = current_epoch_attesting_info {
+                // The head state is in the same epoch as the attestation, so there is no more
+                // required information.
+                (justified_checkpoint, committee_len)
+            } else if let Some(cached_values) = self.attester_cache.get::<T::EthSpec>(
+                &attester_cache_key,
+                request_slot,
+                request_index,
+                &self.spec,
+            )? {
+                // The suitable values were already cached. Return them.
+                cached_values
+            } else {
+                debug!(
+                    self.log,
+                    "Attester cache miss";
+                    "beacon_block_root" => ?beacon_block_root,
+                    "head_state_slot" => %head_state_slot,
+                    "request_slot" => %request_slot,
+                );
+
+                // Neither the head state, nor the attester cache was able to produce the required
+                // information to attest in this epoch. So, load a `BeaconState` from disk and use
+                // it to fulfil the request (and prime the cache to avoid this next time).
+                let _cache_build_timer =
+                    metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS);
+                self.attester_cache.load_and_cache_state(
+                    beacon_state_root,
+                    attester_cache_key,
+                    request_slot,
+                    request_index,
+                    &self,
+                )?
+            };
+        drop(cache_timer);
+
+        Ok(Attestation {
+            aggregation_bits: BitList::with_capacity(committee_len)?,
+            data: AttestationData {
+                slot: request_slot,
+                index: request_index,
+                beacon_block_root,
+                source: justified_checkpoint,
+                target,
+            },
+            signature: AggregateSignature::empty(),
+        })
     }
 
     /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to
@@ -2023,6 +2156,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let block_root = fully_verified_block.block_root;
         let mut state = fully_verified_block.state;
         let current_slot = self.slot()?;
+        let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
         let mut ops = fully_verified_block.confirmation_db_batch;
 
         let attestation_observation_timer =
@@ -2086,6 +2220,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             }
         }
 
+        // Apply the state to the attester cache, only if it is from the previous epoch or later.
+        //
+        // In a perfect scenario there should be no need to add previous-epoch states to the cache.
+        // However, latency between the VC and the BN might cause the VC to produce attestations at
+        // a previous slot.
+        if state.current_epoch().saturating_add(1_u64) >= current_epoch {
+            self.attester_cache
+                .maybe_cache_state(&state, block_root, &self.spec)
+                .map_err(BeaconChainError::from)?;
+        }
+
         let mut fork_choice = self.fork_choice.write();
 
         // Do not import a block that doesn't descend from the finalized root.
@@ -2936,6 +3081,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             self.head_tracker.clone(),
         )?;
 
+        self.attester_cache
+            .prune_below(new_finalized_checkpoint.epoch);
+
         if let Some(event_handler) = self.event_handler.as_ref() {
             if event_handler.has_finalized_subscribers() {
                 event_handler.register(EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint {
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index a270872b26f..bb13dab56f0 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -547,6 +547,7 @@ where
             shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
             beacon_proposer_cache: <_>::default(),
             validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
+            attester_cache: <_>::default(),
             disabled_forks: self.disabled_forks,
             shutdown_sender: self
                 .shutdown_sender
@@ -561,6 +562,16 @@ where
             .head()
             .map_err(|e| format!("Failed to get head: {:?}", e))?;
 
+        // Prime the attester cache with the head state.
+        beacon_chain
+            .attester_cache
+            .maybe_cache_state(
+                &head.beacon_state,
+                head.beacon_block_root,
+                &beacon_chain.spec,
+            )
+            .map_err(|e| format!("Failed to prime attester cache: {:?}", e))?;
+
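A sketch of the import-time guard added above ("previous epoch or later"), with plain integers standing in for the `Epoch` newtype:

// Sketch of the block-import guard: only states from the previous epoch or later are
// offered to the attester cache, since older states can never serve a live request.
fn worth_caching(state_epoch: u64, current_epoch: u64) -> bool {
    state_epoch.saturating_add(1) >= current_epoch
}

fn main() {
    assert!(worth_caching(10, 10)); // current-epoch state
    assert!(worth_caching(9, 10)); // previous epoch: kept for lagging VCs
    assert!(!worth_caching(8, 10)); // two epochs old: not useful for production
}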
         // Only perform the check if it was configured.
         if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint {
             if let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint(
diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs
index c4a0bb6d4a6..f484b194549 100644
--- a/beacon_node/beacon_chain/src/errors.rs
+++ b/beacon_node/beacon_chain/src/errors.rs
@@ -1,3 +1,4 @@
+use crate::attester_cache::Error as AttesterCacheError;
 use crate::beacon_chain::ForkChoiceError;
 use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError;
 use crate::eth1_chain::Error as Eth1ChainError;
@@ -91,6 +92,7 @@ pub enum BeaconChainError {
     ObservedAttestationsError(ObservedAttestationsError),
     ObservedAttestersError(ObservedAttestersError),
     ObservedBlockProducersError(ObservedBlockProducersError),
+    AttesterCacheError(AttesterCacheError),
     PruningError(PruningError),
     ArithError(ArithError),
     InvalidShufflingId {
@@ -99,8 +101,12 @@ pub enum BeaconChainError {
     },
     WeakSubjectivtyVerificationFailure,
     WeakSubjectivtyShutdownError(TrySendError<ShutdownReason>),
-    AttestingPriorToHead {
-        head_slot: Slot,
+    AttestingToFinalizedSlot {
+        finalized_slot: Slot,
+        request_slot: Slot,
+    },
+    AttestingToAncientSlot {
+        lowest_permissible_slot: Slot,
         request_slot: Slot,
     },
     BadPreState {
@@ -137,6 +143,7 @@ easy_from_to!(NaiveAggregationError, BeaconChainError);
 easy_from_to!(ObservedAttestationsError, BeaconChainError);
 easy_from_to!(ObservedAttestersError, BeaconChainError);
 easy_from_to!(ObservedBlockProducersError, BeaconChainError);
+easy_from_to!(AttesterCacheError, BeaconChainError);
 easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
 easy_from_to!(PruningError, BeaconChainError);
 easy_from_to!(ArithError, BeaconChainError);
diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs
index c67e9f98e2f..973564c699d 100644
--- a/beacon_node/beacon_chain/src/lib.rs
+++ b/beacon_node/beacon_chain/src/lib.rs
@@ -1,5 +1,6 @@
 #![recursion_limit = "128"] // For lazy-static
 pub mod attestation_verification;
+mod attester_cache;
 mod beacon_chain;
 mod beacon_fork_choice_store;
 mod beacon_proposer_cache;
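`easy_from_to!(AttesterCacheError, BeaconChainError)` generates the `From` impl that lets `?` lift cache errors into `BeaconChainError`. A plausible expansion, assuming (as the enum above suggests) that the variant shares the source type's name; this is a sketch, not the verbatim Lighthouse macro, and the two types are re-declared locally so the example is self-contained:

// Approximate expansion of the `easy_from_to!` conversion macro.
macro_rules! easy_from_to {
    ($from:ident, $to:ident) => {
        impl From<$from> for $to {
            fn from(e: $from) -> $to {
                $to::$from(e)
            }
        }
    };
}

#[derive(Debug)]
struct AttesterCacheError;

#[derive(Debug)]
enum BeaconChainError {
    AttesterCacheError(AttesterCacheError),
}

easy_from_to!(AttesterCacheError, BeaconChainError);

fn main() {
    let e: BeaconChainError = AttesterCacheError.into();
    println!("{:?}", e); // AttesterCacheError(AttesterCacheError)
}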
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index e0d7052f8e4..535c717bc37 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -210,18 +210,22 @@ lazy_static! {
     /*
      * Attestation Production
      */
-    pub static ref ATTESTATION_PRODUCTION_REQUESTS: Result<IntCounter> = try_create_int_counter(
-        "beacon_attestation_production_requests_total",
-        "Count of all attestation production requests"
-    );
-    pub static ref ATTESTATION_PRODUCTION_SUCCESSES: Result<IntCounter> = try_create_int_counter(
-        "beacon_attestation_production_successes_total",
-        "Count of attestations processed without error"
-    );
-    pub static ref ATTESTATION_PRODUCTION_TIMES: Result<Histogram> = try_create_histogram(
+    pub static ref ATTESTATION_PRODUCTION_SECONDS: Result<Histogram> = try_create_histogram(
         "beacon_attestation_production_seconds",
         "Full runtime of attestation production"
     );
+    pub static ref ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS: Result<Histogram> = try_create_histogram(
+        "attestation_production_head_scrape_seconds",
+        "Time taken to read the head state"
+    );
+    pub static ref ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS: Result<Histogram> = try_create_histogram(
+        "attestation_production_cache_interaction_seconds",
+        "Time spent interacting with the attester cache"
+    );
+    pub static ref ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS: Result<Histogram> = try_create_histogram(
+        "attestation_production_cache_prime_seconds",
+        "Time spent loading a new state from the disk due to a cache miss"
+    );
 }
 
 // Second lazy-static block is used to account for macro recursion limit.
diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs
index eea329a2a3d..56b000385f4 100644
--- a/beacon_node/beacon_chain/src/state_advance_timer.rs
+++ b/beacon_node/beacon_chain/src/state_advance_timer.rs
@@ -304,6 +304,12 @@ fn advance_head(
         );
     }
 
+    // Apply the state to the attester cache, if the cache deems it interesting.
+    beacon_chain
+        .attester_cache
+        .maybe_cache_state(&state, head_root, &beacon_chain.spec)
+        .map_err(BeaconChainError::from)?;
+
     let final_slot = state.slot();
 
     // Insert the advanced state back into the snapshot cache.
diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs
index 673bca4a889..89f9fe1d37c 100644
--- a/consensus/types/src/beacon_state.rs
+++ b/consensus/types/src/beacon_state.rs
@@ -22,7 +22,10 @@ use test_random_derive::TestRandom;
 use tree_hash::TreeHash;
 use tree_hash_derive::TreeHash;
 
-pub use self::committee_cache::CommitteeCache;
+pub use self::committee_cache::{
+    compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count,
+    CommitteeCache,
+};
 pub use clone_config::CloneConfig;
 pub use eth_spec::*;
 pub use iter::BlockRootsIter;
@@ -1308,10 +1311,22 @@ impl<T: EthSpec> BeaconState<T> {
         let epoch = relative_epoch.into_epoch(self.current_epoch());
         let i = Self::committee_cache_index(relative_epoch);
 
-        *self.committee_cache_at_index_mut(i)? = CommitteeCache::initialized(&self, epoch, spec)?;
+        *self.committee_cache_at_index_mut(i)? = self.initialize_committee_cache(epoch, spec)?;
         Ok(())
     }
 
+    /// Initializes a new committee cache for the given `epoch`, regardless of whether one already
+    /// exists. Returns the committee cache without attaching it to `self`.
+    ///
+    /// To build a cache and store it on `self`, use `Self::build_committee_cache`.
+    pub fn initialize_committee_cache(
+        &self,
+        epoch: Epoch,
+        spec: &ChainSpec,
+    ) -> Result<CommitteeCache, Error> {
+        CommitteeCache::initialized(&self, epoch, spec)
+    }
+
     /// Advances the cache for this state into the next epoch.
     ///
     /// This should be used if the `slot` of this state is advanced beyond an epoch boundary.
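The new `initialize_committee_cache` exists so callers holding only `&BeaconState` (such as `CommitteeLengths::new` above) can build a cache without attaching it. A miniature sketch of the borrow-level distinction, with hypothetical types:

// The borrow-level motivation for `initialize_committee_cache`: computing the cache needs
// only `&self`, while storing it on the state needs `&mut self`.
struct Cache {
    entries: usize,
}

struct State {
    validators: Vec<u64>,
    cache: Option<Cache>,
}

impl State {
    /// Compute a cache without touching `self`; callers holding only `&State` can use this.
    fn initialize_cache(&self) -> Cache {
        Cache { entries: self.validators.len() }
    }

    /// Compute *and attach* the cache; requires exclusive access.
    fn build_cache(&mut self) {
        self.cache = Some(self.initialize_cache());
    }
}

fn main() {
    let mut state = State { validators: vec![1, 2, 3], cache: None };
    let detached = state.initialize_cache(); // a shared borrow is enough
    assert_eq!(detached.entries, 3);
    state.build_cache(); // an exclusive borrow stores it
    assert!(state.cache.is_some());
}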
diff --git a/consensus/types/src/beacon_state/committee_cache.rs b/consensus/types/src/beacon_state/committee_cache.rs
index 9c8f428d83e..a4e446aee27 100644
--- a/consensus/types/src/beacon_state/committee_cache.rs
+++ b/consensus/types/src/beacon_state/committee_cache.rs
@@ -121,8 +121,12 @@ impl CommitteeCache {
             return None;
         }
 
-        let committee_index =
-            (slot.as_u64() % self.slots_per_epoch) * self.committees_per_slot + index;
+        let committee_index = compute_committee_index_in_epoch(
+            slot,
+            self.slots_per_epoch as usize,
+            self.committees_per_slot as usize,
+            index as usize,
+        );
 
         let committee = self.compute_committee(committee_index as usize)?;
 
         Some(BeaconCommittee {
@@ -219,7 +223,10 @@ impl CommitteeCache {
     ///
     /// Spec v0.12.1
     pub fn epoch_committee_count(&self) -> usize {
-        self.committees_per_slot as usize * self.slots_per_epoch as usize
+        epoch_committee_count(
+            self.committees_per_slot as usize,
+            self.slots_per_epoch as usize,
+        )
     }
 
     /// Returns the number of committees per slot for this cache's epoch.
@@ -242,16 +249,7 @@ impl CommitteeCache {
     ///
     /// Spec v0.12.1
     fn compute_committee_range(&self, index: usize) -> Option<Range<usize>> {
-        let count = self.epoch_committee_count();
-        if count == 0 || index >= count {
-            return None;
-        }
-
-        let num_validators = self.shuffling.len();
-        let start = (num_validators * index) / count;
-        let end = (num_validators * (index + 1)) / count;
-
-        Some(start..end)
+        compute_committee_range_in_epoch(self.epoch_committee_count(), index, self.shuffling.len())
    }
 
     /// Returns the index of some validator in `self.shuffling`.
@@ -264,6 +262,44 @@ impl CommitteeCache {
     }
 }
 
+/// Computes the position of the given `committee_index` with respect to all committees in the
+/// epoch.
+///
+/// The result may be used to provide input to the `compute_committee_range_in_epoch` function.
+pub fn compute_committee_index_in_epoch(
+    slot: Slot,
+    slots_per_epoch: usize,
+    committees_per_slot: usize,
+    committee_index: usize,
+) -> usize {
+    (slot.as_usize() % slots_per_epoch) * committees_per_slot + committee_index
+}
+
+/// Computes the range for slicing the shuffled indices to determine the members of a committee.
+///
+/// The `index_in_epoch` parameter can be computed using `compute_committee_index_in_epoch`.
+pub fn compute_committee_range_in_epoch(
+    epoch_committee_count: usize,
+    index_in_epoch: usize,
+    shuffling_len: usize,
+) -> Option<Range<usize>> {
+    if epoch_committee_count == 0 || index_in_epoch >= epoch_committee_count {
+        return None;
+    }
+
+    let start = (shuffling_len * index_in_epoch) / epoch_committee_count;
+    let end = (shuffling_len * (index_in_epoch + 1)) / epoch_committee_count;
+
+    Some(start..end)
+}
+
+/// Returns the total number of committees in an epoch.
+pub fn epoch_committee_count(committees_per_slot: usize, slots_per_epoch: usize) -> usize {
+    committees_per_slot * slots_per_epoch
+}
+
 /// Returns a list of all `validators` indices where the validator is active at the given
 /// `epoch`.
 ///
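A property-style sketch of `compute_committee_range_in_epoch`: the ranges for all committees in an epoch exactly partition the shuffling, so every active validator lands in exactly one committee per epoch. The helper is re-implemented locally so the example is self-contained:

// Property check: committee ranges are contiguous and cover the whole shuffling.
use std::ops::Range;

fn range_in_epoch(count: usize, index: usize, shuffling_len: usize) -> Option<Range<usize>> {
    if count == 0 || index >= count {
        return None;
    }
    let start = (shuffling_len * index) / count;
    let end = (shuffling_len * (index + 1)) / count;
    Some(start..end)
}

fn main() {
    let epoch_committee_count = 64;
    let shuffling_len = 6_443; // deliberately not divisible by 64
    let mut next_expected = 0;
    for index in 0..epoch_committee_count {
        let range = range_in_epoch(epoch_committee_count, index, shuffling_len).unwrap();
        assert_eq!(range.start, next_expected); // contiguous: no gaps, no overlaps
        next_expected = range.end;
    }
    assert_eq!(next_expected, shuffling_len); // every validator is covered exactly once
}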