This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Fix Turbine making Port based decisions #7774

Closed
wants to merge 4 commits into from
19 changes: 19 additions & 0 deletions core/benches/retransmit_stage.rs
@@ -9,8 +9,10 @@ use solana_core::contact_info::ContactInfo;
use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_core::packet::to_packets_chunked;
use solana_core::retransmit_stage::retransmitter;
use solana_core::weighted_shuffle::weighted_best;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::staking_utils;
use solana_measure::measure::Measure;
use solana_perf::test_tx::test_tx;
use solana_runtime::bank::Bank;
@@ -68,6 +70,23 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let batches = to_packets_chunked(&vec![tx; NUM_PACKETS], chunk_size);
info!("batches: {}", batches.len());

// Compute the selected index here, and switch cluster_info's identity to the critical-path node's id
let working_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&working_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
let (peers, stakes_and_index) = cluster_info
.read()
.unwrap()
.sorted_retransmit_peers_and_stakes(stakes);
// the packet seeds are always zero, just find the id that gets picked first
let broadcast_index = weighted_best(&stakes_and_index, batches[0].packets[0].meta.seed);
{
let mut w_cinfo = cluster_info.write().unwrap();
w_cinfo.gossip.set_self(&peers[broadcast_index].id);
w_cinfo.insert_self(peers[broadcast_index].clone());
}

let retransmitter_handles = retransmitter(
Arc::new(sockets),
bank_forks,
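
The added benchmark setup computes, via `weighted_best`, which peer a zero-seed shred would pick first, and then adopts that peer's identity so the benchmarked node sits on the critical path. A rough, self-contained illustration of the idea behind seed-driven weighted selection (the seed folding and cumulative-stake walk below are simplifications made for this sketch, not the actual `solana_core::weighted_shuffle::weighted_best` algorithm):

```rust
/// Simplified, deterministic stand-in for `weighted_best`: fold the seed into
/// a single draw and walk the cumulative stake distribution. Illustration
/// only; the real implementation lives in solana_core::weighted_shuffle.
fn weighted_best_sketch(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> usize {
    // Fold the seed bytes into one deterministic draw.
    let draw = seed
        .iter()
        .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(u64::from(b)));
    let total: u64 = stakes_and_index.iter().map(|&(stake, _)| stake).sum();
    if total == 0 {
        return stakes_and_index.first().map(|&(_, i)| i).unwrap_or(0);
    }
    let mut point = draw % total;
    // The peer whose cumulative stake bucket contains the draw is selected.
    for &(stake, index) in stakes_and_index {
        if point < stake {
            return index;
        }
        point -= stake;
    }
    stakes_and_index.last().unwrap().1
}

fn main() {
    // All-zero seed, matching the benchmark's packets: the pick is deterministic.
    let stakes_and_index = vec![(100u64, 0usize), (10, 1), (1, 2)];
    let first = weighted_best_sketch(&stakes_and_index, [0u8; 32]);
    assert_eq!(first, weighted_best_sketch(&stakes_and_index, [0u8; 32]));
    println!("selected index: {}", first);
}
```

With an all-zero seed the draw is deterministic, which is what lets the benchmark pin down the critical-path node before the retransmitter threads start.
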
84 changes: 2 additions & 82 deletions core/src/retransmit_stage.rs
@@ -81,9 +81,7 @@ fn retransmit(
continue;
}
if packet.meta.repair {
total_packets -= 1;
repair_total += 1;
continue;
}

let mut compute_turbine_peers = Measure::start("turbine_start");
@@ -111,7 +109,8 @@
let leader =
leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
let mut retransmit_time = Measure::start("retransmit_to");
if !packet.meta.forward {
// If I am on the critical path for this packet, send it to everyone
if my_index % DATA_PLANE_FANOUT == 0 {
ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?;
ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?;
} else {
@@ -274,82 +273,3 @@ impl RetransmitStage {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::contact_info::ContactInfo;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::packet::{self, Meta, Packet, Packets};
use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
use solana_ledger::create_new_tmp_ledger;
use solana_net_utils::find_available_port_in_range;
use solana_sdk::pubkey::Pubkey;

#[test]
fn test_skip_repair() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
let blockstore = Blockstore::open(&ledger_path).unwrap();
let opts = ProcessOptions {
full_leader_cache: true,
..ProcessOptions::default()
};
let (bank_forks, _, cached_leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();
let leader_schedule_cache = Arc::new(cached_leader_schedule);
let bank_forks = Arc::new(RwLock::new(bank_forks));

let mut me = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
let port = find_available_port_in_range((8000, 10000)).unwrap();
let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
// need to make sure tvu and tvu_forwards are valid addresses
me.tvu_forwards = me_retransmit.local_addr().unwrap();
let port = find_available_port_in_range((8000, 10000)).unwrap();
me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
.unwrap()
.local_addr()
.unwrap();

let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
cluster_info.insert_info(me);

let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
let cluster_info = Arc::new(RwLock::new(cluster_info));

let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
&leader_schedule_cache,
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
);
let _thread_hdls = vec![t_retransmit];

let packets = Packets::new(vec![Packet::default()]);
// it should send this over the sockets.
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
packet::recv_from(&mut packets, &me_retransmit).unwrap();
assert_eq!(packets.packets.len(), 1);
assert_eq!(packets.packets[0].meta.repair, false);

let repair = Packet {
meta: Meta {
repair: true,
..Meta::default()
},
..Packet::default()
};

// send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
let packets = Packets::new(vec![repair, Packet::default()]);
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
packet::recv_from(&mut packets, &me_retransmit).unwrap();
assert_eq!(packets.packets.len(), 1);
assert_eq!(packets.packets[0].meta.repair, false);
}
}
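
The hunk above carries the core of the fix: whether a node fans a shred out to its neighborhood no longer depends on the port the packet arrived on (`packet.meta.forward`), only on the node's index in the locally computed shuffle. A minimal sketch of the old versus new rule, with an illustrative fanout standing in for `DATA_PLANE_FANOUT`:

```rust
/// Old rule (removed by this PR): fan out to neighbors only if the packet did
/// not arrive on the forwards port, i.e. trust `packet.meta.forward`.
fn should_fan_out_old(arrived_on_forward_port: bool) -> bool {
    !arrived_on_forward_port
}

/// New rule: fan out to neighbors only if this node sits at the head of its
/// neighborhood in the shuffled peer list.
fn should_fan_out_new(my_index: usize, fanout: usize) -> bool {
    my_index % fanout == 0
}

fn main() {
    let fanout = 200; // illustrative stand-in for DATA_PLANE_FANOUT
    assert!(should_fan_out_new(0, fanout));
    assert!(should_fan_out_new(2 * fanout, fanout));
    assert!(!should_fan_out_new(fanout + 3, fanout));
    // The old rule looked only at packet metadata, not at topology.
    assert!(should_fan_out_old(false));
    assert!(!should_fan_out_old(true));
}
```

Since every node derives the same stake-weighted shuffle, each one reaches the same conclusion about who is on the critical path, independent of how the packet happened to be routed.
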
17 changes: 7 additions & 10 deletions core/tests/cluster_info.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;

type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;
type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<i32>)>;

fn num_threads() -> usize {
sys_info::cpu_num().unwrap_or(10) as usize
@@ -29,11 +29,10 @@ fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {

fn retransmit(
mut shuffled_nodes: Vec<ContactInfo>,
senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
senders: &HashMap<Pubkey, Sender<i32>>,
cluster: &ClusterInfo,
fanout: usize,
shred: i32,
retransmit: bool,
) -> i32 {
let mut seed = [0; 32];
let mut my_index = 0;
@@ -48,17 +47,18 @@
}
});
seed[0..4].copy_from_slice(&shred.to_le_bytes());
let retransmit = my_index % fanout == 0;
let shuffled_indices = (0..shuffled_nodes.len()).collect();
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices);
children.into_iter().for_each(|i| {
let s = senders.get(&shuffled_nodes[i].id).unwrap();
let _ = s.send((shred, retransmit));
let _ = s.send(shred);
});

if retransmit {
neighbors.into_iter().for_each(|i| {
let s = senders.get(&shuffled_nodes[i].id).unwrap();
let _ = s.send((shred, false));
let _ = s.send(shred);
});
}

@@ -79,8 +79,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {

// setup accounts for all nodes (leader has 0 bal)
let (s, r) = channel();
let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
Arc::new(Mutex::new(HashMap::new()));
let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> = Arc::new(Mutex::new(HashMap::new()));
senders.lock().unwrap().insert(leader_info.id, s);
let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
(0..num_threads).for_each(|_| batches.push(HashMap::new()));
@@ -159,7 +158,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
&cluster,
fanout,
*i,
true,
);
});
*layer1_done = true;
@@ -169,15 +167,14 @@
if recv.len() < shreds_len {
loop {
match r.try_recv() {
Ok((data, retx)) => {
Ok(data) => {
if recv.insert(data) {
let _ = retransmit(
shuffled_peers[data as usize].clone(),
&senders,
&cluster,
fanout,
data,
retx,
);
}
if recv.len() == shreds_len {
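
In the simulation above, the retransmit flag no longer rides over the channel with the shred; each node recomputes it from its own shuffled index, which is why the channel type narrows from `(i32, bool)` to `i32` and the `retransmit` argument drops out of the helper. A small self-contained sketch of that pattern (the `fanout` and `my_index` values are hypothetical):

```rust
use std::sync::mpsc::channel;

fn main() {
    let fanout = 4; // hypothetical fanout for this sketch
    // Previously a Sender<(i32, bool)>; now only the shred id travels.
    let (sender, receiver) = channel::<i32>();

    sender.send(7).unwrap();

    // The receiving node derives the retransmit decision from its own
    // position in the shuffled peer list instead of trusting the sender.
    let my_index = 8; // hypothetical shuffled index of this node
    let shred = receiver.recv().unwrap();
    let retransmit = my_index % fanout == 0;
    println!("shred {}: retransmit = {}", shred, retransmit);
}
```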