From 88546cb3a22afb9a034a38608df9bb77fb5474bc Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Tue, 5 Mar 2024 15:18:39 +1100
Subject: [PATCH] Add data column lookup and response handling.

---
 beacon_node/beacon_chain/src/beacon_chain.rs  |  71 +++++++++-
 .../src/data_availability_checker.rs          | 128 +++++++++++++++++-
 .../availability_view.rs                      |  42 +++++-
 .../processing_cache.rs                       |  11 +-
 .../src/data_column_verification.rs           |  34 +++++
 beacon_node/beacon_processor/src/lib.rs       |  25 +++-
 beacon_node/beacon_processor/src/metrics.rs   |   5 +
 .../lighthouse_network/src/rpc/methods.rs     |  10 ++
 .../src/network_beacon_processor/mod.rs       |  26 ++++
 .../network_beacon_processor/sync_methods.rs  | 112 +++++++++++++++
 beacon_node/network/src/router.rs             |  12 +-
 .../network/src/sync/block_lookups/common.rs  | 119 +++++++++++++++-
 .../network/src/sync/block_lookups/mod.rs     |  42 +++++-
 .../src/sync/block_lookups/parent_lookup.rs   |   2 +
 .../sync/block_lookups/single_block_lookup.rs |  34 ++++-
 .../network/src/sync/block_lookups/tests.rs   |  24 ++++
 beacon_node/network/src/sync/manager.rs       |  46 ++++++-
 .../network/src/sync/network_context.rs       |  48 ++++++-
 consensus/types/src/data_column_sidecar.rs    |  17 +++
 19 files changed, 781 insertions(+), 27 deletions(-)
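A note on the central container type in this patch: `FixedDataColumnSidecarList<E>`
appears to mirror `FixedBlobSidecarList<E>` — a fixed-length vector of
`Option<Arc<DataColumnSidecar<E>>>` keyed by column index. A minimal sketch of how
received sidecars are slotted into it (this helper and the `received` input are
illustrative, not part of the patch):

    use ssz_types::FixedVector;
    use std::sync::Arc;
    use types::data_column_sidecar::FixedDataColumnSidecarList;
    use types::{DataColumnSidecar, EthSpec};

    fn to_fixed_list<E: EthSpec>(
        received: Vec<Arc<DataColumnSidecar<E>>>,
    ) -> FixedDataColumnSidecarList<E> {
        let mut list = FixedVector::default();
        for column in received {
            // Each sidecar carries its own column index; out-of-range indices
            // are silently ignored, matching `notify_rpc_data_columns` below.
            if let Some(entry) = list.get_mut(column.index as usize) {
                *entry = Some(column);
            }
        }
        list
    }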
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 58960ed807..ac51846b6d 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -123,7 +123,7 @@ use tokio_stream::Stream;
 use tree_hash::TreeHash;
 use types::beacon_state::CloneConfig;
 use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
-use types::data_column_sidecar::DataColumnSidecarList;
+use types::data_column_sidecar::{DataColumnSidecarList, FixedDataColumnSidecarList};
 use types::payload::BlockProductionVersion;
 use types::*;
@@ -2953,6 +2953,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             return Err(BlockError::BlockIsAlreadyKnown);
         }
 
+        self.data_availability_checker.notify_gossip_data_column(
+            data_column.slot(),
+            block_root,
+            &data_column,
+        );
         let r = self
             .check_gossip_data_column_availability_and_import(data_column)
             .await;
@@ -2995,6 +3000,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self.remove_notified(&block_root, r)
     }
 
+    /// Cache the data columns in the processing cache, process them, then evict them from the
+    /// cache if they were imported or errored.
+    pub async fn process_rpc_data_columns(
+        self: &Arc<Self>,
+        slot: Slot,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+    ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
+        // If this block has already been imported to fork choice it must have been available, so
+        // we don't need to process its data columns again.
+        if self
+            .canonical_head
+            .fork_choice_read_lock()
+            .contains_block(&block_root)
+        {
+            return Err(BlockError::BlockIsAlreadyKnown);
+        }
+
+        self.data_availability_checker
+            .notify_rpc_data_columns(slot, block_root, &data_columns);
+        let r = self
+            .check_rpc_data_column_availability_and_import(slot, block_root, data_columns)
+            .await;
+        self.remove_notified(&block_root, r)
+    }
+
     /// Remove any block components from the *processing cache* if we no longer require them. If the
     /// block was imported fully or erred, we no longer require them.
     fn remove_notified(
@@ -3286,6 +3317,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self.process_availability(slot, availability).await
     }
 
+    /// Checks if the provided data columns can make any cached blocks available, and imports them
+    /// immediately if so; otherwise caches the data columns in the data availability checker.
+    async fn check_rpc_data_column_availability_and_import(
+        self: &Arc<Self>,
+        slot: Slot,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+    ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
+        // Need to scope this to ensure the lock is dropped before calling `process_availability`.
+        // Even an explicit drop is not enough to convince the borrow checker.
+        {
+            let mut slashable_cache = self.observed_slashable.write();
+            for header in data_columns
+                .into_iter()
+                .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
+                .unique()
+            {
+                if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
+                    slashable_cache
+                        .observe_slashable(
+                            header.message.slot,
+                            header.message.proposer_index,
+                            block_root,
+                        )
+                        .map_err(|e| BlockError::BeaconChainError(e.into()))?;
+                    if let Some(slasher) = self.slasher.as_ref() {
+                        slasher.accept_block_header(header);
+                    }
+                }
+            }
+        }
+        let availability = self
+            .data_availability_checker
+            .put_rpc_data_columns(block_root, data_columns)?;
+
+        self.process_availability(slot, availability).await
+    }
+
     /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
     ///
     /// An error is returned if the block was unable to be imported. It may be partially imported
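For context, the RPC path added above deliberately mirrors the existing blob flow:
notify the processing cache, run the availability check, then evict via
`remove_notified`. A hedged sketch of the caller's side (`chain`, `slot`,
`block_root` and `columns` are assumed to be in scope):

    match chain.process_rpc_data_columns(slot, block_root, columns).await {
        Ok(AvailabilityProcessingStatus::Imported(_root)) => {
            // All components were present and the block was imported.
        }
        Ok(AvailabilityProcessingStatus::MissingComponents(_slot, _root)) => {
            // Still waiting on the block or more columns; sync keeps looking.
        }
        Err(_e) => {
            // Includes BlockError::BlockIsAlreadyKnown when fork choice
            // already contains the block.
        }
    }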
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 9a4f5eea04..c2577f2b38 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -15,6 +15,7 @@ pub use processing_cache::ProcessingComponents;
 use slasher::test_utils::E;
 use slog::{debug, error, Logger};
 use slot_clock::SlotClock;
+use ssz_types::FixedVector;
 use std::fmt;
 use std::fmt::Debug;
 use std::num::NonZeroUsize;
@@ -33,9 +34,13 @@ mod overflow_lru_cache;
 mod processing_cache;
 mod state_lru_cache;
 
-use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn};
+use crate::data_column_verification::{
+    verify_kzg_for_data_column_list, GossipVerifiedDataColumn, KzgVerifiedDataColumnList,
+};
 pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
-use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList};
+use types::data_column_sidecar::{
+    DataColumnIdentifier, DataColumnSidecarList, FixedDataColumnSidecarList,
+};
 use types::non_zero_usize::new_non_zero_usize;
 
 /// The LRU Cache stores `PendingComponents` which can store up to
@@ -230,6 +235,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .put_kzg_verified_blobs(block_root, verified_blobs)
     }
 
+    /// Put a list of data columns received via RPC into the availability cache. This performs KZG
+    /// verification on the data columns in the list.
+    pub fn put_rpc_data_columns(
+        &self,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+    ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
+        let Some(kzg) = self.kzg.as_ref() else {
+            return Err(AvailabilityCheckError::KzgNotInitialized);
+        };
+
+        let verified_data_columns =
+            KzgVerifiedDataColumnList::new(Vec::from(data_columns).into_iter().flatten(), kzg)
+                .map_err(AvailabilityCheckError::Kzg)?;
+
+        self.availability_cache
+            .put_kzg_verified_data_columns(block_root, verified_data_columns)
+    }
+
     /// Check if we've cached other blobs for this block. If it completes a set and we also
     /// have a block cached, return the `Availability` variant triggering block import.
     /// Otherwise cache the blob sidecar.
@@ -422,6 +446,23 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .merge_single_blob(index as usize, commitment);
     }
 
+    /// Add a single data column to the processing cache. The data column is unverified, but
+    /// caching it here is useful to avoid duplicate downloads of data columns, as well as
+    /// understanding our block and data column download requirements.
+    pub fn notify_gossip_data_column(
+        &self,
+        slot: Slot,
+        block_root: Hash256,
+        data_column: &GossipVerifiedDataColumn<T>,
+    ) {
+        let index = data_column.index();
+        self.processing_cache
+            .write()
+            .entry(block_root)
+            .or_insert_with(|| ProcessingComponents::new(slot))
+            .merge_single_data_column(index as usize, data_column.clone_data_column());
+    }
+
     /// Adds blob commitments to the processing cache. These commitments are unverified but caching
     /// them here is useful to avoid duplicate downloads of blobs, as well as understanding
     /// our block and blob download requirements.
@@ -444,6 +485,28 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .merge_blobs(commitments);
     }
 
+    /// Updates the processing cache with data columns that we received from RPC. This is useful to
+    /// avoid duplicate downloads of data columns, as well as understanding our block and
+    /// data column download requirements.
+    pub fn notify_rpc_data_columns(
+        &self,
+        slot: Slot,
+        block_root: Hash256,
+        data_columns: &FixedDataColumnSidecarList<T::EthSpec>,
+    ) {
+        let mut data_column_opts = FixedVector::default();
+        for data_column in data_columns.iter().flatten() {
+            if let Some(data_column_opt) = data_column_opts.get_mut(data_column.index as usize) {
+                *data_column_opt = Some(data_column.clone());
+            }
+        }
+        self.processing_cache
+            .write()
+            .entry(block_root)
+            .or_insert_with(|| ProcessingComponents::new(slot))
+            .merge_data_columns(data_column_opts);
+    }
+
     /// Clears the block and all blobs from the processing cache for a given root if they exist.
     pub fn remove_notified(&self, block_root: &Hash256) {
         self.processing_cache.write().remove(block_root)
@@ -735,3 +798,64 @@ impl Into<Vec<BlobIdentifier>> for MissingBlobs {
         }
     }
 }
+
+#[derive(Debug, Clone)]
+pub enum MissingDataColumns {
+    /// We know for certain these data columns are missing.
+    KnownMissing(Vec<DataColumnIdentifier>),
+    /// We think these data columns might be missing.
+    PossibleMissing(Vec<DataColumnIdentifier>),
+    /// Data columns are not required.
+    DataColumnsNotRequired,
+}
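The `MissingDataColumns` type added above tracks the remaining request frontier for
a single lookup. An illustrative walk-through of the intended transitions
(`block_root` is assumed to be in scope; the index is arbitrary):

    // Before the block is known, every column id is only *possibly* missing.
    let mut missing = MissingDataColumns::new_without_block(block_root, true);
    assert!(!missing.is_empty());

    // As each verified response arrives, its id is removed from the frontier.
    let id = DataColumnIdentifier { block_root, index: 0 };
    if missing.contains(&id) {
        missing.remove(&id);
    }

    // Once is_empty() returns true, the lookup has nothing left to request.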
+
+impl MissingDataColumns {
+    pub fn new_without_block(block_root: Hash256, is_deneb: bool) -> Self {
+        // TODO(das): update to EIP-7594
+        if is_deneb {
+            MissingDataColumns::PossibleMissing(
+                DataColumnIdentifier::get_all_data_column_ids::<E>(block_root),
+            )
+        } else {
+            MissingDataColumns::DataColumnsNotRequired
+        }
+    }
+    pub fn is_empty(&self) -> bool {
+        match self {
+            MissingDataColumns::KnownMissing(v) => v.is_empty(),
+            MissingDataColumns::PossibleMissing(v) => v.is_empty(),
+            MissingDataColumns::DataColumnsNotRequired => true,
+        }
+    }
+    pub fn contains(&self, data_column_id: &DataColumnIdentifier) -> bool {
+        match self {
+            MissingDataColumns::KnownMissing(v) => v.contains(data_column_id),
+            MissingDataColumns::PossibleMissing(v) => v.contains(data_column_id),
+            MissingDataColumns::DataColumnsNotRequired => false,
+        }
+    }
+    pub fn remove(&mut self, data_column_id: &DataColumnIdentifier) {
+        match self {
+            MissingDataColumns::KnownMissing(v) => v.retain(|id| id != data_column_id),
+            MissingDataColumns::PossibleMissing(v) => v.retain(|id| id != data_column_id),
+            MissingDataColumns::DataColumnsNotRequired => {}
+        }
+    }
+    pub fn indices(&self) -> Vec<u64> {
+        match self {
+            MissingDataColumns::KnownMissing(v) => v.iter().map(|id| id.index).collect(),
+            MissingDataColumns::PossibleMissing(v) => v.iter().map(|id| id.index).collect(),
+            MissingDataColumns::DataColumnsNotRequired => vec![],
+        }
+    }
+}
+
+impl Into<Vec<DataColumnIdentifier>> for MissingDataColumns {
+    fn into(self) -> Vec<DataColumnIdentifier> {
+        match self {
+            MissingDataColumns::KnownMissing(v) => v,
+            MissingDataColumns::PossibleMissing(v) => v,
+            MissingDataColumns::DataColumnsNotRequired => vec![],
+        }
+    }
+}
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs
index f79f28b1ca..54587988ec 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs
@@ -28,7 +28,7 @@ pub trait AvailabilityView<E: EthSpec> {
     type BlobType: Clone + GetCommitment<E>;
 
     /// The type representing a data column in the implementation.
-    type DataColumnType: Clone;
+    type DataColumnType: Clone + GetCommitments<E>;
 
     /// Returns an immutable reference to the cached block.
     fn get_cached_block(&self) -> &Option<Self::BlockType>;
@@ -145,10 +145,7 @@ pub trait AvailabilityView<E: EthSpec> {
             let Some(data_column) = data_column else {
                 continue;
             };
-            // TODO(das): Add equivalent checks for data columns if necessary
-            if !self.data_column_exists(index) {
-                self.insert_data_column_at_index(index, data_column)
-            }
+            self.merge_single_data_column(index, data_column);
         }
     }
 
@@ -183,6 +180,23 @@ pub trait AvailabilityView<E: EthSpec> {
         }
     }
 
+    /// Merges a single data column into the cache.
+    ///
+    /// Data columns are only inserted if:
+    /// 1. The data column entry at the index is empty and no block exists, or
+    /// 2. The block exists and its commitments match the data column's commitments.
+    fn merge_single_data_column(&mut self, index: usize, data_column: Self::DataColumnType) {
+        let commitments = data_column.get_commitments();
+        if let Some(cached_block) = self.get_cached_block() {
+            let block_commitments = cached_block.get_commitments();
+            if block_commitments == commitments {
+                self.insert_data_column_at_index(index, data_column)
+            }
+        } else if !self.data_column_exists(index) {
+            self.insert_data_column_at_index(index, data_column)
+        }
+    }
+
     /// Inserts a new block and revalidates the existing blobs against it.
     ///
     /// Blobs that don't match the new block's commitments are evicted.
@@ -260,10 +274,10 @@ impl_availability_view!(
     ProcessingComponents,
     Arc<SignedBeaconBlock<E>>,
     KzgCommitment,
-    (),
+    Arc<DataColumnSidecar<E>>,
     block,
     blob_commitments,
-    data_column_opts
+    data_columns
 );
 
 impl_availability_view!(
@@ -330,6 +344,20 @@ impl<E: EthSpec> GetCommitments<E> for Arc<SignedBeaconBlock<E>> {
     }
 }
 
+// These implementations are required to implement `AvailabilityView` for `ChildComponents`.
+impl<E: EthSpec> GetCommitments<E> for Arc<DataColumnSidecar<E>> {
+    fn get_commitments(&self) -> KzgCommitments<E> {
+        self.kzg_commitments.clone()
+    }
+}
+
+// These implementations are required to implement `AvailabilityView` for `PendingComponents`.
+impl<E: EthSpec> GetCommitments<E> for KzgVerifiedDataColumn<E> {
+    fn get_commitments(&self) -> KzgCommitments<E> {
+        self.as_data_column().kzg_commitments.clone()
+    }
+}
+
 impl<E: EthSpec> GetCommitment<E> for Arc<BlobSidecar<E>> {
     fn get_commitment(&self) -> &KzgCommitment {
         &self.kzg_commitment
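The merge rule introduced here is easiest to read as two cases: with no cached
block, insert only into an empty slot (never clobber an earlier column); with a
cached block, insert only when the column's commitments equal the block's. A
hedged sketch (assumes some `view: impl AvailabilityView<E>` and a `column`
already in scope):

    // Case 1: no cached block — the first column for this index wins.
    // Case 2: cached block — mismatched commitments are silently dropped.
    let index = column.index as usize;
    view.merge_single_data_column(index, column);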
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
index 7abbd70010..f162a6dfc6 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
@@ -4,7 +4,7 @@ use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
 use types::beacon_block_body::KzgCommitmentOpts;
-use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
+use types::{DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
 
 /// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
 /// a view of what we have and what we require. This cache serves a slightly different purpose than
@@ -54,9 +54,8 @@ pub struct ProcessingComponents<E: EthSpec> {
     /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See
     /// `AvailabilityView`'s trait definition for more details.
     pub blob_commitments: KzgCommitmentOpts<E>,
-    // TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be
-    // useful to store them again here and a `()` may be sufficient to indicate what we have.
-    pub data_column_opts: FixedVector<Option<()>, E::DataColumnCount>,
+    /// TODO(das): figure out if we actually need this
+    pub data_columns: FixedVector<Option<Arc<DataColumnSidecar<E>>>, E::DataColumnCount>,
 }
 
 impl<E: EthSpec> ProcessingComponents<E> {
@@ -65,7 +64,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
             slot,
             block: None,
             blob_commitments: KzgCommitmentOpts::default(),
-            data_column_opts: FixedVector::default(),
+            data_columns: FixedVector::default(),
         }
     }
 }
@@ -78,7 +77,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
             slot: Slot::new(0),
             block: None,
             blob_commitments: KzgCommitmentOpts::default(),
-            data_column_opts: FixedVector::default(),
+            data_columns: FixedVector::default(),
         }
     }
 }
diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs
index 2ea8fe1ae4..c4653866f5 100644
--- a/beacon_node/beacon_chain/src/data_column_verification.rs
+++ b/beacon_node/beacon_chain/src/data_column_verification.rs
@@ -113,6 +113,11 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
         self.data_column.as_data_column()
     }
 
+    /// This is cheap, as we're calling `clone` on an `Arc`.
+    pub fn clone_data_column(&self) -> Arc<DataColumnSidecar<T::EthSpec>> {
+        self.data_column.clone_data_column()
+    }
+
     pub fn block_root(&self) -> Hash256 {
         self.block_root
     }
@@ -179,6 +184,35 @@ pub fn verify_kzg_for_data_column(
     Ok(KzgVerifiedDataColumn { data_column })
 }
 
+pub struct KzgVerifiedDataColumnList<E: EthSpec> {
+    verified_data_columns: Vec<KzgVerifiedDataColumn<E>>,
+}
+
+impl<E: EthSpec> KzgVerifiedDataColumnList<E> {
+    pub fn new<I: IntoIterator<Item = Arc<DataColumnSidecar<E>>>>(
+        data_column_list: I,
+        kzg: &Kzg,
+    ) -> Result<Self, KzgError> {
+        let data_columns = data_column_list.into_iter().collect::<Vec<_>>();
+        verify_kzg_for_data_column_list(data_columns.iter(), kzg)?;
+        Ok(Self {
+            verified_data_columns: data_columns
+                .into_iter()
+                .map(|data_column| KzgVerifiedDataColumn { data_column })
+                .collect(),
+        })
+    }
+}
+
+impl<E: EthSpec> IntoIterator for KzgVerifiedDataColumnList<E> {
+    type Item = KzgVerifiedDataColumn<E>;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.verified_data_columns.into_iter()
+    }
+}
+
 /// Complete kzg verification for a list of `DataColumnSidecar`s.
 /// Returns an error if any of the `DataColumnSidecar`s fails kzg verification.
 ///
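`KzgVerifiedDataColumnList` acts as a verification witness: constructing it runs
KZG verification over the whole batch once, after which iteration yields columns
that are known-good. A hedged usage sketch (`kzg` and `columns` assumed in scope;
`import` is a hypothetical consumer):

    let verified = KzgVerifiedDataColumnList::new(columns.into_iter(), kzg)?;
    for column in verified {
        // Each item is a KzgVerifiedDataColumn<E>, safe to cache or import
        // without re-verifying.
        import(column);
    }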
diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs
index 5ac1aaac4c..49d3335baf 100644
--- a/beacon_node/beacon_processor/src/lib.rs
+++ b/beacon_node/beacon_processor/src/lib.rs
@@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
 /// will be stored before we start dropping them.
 const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;
 
+/// The maximum number of queued `DataColumnSidecar` objects received from the network RPC that
+/// will be stored before we start dropping them.
+const MAX_RPC_DATA_COLUMN_QUEUE_LEN: usize = 2_048;
+
 /// The maximum number of queued `Vec<SignedBeaconBlock<E>>` objects received during syncing that
 /// will be stored before we start dropping them.
 const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
@@ -244,6 +248,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
 pub const RPC_BLOCK: &str = "rpc_block";
 pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
 pub const RPC_BLOBS: &str = "rpc_blob";
+pub const RPC_DATA_COLUMNS: &str = "rpc_data_column";
 pub const CHAIN_SEGMENT: &str = "chain_segment";
 pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
 pub const STATUS_PROCESSING: &str = "status_processing";
@@ -619,6 +624,9 @@ pub enum Work<E: EthSpec> {
     RpcBlobs {
         process_fn: AsyncFn,
     },
+    RpcDataColumns {
+        process_fn: AsyncFn,
+    },
     IgnoredRpcBlock {
         process_fn: BlockingFn,
     },
@@ -663,6 +671,7 @@ impl<E: EthSpec> Work<E> {
             Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
             Work::RpcBlock { .. } => RPC_BLOCK,
             Work::RpcBlobs { .. } => RPC_BLOBS,
+            Work::RpcDataColumns { .. } => RPC_DATA_COLUMNS,
             Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
             Work::ChainSegment { .. } => CHAIN_SEGMENT,
             Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
@@ -819,6 +828,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
         // Using a FIFO queue since blocks need to be imported sequentially.
         let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
         let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
+        let mut rpc_data_column_queue = FifoQueue::new(MAX_RPC_DATA_COLUMN_QUEUE_LEN);
         let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
@@ -970,6 +980,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         self.spawn_worker(item, idle_tx);
                     } else if let Some(item) = rpc_blob_queue.pop() {
                         self.spawn_worker(item, idle_tx);
+                    } else if let Some(item) = rpc_data_column_queue.pop() {
+                        self.spawn_worker(item, idle_tx);
                     // Check delayed blocks before gossip blocks, the gossip blocks might rely
                     // on the delayed ones.
                     } else if let Some(item) = delayed_block_queue.pop() {
@@ -1255,6 +1267,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
                                 rpc_block_queue.push(work, work_id, &self.log)
                             }
                             Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
+                            Work::RpcDataColumns { .. } => {
+                                rpc_data_column_queue.push(work, work_id, &self.log)
+                            }
                             Work::ChainSegment { .. } => {
                                 chain_segment_queue.push(work, work_id, &self.log)
                             }
@@ -1342,6 +1357,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
                         rpc_blob_queue.len() as i64,
                     );
+                    metrics::set_gauge(
+                        &metrics::BEACON_PROCESSOR_RPC_DATA_COLUMN_QUEUE_TOTAL,
+                        rpc_data_column_queue.len() as i64,
+                    );
                     metrics::set_gauge(
                         &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
                         chain_segment_queue.len() as i64,
@@ -1481,9 +1500,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
                 beacon_block_root: _,
                 process_fn,
             } => task_spawner.spawn_async(process_fn),
-            Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
-                task_spawner.spawn_async(process_fn)
-            }
+            Work::RpcBlock { process_fn }
+            | Work::RpcBlobs { process_fn }
+            | Work::RpcDataColumns { process_fn } => task_spawner.spawn_async(process_fn),
             Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
             Work::GossipBlock(work)
             | Work::GossipBlobSidecar(work)
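Scheduling note: the new queue is drained in the same priority pass as the other
RPC queues — blocks first, then blobs, then data columns — ahead of delayed and
gossip blocks. A condensed sketch of that pop order (not the full event loop):

    // RPC components are prioritised because lookups are blocked on them.
    if let Some(item) = rpc_block_queue.pop() {
        self.spawn_worker(item, idle_tx);
    } else if let Some(item) = rpc_blob_queue.pop() {
        self.spawn_worker(item, idle_tx);
    } else if let Some(item) = rpc_data_column_queue.pop() {
        self.spawn_worker(item, idle_tx);
    } // ...delayed blocks, then gossip blocks, follow.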
{ "beacon_processor_rpc_blob_queue_total", "Count of blobs from the rpc waiting to be verified." ); + // Rpc data columns. + pub static ref BEACON_PROCESSOR_RPC_DATA_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_rpc_data_column_queue_total", + "Count of data columns from the rpc waiting to be verified." + ); // Chain segments. pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_chain_segment_queue_total", diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index efc80d55fa..a4554b1ba4 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -374,6 +374,16 @@ pub struct DataColumnsByRootRequest { pub data_column_ids: RuntimeVariableList, } +impl DataColumnsByRootRequest { + pub fn new(data_column_ids: Vec, spec: &ChainSpec) -> Self { + let data_column_ids = RuntimeVariableList::from_vec( + data_column_ids, + spec.max_request_data_column_sidecars as usize, + ); + Self { data_column_ids } + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7c444b8b52..2ee0e15899 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -33,6 +33,7 @@ use types::*; pub use sync_methods::ChainSegmentProcessId; use types::blob_sidecar::FixedBlobSidecarList; +use types::data_column_sidecar::FixedDataColumnSidecarList; pub type Error = TrySendError>; @@ -482,6 +483,31 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some data columns, where the result from computation (if any) is + /// sent to the other side of `result_tx`. + pub fn send_rpc_data_columns( + self: &Arc, + block_root: Hash256, + data_columns: FixedDataColumnSidecarList, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let data_column_count = data_columns.iter().filter(|b| b.is_some()).count(); + if data_column_count == 0 { + return Ok(()); + } + let process_fn = self.clone().generate_rpc_data_columns_process_fn( + block_root, + data_columns, + seen_timestamp, + process_type, + ); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcDataColumns { process_fn }, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 7acb99a616..f210b5ae1e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -24,6 +24,7 @@ use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; +use types::data_column_sidecar::FixedDataColumnSidecarList; use types::{Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. @@ -211,6 +212,25 @@ impl NetworkBeaconProcessor { Box::pin(process_fn) } + /// Returns an async closure which processes a list of data columns received via RPC. 
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 7c444b8b52..2ee0e15899 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -33,6 +33,7 @@ use types::*;
 
 pub use sync_methods::ChainSegmentProcessId;
 use types::blob_sidecar::FixedBlobSidecarList;
+use types::data_column_sidecar::FixedDataColumnSidecarList;
 
 pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
 
@@ -482,6 +483,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         })
     }
 
+    /// Create a new `Work` event for some data columns, where the result from computation (if any)
+    /// is sent to the other side of `result_tx`.
+    pub fn send_rpc_data_columns(
+        self: &Arc<Self>,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+        seen_timestamp: Duration,
+        process_type: BlockProcessType,
+    ) -> Result<(), Error<T::EthSpec>> {
+        let data_column_count = data_columns.iter().filter(|b| b.is_some()).count();
+        if data_column_count == 0 {
+            return Ok(());
+        }
+        let process_fn = self.clone().generate_rpc_data_columns_process_fn(
+            block_root,
+            data_columns,
+            seen_timestamp,
+            process_type,
+        );
+        self.try_send(BeaconWorkEvent {
+            drop_during_sync: false,
+            work: Work::RpcDataColumns { process_fn },
+        })
+    }
+
     /// Create a new work event to import `blocks` as a beacon chain segment.
     pub fn send_chain_segment(
         self: &Arc<Self>,
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index 7acb99a616..f210b5ae1e 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -24,6 +24,7 @@ use store::KzgCommitment;
 use tokio::sync::mpsc;
 use types::beacon_block_body::format_kzg_commitments;
 use types::blob_sidecar::FixedBlobSidecarList;
+use types::data_column_sidecar::FixedDataColumnSidecarList;
 use types::{Epoch, Hash256};
 
 /// Id associated to a batch processing request, either a sync batch or a parent lookup.
@@ -211,6 +212,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         Box::pin(process_fn)
     }
 
+    /// Returns an async closure which processes a list of data columns received via RPC.
+    ///
+    /// This separate function was required to prevent a cycle during compiler
+    /// type checking.
+    pub fn generate_rpc_data_columns_process_fn(
+        self: Arc<Self>,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+        seen_timestamp: Duration,
+        process_type: BlockProcessType,
+    ) -> AsyncFn {
+        let process_fn = async move {
+            self.clone()
+                .process_rpc_data_columns(block_root, data_columns, seen_timestamp, process_type)
+                .await;
+        };
+        Box::pin(process_fn)
+    }
+
     /// Attempt to process a list of blobs received from a direct RPC request.
     pub async fn process_rpc_blobs(
         self: Arc<NetworkBeaconProcessor<T>>,
@@ -302,6 +322,98 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         });
     }
 
+    /// Attempt to process a list of data columns received from a direct RPC request.
+    pub async fn process_rpc_data_columns(
+        self: Arc<NetworkBeaconProcessor<T>>,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+        seen_timestamp: Duration,
+        process_type: BlockProcessType,
+    ) {
+        let Some(slot) = data_columns
+            .iter()
+            .find_map(|data_column| data_column.as_ref().map(|data_column| data_column.slot()))
+        else {
+            return;
+        };
+
+        let indices: Vec<u64> = data_columns
+            .iter()
+            .filter_map(|data_column_opt| {
+                data_column_opt
+                    .as_ref()
+                    .map(|data_column| data_column.index)
+            })
+            .collect();
+
+        debug!(
+            self.log,
+            "RPC data columns received";
+            "indices" => ?indices,
+            "block_root" => %block_root,
+            "slot" => %slot,
+        );
+
+        if let Ok(current_slot) = self.chain.slot() {
+            if current_slot == slot {
+                // Note: this metric is useful to gauge how long it takes to receive data columns
+                // requested over RPC. Since we always send the request for block components at
+                // `slot_clock.single_lookup_delay()` we can use that as a baseline to measure
+                // against.
+                let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
+
+                metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
+            }
+        }
+
+        let result = self
+            .chain
+            .process_rpc_data_columns(slot, block_root, data_columns)
+            .await;
+
+        match &result {
+            Ok(AvailabilityProcessingStatus::Imported(hash)) => {
+                debug!(
+                    self.log,
+                    "Block components retrieved";
+                    "result" => "imported block and data columns",
+                    "slot" => %slot,
+                    "block_hash" => %hash,
+                );
+            }
+            Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
+                debug!(
+                    self.log,
+                    "Missing components over rpc";
+                    "block_hash" => %block_root,
+                    "slot" => %slot,
+                );
+            }
+            Err(BlockError::BlockIsAlreadyKnown) => {
+                debug!(
+                    self.log,
+                    "Data columns have already been imported";
+                    "block_hash" => %block_root,
+                    "slot" => %slot,
+                );
+            }
+            Err(e) => {
+                warn!(
+                    self.log,
+                    "Error when importing rpc data columns";
+                    "error" => ?e,
+                    "block_hash" => %block_root,
+                    "slot" => %slot,
+                );
+            }
+        }
+
+        // Sync handles these results
+        self.send_sync_message(SyncMessage::BlockComponentProcessed {
+            process_type,
+            result: result.into(),
+        });
+    }
+
     /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
     /// thread if more blocks are needed to process it.
     pub async fn process_chain_segment(
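Worth noting from `send_rpc_data_columns` above: an all-`None` list is dropped
before a work event is ever queued, so `process_rpc_data_columns` can safely bail
out when it finds no slot. A hedged test-style sketch (`processor` and the other
bindings are assumed to be in scope):

    let empty: FixedDataColumnSidecarList<E> = FixedVector::default();
    // Returns Ok(()) immediately; nothing reaches the beacon processor.
    processor.send_rpc_data_columns(block_root, empty, seen_timestamp, process_type)?;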
diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs
index 23b14ac143..e8da57e26e 100644
--- a/beacon_node/network/src/router.rs
+++ b/beacon_node/network/src/router.rs
@@ -505,8 +505,10 @@
             RequestId::Sync(sync_id) => match sync_id {
                 SyncId::SingleBlock { .. }
                 | SyncId::SingleBlob { .. }
+                | SyncId::SingleDataColumn { .. }
                 | SyncId::ParentLookup { .. }
-                | SyncId::ParentLookupBlob { .. } => {
+                | SyncId::ParentLookupBlob { .. }
+                | SyncId::ParentLookupDataColumn { .. } => {
                     crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
                     return;
                 }
@@ -583,6 +585,10 @@
                     crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
                     return;
                 }
+                SyncId::SingleDataColumn { .. } | SyncId::ParentLookupDataColumn { .. } => {
+                    crit!(self.log, "Data column response to block by roots request"; "peer_id" => %peer_id);
+                    return;
+                }
             },
             RequestId::Router => {
                 crit!(self.log, "All BBRoot requests belong to sync"; "peer_id" => %peer_id);
@@ -617,6 +623,10 @@
                     crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
                     return;
                 }
+                SyncId::SingleDataColumn { .. } | SyncId::ParentLookupDataColumn { .. } => {
+                    crit!(self.log, "Data column response to blobs by roots request"; "peer_id" => %peer_id);
+                    return;
+                }
                 SyncId::BackFillBlocks { .. }
                 | SyncId::RangeBlocks { .. }
                 | SyncId::RangeBlockAndBlobs { .. }
diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs
index d989fbb336..64c9a2dcb4 100644
--- a/beacon_node/network/src/sync/block_lookups/common.rs
+++ b/beacon_node/network/src/sync/block_lookups/common.rs
@@ -3,26 +3,29 @@ use crate::sync::block_lookups::single_block_lookup::{
     LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
 };
 use crate::sync::block_lookups::{
-    BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
+    BlobRequestState, BlockLookups, BlockRequestState, DataColumnRequestState, PeerId,
+    SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
 };
 use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
 use crate::sync::network_context::SyncNetworkContext;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents};
 use beacon_chain::{get_block_root, BeaconChainTypes};
-use lighthouse_network::rpc::methods::BlobsByRootRequest;
+use lighthouse_network::rpc::methods::{BlobsByRootRequest, DataColumnsByRootRequest};
 use lighthouse_network::rpc::BlocksByRootRequest;
 use rand::prelude::IteratorRandom;
 use std::ops::IndexMut;
 use std::sync::Arc;
 use std::time::Duration;
 use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
-use types::{BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock};
+use types::data_column_sidecar::{DataColumnIdentifier, FixedDataColumnSidecarList};
+use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock};
 
 #[derive(Debug, Copy, Clone)]
 pub enum ResponseType {
     Block,
     Blob,
+    DataColumn,
 }
 
 #[derive(Debug, Copy, Clone)]
@@ -453,3 +456,113 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
+
+impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for DataColumnRequestState<L, T::EthSpec> {
+    type RequestType = DataColumnsByRootRequest;
+    type ResponseType = Arc<DataColumnSidecar<T::EthSpec>>;
+    type VerifiedResponseType = FixedDataColumnSidecarList<T::EthSpec>;
+    type ReconstructedResponseType = FixedDataColumnSidecarList<T::EthSpec>;
+
+    fn new_request(&self, spec: &ChainSpec) -> DataColumnsByRootRequest {
+        let data_column_id_vec: Vec<DataColumnIdentifier> = self.requested_ids.clone().into();
+        DataColumnsByRootRequest::new(data_column_id_vec, spec)
+    }
+
+    fn make_request(
+        id: SingleLookupReqId,
+        peer_id: PeerId,
+        request: Self::RequestType,
+        cx: &SyncNetworkContext<T>,
+    ) -> Result<(), LookupRequestError> {
+        cx.data_column_lookup_request(id, peer_id, request, L::lookup_type())
+            .map_err(LookupRequestError::SendFailed)
+    }
+
+    fn verify_response_inner(
+        &mut self,
+        _expected_block_root: Hash256,
+        data_column: Option<Self::ResponseType>,
+        peer_id: PeerId,
+    ) -> Result<Option<FixedDataColumnSidecarList<T::EthSpec>>, LookupVerifyError> {
+        match data_column {
+            Some(data_column) => {
+                let received_id = data_column.id();
+                if !self.requested_ids.contains(&received_id) {
+                    self.state.register_failure_downloading();
+                    Err(LookupVerifyError::UnrequestedDataColumnId)
+                } else {
+                    // State should remain downloading until we receive the stream terminator.
+                    self.requested_ids.remove(&received_id);
+                    let data_column_index = data_column.index;
+
+                    if data_column_index >= T::EthSpec::number_of_columns() as u64 {
+                        return Err(LookupVerifyError::InvalidIndex(data_column.index));
+                    }
+                    *self
+                        .data_column_download_queue
+                        .index_mut(data_column_index as usize) = Some(data_column);
+                    Ok(None)
+                }
+            }
+            None => {
+                self.state.state = State::Processing { peer_id };
+                let data_columns = std::mem::take(&mut self.data_column_download_queue);
+                Ok(Some(data_columns))
+            }
+        }
+    }
+
+    fn get_parent_root(
+        verified_response: &FixedDataColumnSidecarList<T::EthSpec>,
+    ) -> Option<Hash256> {
+        verified_response
+            .into_iter()
+            .filter_map(|data_column| data_column.as_ref())
+            .map(|data_column| data_column.block_parent_root())
+            .next()
+    }
+
+    fn add_to_child_components(
+        verified_response: FixedDataColumnSidecarList<T::EthSpec>,
+        components: &mut ChildComponents<T::EthSpec>,
+    ) {
+        components.merge_data_columns(verified_response);
+    }
+
+    fn verified_to_reconstructed(
+        _block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+    ) -> FixedDataColumnSidecarList<T::EthSpec> {
+        data_columns
+    }
+
+    fn send_reconstructed_for_processing(
+        id: Id,
+        bl: &BlockLookups<T>,
+        block_root: Hash256,
+        verified: FixedDataColumnSidecarList<T::EthSpec>,
+        duration: Duration,
+        cx: &SyncNetworkContext<T>,
+    ) -> Result<(), LookupRequestError> {
+        bl.send_data_columns_for_processing(
+            block_root,
+            verified,
+            duration,
+            BlockProcessType::SingleDataColumn { id },
+            cx,
+        )
+    }
+
+    fn response_type() -> ResponseType {
+        ResponseType::DataColumn
+    }
+    fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
+        &mut request.data_column_request_state
+    }
+    fn get_state(&self) -> &SingleLookupRequestState {
+        &self.state
+    }
+    fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
+        &mut self.state
+    }
+}
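The `verify_response_inner` implementation above encodes the by-root streaming
protocol: each `Some(column)` must match a requested id, is removed from
`requested_ids`, and is parked in the download queue; the `None` stream terminator
flips the lookup into `Processing` and releases the whole batch. A condensed
sketch of the loop this implies (the `responses` iterator is illustrative):

    for response in responses {
        // Ok(None) -> keep downloading; Ok(Some(batch)) -> terminator seen.
        if let Some(batch) = state.verify_response_inner(block_root, response, peer_id)? {
            return Ok(batch); // a full FixedDataColumnSidecarList, ready to process
        }
    }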
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index 62cdc4fa22..4c0a2327df 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -24,7 +24,7 @@ use fnv::FnvHashMap;
 use lighthouse_network::rpc::RPCError;
 use lighthouse_network::{PeerAction, PeerId};
 use lru_cache::LRUTimeCache;
-pub use single_block_lookup::{BlobRequestState, BlockRequestState};
+pub use single_block_lookup::{BlobRequestState, BlockRequestState, DataColumnRequestState};
 use slog::{debug, error, trace, warn, Logger};
 use smallvec::SmallVec;
 use std::collections::{HashMap, VecDeque};
@@ -32,6 +32,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use store::Hash256;
 use types::blob_sidecar::FixedBlobSidecarList;
+use types::data_column_sidecar::FixedDataColumnSidecarList;
 use types::Slot;
 
 pub mod common;
@@ -541,6 +542,7 @@
                 | ParentVerifyError::NotEnoughBlobsReturned
                 | ParentVerifyError::ExtraBlocksReturned
                 | ParentVerifyError::UnrequestedBlobId
+                | ParentVerifyError::UnrequestedDataColumnId
                 | ParentVerifyError::ExtraBlobsReturned
                 | ParentVerifyError::InvalidIndex(_) => {
                     let e = e.into();
@@ -1286,6 +1288,44 @@
         }
     }
 
+    fn send_data_columns_for_processing(
+        &self,
+        block_root: Hash256,
+        data_columns: FixedDataColumnSidecarList<T::EthSpec>,
+        duration: Duration,
+        process_type: BlockProcessType,
+        cx: &SyncNetworkContext<T>,
+    ) -> Result<(), LookupRequestError> {
+        match cx.beacon_processor_if_enabled() {
+            Some(beacon_processor) => {
+                trace!(self.log, "Sending data columns for processing"; "block" => ?block_root, "process_type" => ?process_type);
+                if let Err(e) = beacon_processor.send_rpc_data_columns(
+                    block_root,
+                    data_columns,
+                    duration,
+                    process_type,
+                ) {
+                    error!(
+                        self.log,
+                        "Failed to send sync data columns to processor";
+                        "error" => ?e
+                    );
+                    Err(LookupRequestError::SendFailed(
+                        "beacon processor send failure",
+                    ))
+                } else {
+                    Ok(())
+                }
+            }
+            None => {
+                trace!(self.log, "Dropping data columns ready for processing. Beacon processor not available"; "block_root" => %block_root);
+                Err(LookupRequestError::SendFailed(
+                    "beacon processor unavailable",
+                ))
+            }
+        }
+    }
+
     /// Attempts to request the next unknown parent. This method handles peer scoring and dropping
     /// the lookup in the event of failure.
     fn request_parent(&mut self, mut parent_lookup: ParentLookup<T>, cx: &SyncNetworkContext<T>) {
diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs
index 5c2e90b48c..70d9b1f887 100644
--- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs
@@ -37,6 +37,7 @@ pub enum ParentVerifyError {
     NotEnoughBlobsReturned,
     ExtraBlocksReturned,
     UnrequestedBlobId,
+    UnrequestedDataColumnId,
     ExtraBlobsReturned,
     InvalidIndex(u64),
     PreviousFailure { parent_root: Hash256 },
@@ -243,6 +244,7 @@ impl From<LookupVerifyError> for ParentVerifyError {
             E::NoBlockReturned => ParentVerifyError::NoBlockReturned,
             E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned,
             E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId,
+            E::UnrequestedDataColumnId => ParentVerifyError::UnrequestedDataColumnId,
             E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
             E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
             E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned,
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
index 989bfab00f..b0d2024948 100644
--- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -4,7 +4,7 @@ use crate::sync::block_lookups::Id;
 use crate::sync::network_context::SyncNetworkContext;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::data_availability_checker::{
-    AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs,
+    AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs, MissingDataColumns,
 };
 use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents};
 use beacon_chain::BeaconChainTypes;
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use store::Hash256;
 use strum::IntoStaticStr;
 use types::blob_sidecar::FixedBlobSidecarList;
+use types::data_column_sidecar::FixedDataColumnSidecarList;
 use types::EthSpec;
 
 #[derive(Debug, PartialEq, Eq)]
@@ -33,6 +34,7 @@ pub enum LookupVerifyError {
     ExtraBlocksReturned,
     UnrequestedBlobId,
     ExtraBlobsReturned,
+    UnrequestedDataColumnId,
     NotEnoughBlobsReturned,
     InvalidIndex(u64),
 }
@@ -52,6 +54,7 @@ pub struct SingleBlockLookup<L: Lookup, T: BeaconChainTypes> {
     pub id: Id,
     pub block_request_state: BlockRequestState<L>,
     pub blob_request_state: BlobRequestState<L, T::EthSpec>,
+    pub data_column_request_state: DataColumnRequestState<L, T::EthSpec>,
     pub da_checker: Arc<DataAvailabilityChecker<T>>,
     /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`
     /// because any blocks or blobs without parents won't hit the data availability cache.
@@ -71,6 +74,11 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
             id,
             block_request_state: BlockRequestState::new(requested_block_root, peers),
             blob_request_state: BlobRequestState::new(requested_block_root, peers, is_deneb),
+            data_column_request_state: DataColumnRequestState::new(
+                requested_block_root,
+                peers,
+                is_deneb,
+            ),
             da_checker,
             child_components,
         }
@@ -342,6 +350,30 @@ impl<L: Lookup, E: EthSpec> BlobRequestState<L, E> {
     }
 }
 
+/// The state of the data column request component of a `SingleBlockLookup`.
+pub struct DataColumnRequestState<L: Lookup, E: EthSpec> {
+    /// The latest picture of which data columns still need to be requested. This includes
+    /// information from both block/data column downloads in the network layer and any
+    /// blocks/data columns that exist in the data availability checker.
+    pub requested_ids: MissingDataColumns,
+    /// Where we store data columns until we receive the stream terminator.
+    pub data_column_download_queue: FixedDataColumnSidecarList<E>,
+    pub state: SingleLookupRequestState,
+    _phantom: PhantomData<L>,
+}
+
+impl<L: Lookup, E: EthSpec> DataColumnRequestState<L, E> {
+    pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self {
+        let default_ids = MissingDataColumns::new_without_block(block_root, is_deneb);
+        Self {
+            requested_ids: default_ids,
+            data_column_download_queue: <_>::default(),
+            state: SingleLookupRequestState::new(peer_source),
+            _phantom: PhantomData,
+        }
+    }
+}
+
 /// The state of the block request component of a `SingleBlockLookup`.
 pub struct BlockRequestState<L: Lookup> {
     pub requested_block_root: Hash256,
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index f81f16dfb5..e35da622d0 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -118,6 +118,16 @@ impl TestRig {
                    panic!("Expected blob request, found {:?}", other);
                }
            },
+            ResponseType::DataColumn => match self.network_rx.try_recv() {
+                Ok(NetworkMessage::SendRequest {
+                    peer_id: _,
+                    request: Request::DataColumnsByRoot(_request),
+                    request_id: RequestId::Sync(SyncId::SingleDataColumn { id }),
+                }) => id,
+                other => {
+                    panic!("Expected data column request, found {:?}", other);
+                }
+            },
        }
    }
 
@@ -140,6 +150,14 @@ impl TestRig {
                }) => id,
                other => panic!("Expected parent blobs request, found {:?}", other),
            },
+            ResponseType::DataColumn => match self.network_rx.try_recv() {
+                Ok(NetworkMessage::SendRequest {
+                    peer_id: _,
+                    request: Request::DataColumnsByRoot(_request),
+                    request_id: RequestId::Sync(SyncId::ParentLookupDataColumn { id }),
+                }) => id,
+                other => panic!("Expected parent data columns request, found {:?}", other),
+            },
        }
    }
 
@@ -158,6 +176,12 @@ impl TestRig {
                }
                other => panic!("Expected blob process, found {:?}", other),
            },
+            ResponseType::DataColumn => match self.beacon_processor_rx.try_recv() {
+                Ok(work) => {
+                    assert_eq!(work.work_type(), beacon_processor::RPC_DATA_COLUMNS);
+                }
+                other => panic!("Expected data column process, found {:?}", other),
+            },
        }
    }
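These rig arms make the data column path assertable end to end: a triggered lookup
should first surface a `DataColumnsByRoot` request on the network channel, and a
delivered response should land on the beacon processor as `RPC_DATA_COLUMNS` work.
A hedged sketch of a test chaining them (helper names are assumed from the
surrounding rig and may differ):

    let id = rig.expect_lookup_request(ResponseType::DataColumn);
    // ...feed the peer's data column response and stream terminator into sync...
    rig.expect_block_process(ResponseType::DataColumn);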
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 7fff76dd9e..3c0daf01ab 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -42,7 +42,7 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces
 use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::block_lookups::common::{Current, Parent};
-use crate::sync::block_lookups::{BlobRequestState, BlockRequestState};
+use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, DataColumnRequestState};
 use crate::sync::network_context::BlocksAndBlobsByRangeRequest;
 use crate::sync::range_sync::ByRangeRequestType;
 use beacon_chain::block_verification_types::AsBlock;
@@ -90,12 +90,17 @@ pub enum RequestId {
     SingleBlock { id: SingleLookupReqId },
     /// Request searching for a set of blobs given a hash.
     SingleBlob { id: SingleLookupReqId },
+    /// Request searching for a set of data columns given a hash.
+    SingleDataColumn { id: SingleLookupReqId },
     /// Request searching for a block's parent. The id is the chain, shared with the corresponding
     /// blob id.
     ParentLookup { id: SingleLookupReqId },
     /// Request searching for a block's parent blobs. The id is the chain, shared with the
     /// corresponding block id.
     ParentLookupBlob { id: SingleLookupReqId },
+    /// Request searching for a block's parent data columns. The id is the chain, shared with the
+    /// corresponding block id.
+    ParentLookupDataColumn { id: SingleLookupReqId },
     /// Request was from the backfill sync algorithm.
     BackFillBlocks { id: Id },
     /// Backfill request that is composed by both a block range request and a blob range request.
@@ -166,6 +171,7 @@ pub enum SyncMessage<T: EthSpec> {
 pub enum BlockProcessType {
     SingleBlock { id: Id },
     SingleBlob { id: Id },
+    SingleDataColumn { id: Id },
     ParentLookup { chain_hash: Hash256 },
 }
@@ -319,6 +325,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                         error,
                     );
             }
+            RequestId::SingleDataColumn { id } => {
+                self.block_lookups
+                    .single_block_lookup_failed::<DataColumnRequestState<Current, T::EthSpec>>(
+                        id,
+                        &peer_id,
+                        &self.network,
+                        error,
+                    );
+            }
             RequestId::ParentLookup { id } => {
                 self.block_lookups
                     .parent_lookup_failed::<BlockRequestState<Parent>>(
                        id,
                        peer_id,
                        &self.network,
                        error,
                    );
            }
+            RequestId::ParentLookupDataColumn { id } => {
+                self.block_lookups
+                    .parent_lookup_failed::<DataColumnRequestState<Parent, T::EthSpec>>(
+                        id,
+                        peer_id,
+                        &self.network,
+                        error,
+                    );
+            }
             RequestId::BackFillBlocks { id } => {
                 if let Some(batch_id) = self
                     .network
@@ -680,6 +704,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     result,
                     &mut self.network,
                 ),
+                BlockProcessType::SingleDataColumn { id } => self
+                    .block_lookups
+                    .single_block_component_processed::<DataColumnRequestState<Current, T::EthSpec>>(
+                        id, result, &mut self.network,
+                    ),
                BlockProcessType::ParentLookup { chain_hash } => self
                    .block_lookups
                    .parent_block_processed(chain_hash, result, &mut self.network),
@@ -844,6 +876,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             RequestId::SingleBlob { .. } => {
                 crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
             }
+            RequestId::SingleDataColumn { .. } => {
+                crit!(self.log, "Block received during data column request"; "peer_id" => %peer_id );
+            }
             RequestId::ParentLookup { id } => self
                 .block_lookups
                 .parent_lookup_response::<BlockRequestState<Parent>>(
                    id,
                    peer_id,
                    block,
                    seen_timestamp,
                    &self.network,
                ),
             RequestId::ParentLookupBlob { id: _ } => {
                 crit!(self.log, "Block received during parent blob request"; "peer_id" => %peer_id );
             }
+            RequestId::ParentLookupDataColumn { id: _ } => {
+                crit!(self.log, "Block received during parent data column request"; "peer_id" => %peer_id );
+            }
             RequestId::BackFillBlocks { id } => {
                 let is_stream_terminator = block.is_none();
                 if let Some(batch_id) = self
@@ -916,6 +954,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             RequestId::SingleBlock { .. } => {
                 crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
             }
+            RequestId::SingleDataColumn { .. } => {
+                crit!(self.log, "Single blob received during data column request"; "peer_id" => %peer_id );
+            }
             RequestId::SingleBlob { id } => {
                 if let Some(blob) = blob.as_ref() {
                     debug!(self.log,
@@ -937,6 +978,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             RequestId::ParentLookup { id: _ } => {
                 crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id );
             }
+            RequestId::ParentLookupDataColumn { id: _ } => {
+                crit!(self.log, "Single blob received during parent data column request"; "peer_id" => %peer_id );
+            }
             RequestId::ParentLookupBlob { id } => {
                 if let Some(blob) = blob.as_ref() {
                     debug!(self.log,
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 04feb8fdc2..4160e0944f 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -12,7 +12,9 @@ use crate::sync::manager::SingleLookupReqId;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
 use fnv::FnvHashMap;
-use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
+use lighthouse_network::rpc::methods::{
+    BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest,
+};
 use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
 use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 use slog::{debug, trace, warn};
@@ -506,6 +508,50 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         Ok(())
     }
 
+    pub fn data_column_lookup_request(
+        &self,
+        id: SingleLookupReqId,
+        data_column_peer_id: PeerId,
+        data_column_request: DataColumnsByRootRequest,
+        lookup_type: LookupType,
+    ) -> Result<(), &'static str> {
+        let sync_id = match lookup_type {
+            LookupType::Current => SyncRequestId::SingleDataColumn { id },
+            LookupType::Parent => SyncRequestId::ParentLookupDataColumn { id },
+        };
+        let request_id = RequestId::Sync(sync_id);
+
+        if let Some(block_root) = data_column_request
+            .data_column_ids
+            .as_slice()
+            .first()
+            .map(|id| id.block_root)
+        {
+            let indices = data_column_request
+                .data_column_ids
+                .as_slice()
+                .iter()
+                .map(|id| id.index)
+                .collect::<Vec<_>>();
+            debug!(
+                self.log,
+                "Sending DataColumnsByRoot Request";
+                "method" => "DataColumnsByRoot",
+                "block_root" => ?block_root,
+                "data_column_indices" => ?indices,
+                "peer" => %data_column_peer_id,
+                "lookup_type" => ?lookup_type
+            );
+
+            self.send_network_msg(NetworkMessage::SendRequest {
+                peer_id: data_column_peer_id,
+                request: Request::DataColumnsByRoot(data_column_request),
+                request_id,
+            })?;
+        }
+        Ok(())
+    }
+
     pub fn is_execution_engine_online(&self) -> bool {
         self.execution_engine_state == EngineState::Online
     }
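Note the guard in `data_column_lookup_request` above: the message is only sent when
the id list has a first element, so an empty request never reaches the network and
the call silently returns Ok. A hedged calling sketch (`cx`, `peer_id`, `ids` and
`spec` assumed in scope):

    let request = DataColumnsByRootRequest::new(ids, spec);
    // No-op if `ids` was empty; otherwise sends DataColumnsByRoot to the peer.
    cx.data_column_lookup_request(lookup_id, peer_id, request, LookupType::Current)?;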
diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs
index a6fc4c5674..a37432ab97 100644
--- a/consensus/types/src/data_column_sidecar.rs
+++ b/consensus/types/src/data_column_sidecar.rs
@@ -33,6 +33,19 @@ pub struct DataColumnIdentifier {
     pub index: ColumnIndex,
 }
 
+impl DataColumnIdentifier {
+    pub fn get_all_data_column_ids<E: EthSpec>(block_root: Hash256) -> Vec<DataColumnIdentifier> {
+        let mut data_column_ids = Vec::with_capacity(E::number_of_columns());
+        for i in 0..E::number_of_columns() {
+            data_column_ids.push(DataColumnIdentifier {
+                block_root,
+                index: i as u64,
+            });
+        }
+        data_column_ids
+    }
+}
+
 #[derive(
     Debug,
     Clone,
@@ -152,6 +165,10 @@ impl<E: EthSpec> DataColumnSidecar<E> {
         self.signed_block_header.message.tree_hash_root()
     }
 
+    pub fn block_parent_root(&self) -> Hash256 {
+        self.signed_block_header.message.parent_root
+    }
+
     pub fn min_size() -> usize {
         // min size is one cell
         Self {