Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data column lookup and response handling #5350

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::data_column_sidecar::{DataColumnSidecarList, FixedDataColumnSidecarList};
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -2953,6 +2953,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

self.data_availability_checker.notify_gossip_data_column(
data_column.slot(),
block_root,
&data_column,
);
let r = self
.check_gossip_data_column_availability_and_import(data_column)
.await;
Expand Down Expand Up @@ -2995,6 +3000,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Record the given data columns in the processing cache, attempt to import them, and
/// finally evict them from the cache once the import either succeeds or errors.
pub async fn process_rpc_data_columns(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    data_columns: FixedDataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
    // A block already known to fork choice was necessarily available when it was
    // imported, so there is nothing left to do for its data columns.
    let block_already_imported = self
        .canonical_head
        .fork_choice_read_lock()
        .contains_block(&block_root);
    if block_already_imported {
        return Err(BlockError::BlockIsAlreadyKnown);
    }

    self.data_availability_checker
        .notify_rpc_data_columns(slot, block_root, &data_columns);
    let import_result = self
        .check_rpc_data_column_availability_and_import(slot, block_root, data_columns)
        .await;
    // Drop the processing-cache entry if the block was imported or the import errored.
    self.remove_notified(&block_root, import_result)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand Down Expand Up @@ -3286,6 +3317,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
///
/// Before caching, every distinct block header attached to the columns is signature-checked
/// and recorded as potentially-slashable evidence.
async fn check_rpc_data_column_availability_and_import(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    data_columns: FixedDataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
    // Need to scope this to ensure the lock is dropped before calling `process_availability`
    // Even an explicit drop is not enough to convince the borrow checker.
    {
        let mut slashable_cache = self.observed_slashable.write();
        // Deduplicate headers so each distinct signed header is observed only once,
        // even when many columns share the same block header.
        for header in data_columns
            .into_iter()
            .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
            .unique()
        {
            // Only a header with a valid proposer signature is usable as slashing
            // evidence; skip anything that fails verification.
            if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
                slashable_cache
                    .observe_slashable(
                        header.message.slot,
                        header.message.proposer_index,
                        block_root,
                    )
                    .map_err(|e| BlockError::BeaconChainError(e.into()))?;
                // Forward the header to the slasher (when configured) so it can hunt
                // for equivocations involving this proposer/slot.
                if let Some(slasher) = self.slasher.as_ref() {
                    slasher.accept_block_header(header);
                }
            }
        }
    }
    // KZG-verify and cache the columns; if this completes the block's availability
    // requirements the returned `availability` triggers import below.
    let availability = self
        .data_availability_checker
        .put_rpc_data_columns(block_root, data_columns)?;

    self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down
128 changes: 126 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use processing_cache::ProcessingComponents;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
Expand All @@ -33,9 +34,13 @@ mod overflow_lru_cache;
mod processing_cache;
mod state_lru_cache;

use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn};
use crate::data_column_verification::{
verify_kzg_for_data_column_list, GossipVerifiedDataColumn, KzgVerifiedDataColumnList,
};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList};
use types::data_column_sidecar::{
DataColumnIdentifier, DataColumnSidecarList, FixedDataColumnSidecarList,
};
use types::non_zero_usize::new_non_zero_usize;

/// The LRU Cache stores `PendingComponents` which can store up to
Expand Down Expand Up @@ -230,6 +235,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(block_root, verified_blobs)
}

/// Put a list of data columns received via RPC into the availability cache. This performs KZG
/// verification on the data columns in the list.
pub fn put_rpc_data_columns(
    &self,
    block_root: Hash256,
    data_columns: FixedDataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
    // KZG verification is impossible without an initialized KZG context.
    let kzg = self
        .kzg
        .as_ref()
        .ok_or(AvailabilityCheckError::KzgNotInitialized)?;

    // Drop the empty slots of the fixed-length list and KZG-verify what remains.
    let present_columns = Vec::from(data_columns);
    let verified_data_columns =
        KzgVerifiedDataColumnList::new(present_columns.into_iter().flatten(), kzg)
            .map_err(AvailabilityCheckError::Kzg)?;

    self.availability_cache
        .put_kzg_verified_data_columns(block_root, verified_data_columns)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the blob sidecar.
Expand Down Expand Up @@ -422,6 +446,23 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.merge_single_blob(index as usize, commitment);
}

/// Caches a single gossip-verified data column in the processing cache. The column is
/// unverified at the availability level, but caching it here helps avoid duplicate
/// downloads of data columns and tracks our outstanding block / data-column requirements.
pub fn notify_gossip_data_column(
    &self,
    slot: Slot,
    block_root: Hash256,
    data_column: &GossipVerifiedDataColumn<T>,
) {
    let column_index = data_column.index() as usize;
    let mut cache = self.processing_cache.write();
    cache
        .entry(block_root)
        .or_insert_with(|| ProcessingComponents::new(slot))
        .merge_single_data_column(column_index, data_column.clone_data_column());
}

