Use subscribed peers on subnet before MetaDataV3 is implemented. Remove peer_id matching when injecting error because multiple peers are used for range requests. Use randomized custodial peer to avoid repeatedly sending requests to failing peers. Batch by range request where possible.
jimmygchen committed Jul 2, 2024
1 parent 9cfbfa6 commit bf70fb4
Showing 7 changed files with 80 additions and 150 deletions.
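
The commit message bundles three related changes: custodial peers are now selected from data-column subnet subscriptions, the peer for each column is chosen at random, and columns assigned to the same peer are folded into a single by-range request. The sketch below illustrates the random-selection-plus-batching idea in isolation; it is not the commit's code, and PeerId, ColumnIndex, ColumnsByRangeRequest and the custodial_peers lookup are simplified stand-ins for the real Lighthouse types.

use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;

type PeerId = u64;
type ColumnIndex = u64;

struct ColumnsByRangeRequest {
    start_slot: u64,
    count: u64,
    columns: Vec<ColumnIndex>,
}

/// Groups custody columns into one by-range request per randomly chosen custodial peer.
fn batch_columns_by_peer(
    custody_columns: &[ColumnIndex],
    custodial_peers: impl Fn(ColumnIndex) -> Vec<PeerId>,
    start_slot: u64,
    count: u64,
) -> Result<HashMap<PeerId, ColumnsByRangeRequest>, &'static str> {
    let mut requests: HashMap<PeerId, ColumnsByRangeRequest> = HashMap::new();
    for &column in custody_columns {
        let peers = custodial_peers(column);
        // Randomising the choice spreads load and avoids repeatedly hitting a failing peer.
        let peer = *peers.choose(&mut thread_rng()).ok_or("no custodial peers for column")?;
        requests
            .entry(peer)
            .or_insert_with(|| ColumnsByRangeRequest {
                start_slot,
                count,
                columns: vec![],
            })
            .columns
            .push(column);
    }
    Ok(requests)
}

As in the commit's make_columns_by_range_requests further down, a column with no known custodial peer aborts the whole batch rather than being silently skipped.
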
5 changes: 5 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
@@ -96,6 +96,11 @@ impl<E: EthSpec> PeerInfo<E> {
}
Subnet::DataColumn(_) => {
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
// We should use MetaDataV3 for peer selection rather than
// looking at subscribed peers (current behavior). Until MetaDataV3 is
// implemented, this is perhaps the only viable option on the current devnet
// as the peer count is low and it's important to identify supernodes to get a
// good distribution of peers across subnets.
return true;
}
}
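
For context on the TODO above: once MetaDataV3 (consensus-specs PR 3821) carries a custody_subnet_count, peer selection for a data-column subnet could be driven by that value rather than by gossip subscriptions. A rough, hypothetical sketch of such a predicate follows; MetaDataV3, NodeId and the compute_custody_subnets derivation are assumptions standing in for the eventual spec and Lighthouse types, not an existing API.

use std::collections::HashSet;

type NodeId = [u8; 32];
type SubnetId = u64;

// Hypothetical subset of a future MetaDataV3.
struct MetaDataV3 {
    custody_subnet_count: u64,
}

/// Returns true if the peer is expected to custody `subnet`, given a derivation function
/// mapping (node_id, custody_subnet_count) to the peer's custody subnets per the DAS spec.
fn custodies_subnet(
    node_id: NodeId,
    metadata: &MetaDataV3,
    subnet: SubnetId,
    compute_custody_subnets: impl Fn(NodeId, u64) -> HashSet<SubnetId>,
) -> bool {
    compute_custody_subnets(node_id, metadata.custody_subnet_count).contains(&subnet)
}

This mirrors the ENR-based path removed from globals.rs below, with the custody subnet count sourced from metadata instead of the ENR.
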
120 changes: 5 additions & 115 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -1,15 +1,11 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::discovery::peer_id_to_node_id;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use discv5::handler::NodeContact;
use itertools::Itertools;
use crate::{EnrExt, Subnet};
use parking_lot::RwLock;
use slog::{debug, Logger};
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec};
@@ -136,48 +132,13 @@ impl<E: EthSpec> NetworkGlobals<E> {
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
log: &Logger,
) -> Vec<PeerId> {
self.peers
.read()
.connected_peers()
.filter_map(|(peer_id, peer_info)| {
let node_id_and_csc = if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
Some((enr.node_id(), custody_subnet_count))
} else if let Some(node_id) = peer_id_to_node_id(peer_id)
// TODO(das): may be noisy, downgrade to trace
.inspect_err(
|e| debug!(log, "Error converting peer ID to node ID"; "error" => ?e),
)
.ok()
{
// TODO(das): may be noisy, downgrade to trace
debug!(
log,
"ENR not present for peer";
"peer_id" => %peer_id,
"info" => "Unable to compute custody columns, falling back to default \
custody requirement",
);
// TODO(das): Use `custody_subnet_count` from `MetaDataV3`
Some((node_id, spec.custody_requirement))
} else {
None
};

node_id_and_csc.and_then(|(node_id, custody_subnet_count)| {
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// whenever a peer connect or disconnect event is received
DataColumnSubnetId::compute_custody_columns::<E>(
node_id.raw().into(),
custody_subnet_count,
spec,
)
.contains(&column_index)
.then_some(*peer_id)
})
})
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
))
.cloned()
.collect::<Vec<_>>()
}

@@ -204,7 +165,6 @@ impl<E: EthSpec> NetworkGlobals<E> {
#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
@@ -222,74 +182,4 @@ mod test {
default_custody_requirement_column_count as usize
);
}

#[test]
fn custody_peers_for_column_enr_present() {
let spec = E::default_spec();
let log = logging::test_logger();
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

let mut peers_db_write_lock = globals.peers.write();
let valid_enrs = [
"enr:-Mm4QDJpcg5mZ8EFeYuDcUX78tOTigHLz4_zJlCY7vOTd2-XPPqlAoWM02Us69c4ov85pHgTgeo77Z3_nAhJ4yF1y30Bh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR0iXNlY3AyNTZrMaECvF7Y-fD1MEEVQq3y5qW7C8UoTsq6J_tfwvQIJ5fo1TGIc3luY25ldHMAg3RjcIKUc4N1ZHCClHM",
"enr:-Mm4QBw4saycbk-Up2PvppJOv0KzBqgFFHl6_OfFlh8_HxtwWkZpSFgJ0hFV3qOelh_Ai4L9HhSAEJSG48LE8YJ-7WABh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR1iXNlY3AyNTZrMaECsRjhgRrAuRWelB9VTTzTa0tHtcWyLTLSReL4RNWhJgGIc3luY25ldHMAg3RjcIKUdIN1ZHCClHQ",
"enr:-Mm4QMFlqbpGrmN21EM-70_hDW9c3MrulhIZElmsP3kb7XSLOEmV7-Msj2jlwGR5C_TicwOXYsZrN6eEIJlGgluM_XgBh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU",
"enr:-Mm4QEHdVjmQ7mH2qIX7_6SDablQUcrZuA4Sxjprh9WGbipfHUjPrELtBaRIRJUrpI8cgJRoAF1wMwoeRS7j3d8xviRGh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU"
];
let peers = valid_enrs
.into_iter()
.map(|enr_str| {
let enr = Enr::from_str(enr_str).unwrap();
let peer_id = enr.peer_id();
peers_db_write_lock.__add_connected_peer_enr_testing_only(enr);
peer_id
})
.collect::<Vec<_>>();

drop(peers_db_write_lock);
let [supernode_peer_1, supernode_peer_2, _, _] =
peers.try_into().expect("expected exactly 4 peer ids");

for col_index in 0..spec.number_of_columns {
let custody_peers =
globals.custody_peers_for_column(col_index as ColumnIndex, &spec, &log);
assert!(
custody_peers.contains(&supernode_peer_1),
"must at least return supernode peer"
);
assert!(
custody_peers.contains(&supernode_peer_2),
"must at least return supernode peer"
);
}
}

// If ENR is not present, fall back to deriving node_id and use `spec.custody_requirement`.
#[test]
fn custody_peers_for_column_no_enr_use_default() {
let spec = E::default_spec();
let log = logging::test_logger();
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

// Add peer without enr
let peer_id_str = "16Uiu2HAm86zWajwnBFD8uxkRpxhRzeUEf6Brfz2VBxGAaWx9ejyr";
let peer_id = PeerId::from_str(peer_id_str).unwrap();
let multiaddr =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/9000/p2p/{peer_id_str}")).unwrap();

let mut peers_db_write_lock = globals.peers.write();
peers_db_write_lock.__add_connected_peer_multiaddr_testing_only(&peer_id, multiaddr);
drop(peers_db_write_lock);

let custody_subnets = (0..spec.data_column_sidecar_subnet_count)
.filter(|col_index| {
!globals
.custody_peers_for_column(*col_index, &spec, &log)
.is_empty()
})
.count();

// The single peer's custody subnet should match custody_requirement.
assert_eq!(custody_subnets, spec.custody_requirement as usize);
}
}
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -337,7 +337,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(());
}
debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error");
@@ -385,7 +387,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
batch
75 changes: 53 additions & 22 deletions beacon_node/network/src/sync/network_context.rs
@@ -26,6 +26,8 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
use rand::seq::SliceRandom;
use rand::thread_rng;
pub use requests::LookupVerifyError;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
@@ -238,7 +240,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
self.network_globals()
.custody_peers_for_column(column_index, &self.chain.spec, &self.log)
.custody_peers_for_column(column_index, &self.chain.spec)
}

pub fn get_random_custodial_peer(
&self,
epoch: Epoch,
column_index: ColumnIndex,
) -> Option<PeerId> {
self.get_custodial_peers(epoch, column_index)
.choose(&mut thread_rng())
.cloned()
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -337,35 +349,22 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.network_globals()
.custody_columns(epoch, &self.chain.spec);

for column_index in &custody_indexes {
let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
let Some(custody_peer) = custody_peer_ids.first().cloned() else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

for (peer_id, columns_by_range_request) in
self.make_columns_by_range_requests(epoch, request, &custody_indexes)?
{
debug!(
self.log,
"Sending DataColumnsByRange requests";
"method" => "DataColumnsByRange",
"count" => request.count(),
"count" => columns_by_range_request.count,
"epoch" => epoch,
"index" => column_index,
"peer" => %custody_peer,
"columns" => ?columns_by_range_request.columns,
"peer" => %peer_id,
);

// Create the data columns request based on the blocks request.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id: custody_peer,
request: Request::DataColumnsByRange(DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![*column_index],
}),
peer_id,
request: Request::DataColumnsByRange(columns_by_range_request),
request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -382,6 +381,38 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(id)
}

fn make_columns_by_range_requests(
&self,
epoch: Epoch,
request: BlocksByRangeRequest,
custody_indexes: &Vec<ColumnIndex>,
) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
let mut peer_id_to_request_map = HashMap::new();

for column_index in custody_indexes {
let Some(custody_peer) = self.get_random_custodial_peer(epoch, *column_index) else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

let columns_by_range_request = peer_id_to_request_map
.entry(custody_peer)
.or_insert_with(|| DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![],
});

columns_by_range_request.columns.push(*column_index);
}

Ok(peer_id_to_request_map)
}

pub fn range_request_failed(&mut self, request_id: Id) -> Option<RangeRequestId> {
let sender_id = self
.range_block_components_requests
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/range_sync/batch.rs
@@ -199,9 +199,9 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}

/// Verifies if an incoming block belongs to this batch.
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state {
return peer_id == expected_peer && expected_id == request_id;
pub fn is_expecting_block(&self, request_id: &Id) -> bool {
if let BatchState::Downloading(_, _, expected_id) = &self.state {
return expected_id == request_id;
}
false
}
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -216,7 +216,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(KeepChain);
}
batch
@@ -808,7 +810,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
debug!(
self.log,
"Batch not expecting block";
8 changes: 2 additions & 6 deletions beacon_node/network/src/sync/sampling.rs
@@ -490,13 +490,9 @@ mod request {

// TODO: When there is a fork and only a subset of your peers know about a block, sampling should only
// be queried on the peers on that fork. Should this case be handled? How to handle it?
let peer_ids = cx.get_custodial_peers(
block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
self.column_index,
);
let epoch = block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch());

// TODO(das) randomize custodial peer and avoid failing peers
if let Some(peer_id) = peer_ids.first().cloned() {
if let Some(peer_id) = cx.get_random_custodial_peer(epoch, self.column_index) {
cx.data_column_lookup_request(
DataColumnsByRootRequester::Sampling(SamplingId {
id: requester,
