Skip to content

Commit

Permalink
moves socket-addr-space check to get_retransmit_addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Dec 16, 2024
1 parent 6bdcdba commit ecc3f0f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions streamer/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ impl SocketAddrSpace {
}

/// Returns true if the IP address is valid.
#[inline]
#[must_use]
pub fn check(&self, addr: &SocketAddr) -> bool {
if matches!(self, SocketAddrSpace::Unspecified) {
Expand Down
9 changes: 7 additions & 2 deletions turbine/benches/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{Shred, ShredFlags},
solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::{
cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
retransmit_stage::RetransmitStage,
Expand Down Expand Up @@ -45,8 +46,12 @@ fn get_retransmit_peers_deterministic(
0,
0,
);
let _retransmit_peers =
cluster_nodes.get_retransmit_addrs(slot_leader, &shred.id(), /*fanout:*/ 200);
let _retransmit_peers = cluster_nodes.get_retransmit_addrs(
slot_leader,
&shred.id(),
200, // fanout
&SocketAddrSpace::Unspecified,
);
}
}

Expand Down
6 changes: 5 additions & 1 deletion turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl ClusterNodes<RetransmitStage> {
slot_leader: &Pubkey,
shred: &ShredId,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> {
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
Expand Down Expand Up @@ -195,7 +196,10 @@ impl ClusterNodes<RetransmitStage> {
};
let (index, peers) =
get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes);
let peers = peers.filter_map(|(_, addr)| addr).collect();
let peers = peers
.filter_map(|(_, addr)| addr)
.filter(|addr| socket_addr_space.check(addr))
.collect();
let root_distance = get_root_distance(index, fanout);
Ok((root_distance, peers))
}
Expand Down
12 changes: 6 additions & 6 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,12 @@ fn retransmit_shred(
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
let mut compute_turbine_peers = Measure::start("turbine_start");
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, key, data_plane_fanout)?;
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| socket_addr_space.check(addr))
.collect();
let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
slot_leader,
key,
data_plane_fanout,
socket_addr_space,
)?;
compute_turbine_peers.stop();
stats
.compute_turbine_peers_total
Expand Down

0 comments on commit ecc3f0f

Please sign in to comment.