/// Adds blob commitments to the processing cache. These commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
Expand All @@ -444,6 +485,28 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.merge_blobs(commitments);
}

/// Updates the processing cache with data columns that we received from RPC. This is useful to
/// avoid duplicate downloads of data_columns, as well as understanding our block and
/// data_column download requirements.
pub fn notify_rpc_data_columns(
    &self,
    slot: Slot,
    block_root: Hash256,
    data_columns: &FixedDataColumnSidecarList<T::EthSpec>,
) {
    // Re-bucket each column by its own `index` field; a column whose index falls
    // outside the fixed length is silently ignored (matches `get_mut` returning None).
    let mut indexed_columns = FixedVector::default();
    for column in data_columns.iter().flatten() {
        if let Some(bucket) = indexed_columns.get_mut(column.index as usize) {
            *bucket = Some(column.clone());
        }
    }

    let mut cache = self.processing_cache.write();
    cache
        .entry(block_root)
        .or_insert_with(|| ProcessingComponents::new(slot))
        .merge_data_columns(indexed_columns);
}

/// Clears the block and all blobs from the processing cache for a give root if they exist.
pub fn remove_notified(&self, block_root: &Hash256) {
self.processing_cache.write().remove(block_root)
Expand Down Expand Up @@ -735,3 +798,64 @@ impl Into<Vec<BlobIdentifier>> for MissingBlobs {
}
}
}

#[derive(Debug, Clone)]
pub enum MissingDataColumns {
/// We know for certain these data columns are missing.
KnownMissing(Vec<DataColumnIdentifier>),
/// We think these data columns might be missing.
PossibleMissing(Vec<DataColumnIdentifier>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This state is dangerous because it can trigger us to fetch all samples. If we haven't received the block yet we should not fetch anything and just wait for the block.

/// DataColumns are not required.
DataColumnsNotRequired,
}

impl MissingDataColumns {
    /// Builds the missing-columns view for a block we have not yet received.
    pub fn new_without_block(block_root: Hash256, is_deneb: bool) -> Self {
        // TODO(das): update to EIP-7594
        if !is_deneb {
            return MissingDataColumns::DataColumnsNotRequired;
        }
        MissingDataColumns::PossibleMissing(DataColumnIdentifier::get_all_data_column_ids::<E>(
            block_root,
        ))
    }

    /// Internal accessor: the identifier list, or `None` when columns are not required.
    fn ids(&self) -> Option<&Vec<DataColumnIdentifier>> {
        match self {
            MissingDataColumns::KnownMissing(v) | MissingDataColumns::PossibleMissing(v) => {
                Some(v)
            }
            MissingDataColumns::DataColumnsNotRequired => None,
        }
    }

    /// True when no columns are (or can be) missing.
    pub fn is_empty(&self) -> bool {
        self.ids().map_or(true, |v| v.is_empty())
    }

    /// Whether the given identifier is currently tracked as missing.
    pub fn contains(&self, data_column_id: &DataColumnIdentifier) -> bool {
        self.ids().map_or(false, |v| v.contains(data_column_id))
    }

    /// Stops tracking the given identifier; a no-op when columns are not required.
    pub fn remove(&mut self, data_column_id: &DataColumnIdentifier) {
        match self {
            MissingDataColumns::KnownMissing(v) | MissingDataColumns::PossibleMissing(v) => {
                v.retain(|id| id != data_column_id)
            }
            MissingDataColumns::DataColumnsNotRequired => {}
        }
    }

