From 34f796d3ec69cd55a9312357dbe8c4f7c13e1c65 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:46:22 +0200 Subject: [PATCH 1/3] Implement PeerDAS RPC handlers --- beacon_node/beacon_chain/src/beacon_chain.rs | 31 +++ .../src/data_availability_checker.rs | 12 +- .../overflow_lru_cache.rs | 21 +- .../src/data_column_verification.rs | 7 + .../lighthouse_network/src/rpc/methods.rs | 12 + .../network_beacon_processor/rpc_methods.rs | 234 +++++++++++++++++- 6 files changed, 306 insertions(+), 11 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3bf75284779..4bb747025ff 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1155,6 +1155,25 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + pub fn get_data_column_checking_all_caches( + &self, + block_root: Hash256, + index: ColumnIndex, + ) -> Result>>, Error> { + if let Some(column) = self + .data_availability_checker + .get_data_column(&DataColumnIdentifier { block_root, index })? + { + return Ok(Some(column)); + } + + if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) { + return Ok(columns.iter().find(|c| c.index == index).cloned()); + } + + self.get_data_column(&block_root, &index) + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1230,6 +1249,18 @@ impl BeaconChain { } } + /// Returns the data columns at the given root, if any. + /// + /// ## Errors + /// May return a database error. + pub fn get_data_column( + &self, + block_root: &Hash256, + column_index: &ColumnIndex, + ) -> Result>>, Error> { + Ok(self.store.get_data_column(block_root, column_index)?) + } + pub fn get_blinded_block( &self, block_root: &Hash256, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index b4336a054e2..4577a1d974f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -15,8 +15,8 @@ use std::time::Duration; use task_executor::TaskExecutor; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, - Slot, + BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, + Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, }; mod error; @@ -156,6 +156,14 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } + /// Get a data column from the availability cache. + pub fn get_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + self.availability_cache.peek_data_column(data_column_id) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6c9964bdf86..cd39655d5bf 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -14,7 +14,10 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{ + BlobSidecar, ChainSpec, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, Hash256, + SignedBeaconBlock, +}; /// This represents the components of a partially available block /// @@ -369,6 +372,22 @@ impl DataAvailabilityCheckerInner { } } + /// Fetch a data column from the cache without affecting the LRU ordering + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) { + Ok(pending_components + .verified_data_columns + .iter() + .find(|data_column| data_column.as_data_column().index == data_column_id.index) + .map(|data_column| data_column.clone_arc())) + } else { + Ok(None) + } + } + pub fn peek_pending_components>) -> R>( &self, block_root: &Hash256, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index da639e3695e..8ae7ca97d45 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -250,6 +250,13 @@ impl KzgVerifiedCustodyDataColumn { pub fn into_inner(self) -> Arc> { self.data } + + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } + pub fn clone_arc(&self) -> Arc> { + self.data.clone() + } } /// Complete kzg verification for a `DataColumnSidecar`. diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 8849a5433d4..7c7dca02f50 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -6,6 +6,7 @@ use serde::Serialize; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum::U256, VariableList}; +use std::collections::BTreeMap; use std::fmt::Display; use std::marker::PhantomData; use std::ops::Deref; @@ -426,6 +427,17 @@ impl DataColumnsByRootRequest { pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self { Self::new(vec![DataColumnIdentifier { block_root, index }], spec) } + + pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec)> { + let mut column_indexes_by_block = BTreeMap::>::new(); + for request_id in self.data_column_ids.as_slice() { + column_indexes_by_block + .entry(request_id.block_root) + .or_default() + .push(request_id.index); + } + column_indexes_by_block.into_iter().collect() + } } /* RPC Handling and Grouping */ diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 3f8cf14dcbe..f75221b1869 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -320,14 +320,53 @@ impl NetworkBeaconProcessor { pub fn handle_data_columns_by_root_request( self: Arc, peer_id: PeerId, - _request_id: PeerRequestId, + request_id: PeerRequestId, request: DataColumnsByRootRequest, ) { - // TODO(das): implement handler - debug!(self.log, "Received DataColumnsByRoot Request"; - "peer_id" => %peer_id, - "count" => request.data_column_ids.len() + let mut send_data_column_count = 0; + + for data_column_id in request.data_column_ids.as_slice() { + match self.chain.get_data_column_checking_all_caches( + data_column_id.block_root, + data_column_id.index, + ) { + Ok(Some(data_column)) => { + send_data_column_count += 1; + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column)), + request_id, + ); + } + Ok(None) => {} // no-op + Err(e) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + // TODO(das): leak error details to ease debugging + format!("{:?}", e).to_string(), + request_id, + ); + error!(self.log, "Error getting data column"; + "block_root" => ?data_column_id.block_root, + "peer" => %peer_id, + "error" => ?e + ); + return; + } + } + } + + debug!( + self.log, + "Received DataColumnsByRoot Request"; + "peer" => %peer_id, + "request" => ?request.group_by_ordered_block_root(), + "returned" => send_data_column_count ); + + // send stream termination + self.send_response(peer_id, Response::DataColumnsByRoot(None), request_id); } /// Handle a `LightClientBootstrap` request from the peer. @@ -833,17 +872,196 @@ impl NetworkBeaconProcessor { /// Handle a `DataColumnsByRange` request from the peer. pub fn handle_data_columns_by_range_request( - self: Arc, + &self, peer_id: PeerId, - _request_id: PeerRequestId, + request_id: PeerRequestId, req: DataColumnsByRangeRequest, ) { - // TODO(das): implement handler + self.terminate_response_stream( + peer_id, + request_id, + self.handle_data_columns_by_range_request_inner(peer_id, request_id, req), + Response::DataColumnsByRange, + ); + } + + /// Handle a `DataColumnsByRange` request from the peer. + pub fn handle_data_columns_by_range_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + req: DataColumnsByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received DataColumnsByRange Request"; "peer_id" => %peer_id, "count" => req.count, "start_slot" => req.start_slot, ); + + // Should not send more than max request data columns + if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { + return Err(( + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + )); + } + + let request_start_slot = Slot::from(req.start_slot); + + let data_availability_boundary_slot = match self.chain.data_availability_boundary() { + Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), + None => { + debug!(self.log, "Deneb fork is disabled"); + return Err(( + RPCResponseErrorCode::InvalidRequest, + "Deneb fork is disabled", + )); + } + }; + + let oldest_data_column_slot = self + .chain + .store + .get_data_column_info() + .oldest_data_column_slot + .unwrap_or(data_availability_boundary_slot); + + if request_start_slot < oldest_data_column_slot { + debug!( + self.log, + "Range request start slot is older than data availability boundary."; + "requested_slot" => request_start_slot, + "oldest_data_column_slot" => oldest_data_column_slot, + "data_availability_boundary" => data_availability_boundary_slot + ); + + return if data_availability_boundary_slot < oldest_data_column_slot { + Err(( + RPCResponseErrorCode::ResourceUnavailable, + "blobs pruned within boundary", + )) + } else { + Err(( + RPCResponseErrorCode::InvalidRequest, + "Req outside availability period", + )) + }; + } + + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(request_start_slot) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockError( + HistoricalBlockError::BlockOutOfRange { + slot, + oldest_block_slot, + }, + )) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); + } + }; + + // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to + // `request_start_slot` in order to check whether the `request_start_slot` is a skip. + let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + .ok() + .flatten() + }); + + // Pick out the required blocks, ignoring skip-slots. + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); + } + }; + + // remove all skip slots + let block_roots = block_roots.into_iter().flatten(); + let mut data_columns_sent = 0; + + for root in block_roots { + for index in &req.columns { + match self.chain.get_data_column(&root, index) { + Ok(Some(data_column_sidecar)) => { + data_columns_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::DataColumnsByRange(Some( + data_column_sidecar.clone(), + )), + id: request_id, + }); + } + Ok(None) => {} // no-op + Err(e) => { + error!( + self.log, + "Error fetching data columns block root"; + "request" => ?req, + "peer" => %peer_id, + "block_root" => ?root, + "error" => ?e + ); + return Err(( + RPCResponseErrorCode::ServerError, + "No data columns and failed fetching corresponding block", + )); + } + } + } + } + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + debug!( + self.log, + "DataColumnsByRange Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => data_columns_sent + ); + + Ok(()) } /// Helper function to ensure single item protocol always end with either a single chunk or an From aa0b9037c689de617a9092cdfc0bc9fa5283a4f2 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 14 Aug 2024 00:55:42 +0300 Subject: [PATCH 2/3] use terminate_response_stream --- .../network_beacon_processor/rpc_methods.rs | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index f75221b1869..0defe7ad879 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -323,6 +323,21 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: DataColumnsByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_data_columns_by_root_request_inner(peer_id, request_id, request), + Response::DataColumnsByRoot, + ); + } + + /// Handle a `DataColumnsByRoot` request from the peer. + pub fn handle_data_columns_by_root_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { let mut send_data_column_count = 0; for data_column_id in request.data_column_ids.as_slice() { @@ -340,19 +355,16 @@ impl NetworkBeaconProcessor { } Ok(None) => {} // no-op Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - // TODO(das): leak error details to ease debugging - format!("{:?}", e).to_string(), - request_id, - ); + // TODO(das): lower log level when feature is stabilized error!(self.log, "Error getting data column"; "block_root" => ?data_column_id.block_root, "peer" => %peer_id, "error" => ?e ); - return; + return Err(( + RPCResponseErrorCode::ServerError, + "Error getting data column", + )); } } } @@ -365,8 +377,7 @@ impl NetworkBeaconProcessor { "returned" => send_data_column_count ); - // send stream termination - self.send_response(peer_id, Response::DataColumnsByRoot(None), request_id); + Ok(()) } /// Handle a `LightClientBootstrap` request from the peer. From 906e2db54c97b329cb0d4ed12363d5c0abf90d0a Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 15 Aug 2024 09:33:46 -0700 Subject: [PATCH 3/3] cargo fmt --- .../src/data_availability_checker/overflow_lru_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 38e7d8a3bbb..50fae091196 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -14,8 +14,8 @@ use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, Hash256, - SignedBeaconBlock, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, + Hash256, SignedBeaconBlock, }; /// This represents the components of a partially available block