From bf70fb4598ccb20a63b060459bbf4190b80f7bf3 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 2 Jul 2024 16:51:49 +1000 Subject: [PATCH] Use subscribed peers on subnet before MetaDataV3 is implemented. Remove peer_id matching when injecting errors because multiple peers are used for range requests. Use a randomized custodial peer to avoid repeatedly sending requests to failing peers. Batch columns into a single by-range request per peer where possible. --- .../src/peer_manager/peerdb/peer_info.rs | 5 + .../lighthouse_network/src/types/globals.rs | 120 +----------------- .../network/src/sync/backfill_sync/mod.rs | 8 +- .../network/src/sync/network_context.rs | 75 +++++++---- .../network/src/sync/range_sync/batch.rs | 6 +- .../network/src/sync/range_sync/chain.rs | 8 +- beacon_node/network/src/sync/sampling.rs | 8 +- 7 files changed, 80 insertions(+), 150 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index b0d28cc835..0745cc2600 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -96,6 +96,11 @@ impl PeerInfo { } Subnet::DataColumn(_) => { // TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821 + // We should use MetaDataV3 for peer selection rather than + // looking at subscribed peers (current behavior). Until MetaDataV3 is + // implemented, this is perhaps the only viable option on the current devnet, + // as the peer count is low and it's important to identify supernodes to get a + // good distribution of peers across subnets. return true; } } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 770f7284e6..d01ad9707a 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,15 +1,11 @@ //! A collection of variables that are accessible outside of the network thread itself. 
-use crate::discovery::peer_id_to_node_id; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; -use crate::EnrExt; use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; -use discv5::handler::NodeContact; -use itertools::Itertools; +use crate::{EnrExt, Subnet}; use parking_lot::RwLock; -use slog::{debug, Logger}; use std::collections::HashSet; use types::data_column_sidecar::ColumnIndex; use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec}; @@ -136,48 +132,13 @@ impl NetworkGlobals { &self, column_index: ColumnIndex, spec: &ChainSpec, - log: &Logger, ) -> Vec { self.peers .read() - .connected_peers() - .filter_map(|(peer_id, peer_info)| { - let node_id_and_csc = if let Some(enr) = peer_info.enr() { - let custody_subnet_count = enr.custody_subnet_count::(spec); - Some((enr.node_id(), custody_subnet_count)) - } else if let Some(node_id) = peer_id_to_node_id(peer_id) - // TODO(das): may be noisy, downgrade to trace - .inspect_err( - |e| debug!(log, "Error converting peer ID to node ID"; "error" => ?e), - ) - .ok() - { - // TODO(das): may be noisy, downgrade to trace - debug!( - log, - "ENR not present for peer"; - "peer_id" => %peer_id, - "info" => "Unable to compute custody columns, falling back to default \ - custody requirement", - ); - // TODO(das): Use `custody_subnet_count` from `MetaDataV3` - Some((node_id, spec.custody_requirement)) - } else { - None - }; - - node_id_and_csc.and_then(|(node_id, custody_subnet_count)| { - // TODO(das): consider caching a map of subnet -> Vec and invalidating - // whenever a peer connected or disconnect event in received - DataColumnSubnetId::compute_custody_columns::( - node_id.raw().into(), - custody_subnet_count, - spec, - ) - .contains(&column_index) - .then_some(*peer_id) - }) - }) + .good_peers_on_subnet(Subnet::DataColumn( + DataColumnSubnetId::from_column_index::(column_index as usize, spec), + )) + .cloned() .collect::>() } @@ -204,7 +165,6 @@ impl NetworkGlobals { #[cfg(test)] mod test { use super::*; - use std::str::FromStr; use types::{Epoch, EthSpec, MainnetEthSpec as E}; #[test] @@ -222,74 +182,4 @@ mod test { default_custody_requirement_column_count as usize ); } - - #[test] - fn custody_peers_for_column_enr_present() { - let spec = E::default_spec(); - let log = logging::test_logger(); - let globals = NetworkGlobals::::new_test_globals(vec![], &log); - - let mut peers_db_write_lock = globals.peers.write(); - let valid_enrs = [ - "enr:-Mm4QDJpcg5mZ8EFeYuDcUX78tOTigHLz4_zJlCY7vOTd2-XPPqlAoWM02Us69c4ov85pHgTgeo77Z3_nAhJ4yF1y30Bh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR0iXNlY3AyNTZrMaECvF7Y-fD1MEEVQq3y5qW7C8UoTsq6J_tfwvQIJ5fo1TGIc3luY25ldHMAg3RjcIKUc4N1ZHCClHM", - "enr:-Mm4QBw4saycbk-Up2PvppJOv0KzBqgFFHl6_OfFlh8_HxtwWkZpSFgJ0hFV3qOelh_Ai4L9HhSAEJSG48LE8YJ-7WABh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR1iXNlY3AyNTZrMaECsRjhgRrAuRWelB9VTTzTa0tHtcWyLTLSReL4RNWhJgGIc3luY25ldHMAg3RjcIKUdIN1ZHCClHQ", - "enr:-Mm4QMFlqbpGrmN21EM-70_hDW9c3MrulhIZElmsP3kb7XSLOEmV7-Msj2jlwGR5C_TicwOXYsZrN6eEIJlGgluM_XgBh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU", - 
"enr:-Mm4QEHdVjmQ7mH2qIX7_6SDablQUcrZuA4Sxjprh9WGbipfHUjPrELtBaRIRJUrpI8cgJRoAF1wMwoeRS7j3d8xviRGh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU" - ]; - let peers = valid_enrs - .into_iter() - .map(|enr_str| { - let enr = Enr::from_str(enr_str).unwrap(); - let peer_id = enr.peer_id(); - peers_db_write_lock.__add_connected_peer_enr_testing_only(enr); - peer_id - }) - .collect::>(); - - drop(peers_db_write_lock); - let [supernode_peer_1, supernode_peer_2, _, _] = - peers.try_into().expect("expected exactly 4 peer ids"); - - for col_index in 0..spec.number_of_columns { - let custody_peers = - globals.custody_peers_for_column(col_index as ColumnIndex, &spec, &log); - assert!( - custody_peers.contains(&supernode_peer_1), - "must at least return supernode peer" - ); - assert!( - custody_peers.contains(&supernode_peer_2), - "must at least return supernode peer" - ); - } - } - - // If ENR is not preset, fallback to deriving node_id and use `spec.custody_requirement`. - #[test] - fn custody_peers_for_column_no_enr_use_default() { - let spec = E::default_spec(); - let log = logging::test_logger(); - let globals = NetworkGlobals::::new_test_globals(vec![], &log); - - // Add peer without enr - let peer_id_str = "16Uiu2HAm86zWajwnBFD8uxkRpxhRzeUEf6Brfz2VBxGAaWx9ejyr"; - let peer_id = PeerId::from_str(peer_id_str).unwrap(); - let multiaddr = - Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/9000/p2p/{peer_id_str}")).unwrap(); - - let mut peers_db_write_lock = globals.peers.write(); - peers_db_write_lock.__add_connected_peer_multiaddr_testing_only(&peer_id, multiaddr); - drop(peers_db_write_lock); - - let custody_subnets = (0..spec.data_column_sidecar_subnet_count) - .filter(|col_index| { - !globals - .custody_peers_for_column(*col_index, &spec, &log) - .is_empty() - }) - .count(); - - // The single peer's custody subnet should match custody_requirement. - assert_eq!(custody_subnets, spec.custody_requirement as usize); - } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 915ba36a50..db58d5d3c8 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -337,7 +337,9 @@ impl BackFillSync { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if !batch.is_expecting_block(peer_id, &request_id) { + // TODO(das): removed peer_id matching as the node may request a different peer for data + // columns. + if !batch.is_expecting_block(&request_id) { return Ok(()); } debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error"); @@ -385,7 +387,9 @@ impl BackFillSync { // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer, and that the // request_id matches - if !batch.is_expecting_block(peer_id, &request_id) { + // TODO(das): removed peer_id matching as the node may request a different peer for data + // columns. 
+ if !batch.is_expecting_block(&request_id) { return Ok(ProcessResult::Successful); } batch diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a6b59f71f1..adb2f5e33e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -26,6 +26,8 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +use rand::seq::SliceRandom; +use rand::thread_rng; pub use requests::LookupVerifyError; use slog::{debug, error, warn}; use slot_clock::SlotClock; @@ -238,7 +240,17 @@ impl SyncNetworkContext { // TODO(das): epoch argument left here in case custody rotation is implemented pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec { self.network_globals() - .custody_peers_for_column(column_index, &self.chain.spec, &self.log) + .custody_peers_for_column(column_index, &self.chain.spec) + } + + pub fn get_random_custodial_peer( + &self, + epoch: Epoch, + column_index: ColumnIndex, + ) -> Option { + self.get_custodial_peers(epoch, column_index) + .choose(&mut thread_rng()) + .cloned() } pub fn network_globals(&self) -> &NetworkGlobals { @@ -337,35 +349,22 @@ impl SyncNetworkContext { .network_globals() .custody_columns(epoch, &self.chain.spec); - for column_index in &custody_indexes { - let custody_peer_ids = self.get_custodial_peers(epoch, *column_index); - let Some(custody_peer) = custody_peer_ids.first().cloned() else { - // TODO(das): this will be pretty bad UX. To improve we should: - // - Attempt to fetch custody requests first, before requesting blocks - // - Handle the no peers case gracefully, maybe add some timeout and give a few - // minutes / seconds to the peer manager to locate peers on this subnet before - // abandoing progress on the chain completely. - return Err(RpcRequestSendError::NoCustodyPeers); - }; - + for (peer_id, columns_by_range_request) in + self.make_columns_by_range_requests(epoch, request, &custody_indexes)? + { debug!( self.log, "Sending DataColumnsByRange requests"; "method" => "DataColumnsByRange", - "count" => request.count(), + "count" => columns_by_range_request.count, "epoch" => epoch, - "index" => column_index, - "peer" => %custody_peer, + "columns" => ?columns_by_range_request.columns, + "peer" => %peer_id, ); - // Create the blob request based on the blocks request. self.send_network_msg(NetworkMessage::SendRequest { - peer_id: custody_peer, - request: Request::DataColumnsByRange(DataColumnsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - columns: vec![*column_index], - }), + peer_id, + request: Request::DataColumnsByRange(columns_by_range_request), request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; @@ -382,6 +381,38 @@ impl SyncNetworkContext { Ok(id) } + fn make_columns_by_range_requests( + &self, + epoch: Epoch, + request: BlocksByRangeRequest, + custody_indexes: &Vec, + ) -> Result, RpcRequestSendError> { + let mut peer_id_to_request_map = HashMap::new(); + + for column_index in custody_indexes { + let Some(custody_peer) = self.get_random_custodial_peer(epoch, *column_index) else { + // TODO(das): this will be pretty bad UX. 
To improve we should: + // - Attempt to fetch custody requests first, before requesting blocks + // - Handle the no peers case gracefully, maybe add some timeout and give a few + // minutes / seconds to the peer manager to locate peers on this subnet before + // abandoning progress on the chain completely. + return Err(RpcRequestSendError::NoCustodyPeers); + }; + + let columns_by_range_request = peer_id_to_request_map + .entry(custody_peer) + .or_insert_with(|| DataColumnsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + columns: vec![], + }); + + columns_by_range_request.columns.push(*column_index); + } + + Ok(peer_id_to_request_map) + } + pub fn range_request_failed(&mut self, request_id: Id) -> Option { let sender_id = self .range_block_components_requests diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 273d3248e1..aac2b12f3c 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -199,9 +199,9 @@ impl BatchInfo { } /// Verifies if an incoming block belongs to this batch. - pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool { - if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { - return peer_id == expected_peer && expected_id == request_id; + pub fn is_expecting_block(&self, request_id: &Id) -> bool { + if let BatchState::Downloading(_, _, expected_id) = &self.state { + return expected_id == request_id; } false } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 071d634a39..13f8ffdb92 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -216,7 +216,9 @@ impl SyncingChain { // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer, and that the // request_id matches - if !batch.is_expecting_block(peer_id, &request_id) { + // TODO(das): removed peer_id matching as the node may request a different peer for data + // columns. + if !batch.is_expecting_block(&request_id) { return Ok(KeepChain); } batch @@ -808,7 +810,9 @@ impl SyncingChain { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if !batch.is_expecting_block(peer_id, &request_id) { + // TODO(das): removed peer_id matching as the node may request a different peer for data + // columns. + if !batch.is_expecting_block(&request_id) { debug!( self.log, "Batch not expecting block"; diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 3fb21489d9..ee298f518b 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -490,13 +490,9 @@ mod request { // TODO: When is a fork and only a subset of your peers know about a block, sampling should only // be queried on the peers on that fork. Should this case be handled? How to handle it? 
- let peer_ids = cx.get_custodial_peers( - block_slot.epoch(::slots_per_epoch()), - self.column_index, - ); + let epoch = block_slot.epoch(::slots_per_epoch()); - // TODO(das) randomize custodial peer and avoid failing peers - if let Some(peer_id) = peer_ids.first().cloned() { + if let Some(peer_id) = cx.get_random_custodial_peer(epoch, self.column_index) { cx.data_column_lookup_request( DataColumnsByRootRequester::Sampling(SamplingId { id: requester,
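
Note: the following is a minimal, self-contained sketch (not part of the patch) of the strategy introduced in make_columns_by_range_requests and get_random_custodial_peer: pick one custodial peer at random per column, then batch all columns assigned to the same peer into a single request. The stand-in types (PeerId as u64, the bare ColumnsRequest struct) are hypothetical simplifications; only the rand 0.8-style SliceRandom/thread_rng API matches what the patch itself imports.

use rand::seq::SliceRandom;
use std::collections::HashMap;

// Hypothetical stand-ins for Lighthouse's PeerId / ColumnIndex / DataColumnsByRangeRequest.
type PeerId = u64;
type ColumnIndex = u64;

#[derive(Debug, Default)]
struct ColumnsRequest {
    columns: Vec<ColumnIndex>,
}

// For each custody column, choose one custodial peer at random (so retries are not
// pinned to a single failing peer) and group all columns assigned to the same peer
// into one request. Returns None if any column has no known custodial peer.
fn batch_columns_by_peer(
    custody_indexes: &[ColumnIndex],
    peers_for_column: impl Fn(ColumnIndex) -> Vec<PeerId>,
) -> Option<HashMap<PeerId, ColumnsRequest>> {
    let mut rng = rand::thread_rng();
    let mut requests: HashMap<PeerId, ColumnsRequest> = HashMap::new();
    for &column in custody_indexes {
        let peer = *peers_for_column(column).choose(&mut rng)?;
        requests.entry(peer).or_default().columns.push(column);
    }
    Some(requests)
}

fn main() {
    // Toy custody assignment: peer 1 custodies even columns, peer 2 custodies odd ones.
    let requests = batch_columns_by_peer(&[0, 1, 2, 3], |c| vec![1 + c % 2]).unwrap();
    for (peer, request) in &requests {
        println!("peer {peer} -> columns {:?}", request.columns);
    }
}

The trade-off, as in the patch, is that random selection spreads load across custodial peers and avoids hammering a single failing peer on retries, at the cost of not preferring the best-scored peer for each column.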