    /// The raw column indices of everything currently tracked as missing.
    pub fn indices(&self) -> Vec<u64> {
        self.ids()
            .map(|v| v.iter().map(|id| id.index).collect())
            .unwrap_or_default()
    }
}

// Implement `From` rather than a hand-written `Into` (Clippy `from_over_into`): the
// standard blanket impl derives `Into<Vec<DataColumnIdentifier>>` automatically, so all
// existing `.into()` call sites keep working.
impl From<MissingDataColumns> for Vec<DataColumnIdentifier> {
    /// Flattens the enum into its underlying identifier list; the "not required"
    /// variant maps to an empty vec.
    fn from(missing: MissingDataColumns) -> Self {
        match missing {
            MissingDataColumns::KnownMissing(v) | MissingDataColumns::PossibleMissing(v) => v,
            MissingDataColumns::DataColumnsNotRequired => vec![],
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait AvailabilityView<E: EthSpec> {
type BlobType: Clone + GetCommitment<E>;

/// The type representing a data column in the implementation.
type DataColumnType: Clone;
type DataColumnType: Clone + GetCommitments<E>;

/// Returns an immutable reference to the cached block.
fn get_cached_block(&self) -> &Option<Self::BlockType>;
Expand Down Expand Up @@ -145,10 +145,7 @@ pub trait AvailabilityView<E: EthSpec> {
let Some(data_column) = data_column else {
continue;
};
// TODO(das): Add equivalent checks for data columns if necessary
if !self.data_column_exists(index) {
self.insert_data_column_at_index(index, data_column)
}
self.merge_single_data_column(index, data_column);
}
}

Expand Down Expand Up @@ -183,6 +180,23 @@ pub trait AvailabilityView<E: EthSpec> {
}
}

/// Merges a single data column into the cache.
///
/// Data columns are only inserted if:
/// 1. The data column entry at the index is empty and no block exists, or
/// 2. The block exists and its commitments matches the data column's commitments.
fn merge_single_data_column(&mut self, index: usize, data_column: Self::DataColumnType) {
    let commitments = data_column.get_commitments();
    if let Some(cached_block) = self.get_cached_block() {
        let block_commitments = cached_block.get_commitments();
        // With a block present its commitments are the source of truth: a column whose
        // commitments differ is dropped rather than cached.
        if block_commitments == commitments {
            self.insert_data_column_at_index(index, data_column)
        }
    } else if !self.data_column_exists(index) {
        // Without a block we cannot validate the column, so only fill an empty slot;
        // an existing entry is never overwritten by an unvalidated column.
        self.insert_data_column_at_index(index, data_column)
    }
}

/// Inserts a new block and revalidates the existing blobs against it.
///
/// Blobs that don't match the new block's commitments are evicted.
Expand Down Expand Up @@ -260,10 +274,10 @@ impl_availability_view!(
ProcessingComponents,
Arc<SignedBeaconBlock<E>>,
KzgCommitment,
(),
Arc<DataColumnSidecar<E>>,
block,
blob_commitments,
data_column_opts
data_columns
);

impl_availability_view!(
Expand Down Expand Up @@ -330,6 +344,20 @@ impl<E: EthSpec> GetCommitments<E> for Arc<SignedBeaconBlock<E>> {
}
}

// Provides commitments for `Arc<DataColumnSidecar<E>>`, the `DataColumnType` bound to
// `ProcessingComponents` in the `impl_availability_view!` invocation in this file.
// NOTE(review): the original comment said this was for `ChildComponents` — confirm which
// `AvailabilityView` implementors actually rely on this impl.
impl<E: EthSpec> GetCommitments<E> for Arc<DataColumnSidecar<E>> {
    fn get_commitments(&self) -> KzgCommitments<E> {
        // Clones the commitment list straight off the sidecar.
        self.kzg_commitments.clone()
    }
}

// These implementations are required to implement `AvailabilityView` for `PendingComponents`.
impl<E: EthSpec> GetCommitments<E> for KzgVerifiedDataColumn<E> {
    fn get_commitments(&self) -> KzgCommitments<E> {
        // Commitments are read from the wrapped, already KZG-verified sidecar.
        self.as_data_column().kzg_commitments.clone()
    }
}

impl<E: EthSpec> GetCommitment<E> for Arc<BlobSidecar<E>> {
fn get_commitment(&self) -> &KzgCommitment {
&self.kzg_commitment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use types::beacon_block_body::KzgCommitmentOpts;
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};

/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
/// a view of what we have and what we require. This cache serves a slightly different purpose than
Expand Down Expand Up @@ -54,9 +54,8 @@ pub struct ProcessingComponents<E: EthSpec> {
/// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See
/// `AvailabilityView`'s trait definition for more details.
pub blob_commitments: KzgCommitmentOpts<E>,
// TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them
// again here and a `()` may be sufficient to indicate what we have.
pub data_column_opts: FixedVector<Option<()>, E::DataColumnCount>,
/// TODO(das): figure out if we actually need this
pub data_columns: FixedVector<Option<Arc<DataColumnSidecar<E>>>, E::DataColumnCount>,
}

impl<E: EthSpec> ProcessingComponents<E> {
Expand All @@ -65,7 +64,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
slot,
block: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
data_column_opts: FixedVector::default(),
data_columns: FixedVector::default(),
}
}
}
Expand All @@ -78,7 +77,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
slot: Slot::new(0),
block: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
data_column_opts: FixedVector::default(),
data_columns: FixedVector::default(),
}
}
}
Loading
Loading