Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
removes outdated tvu_forward socket (#32101)
Browse files Browse the repository at this point in the history
Shreds are no longer sent to the tvu_forward socket.
  • Loading branch information
behzadnouri authored Jun 20, 2023
1 parent 2035442 commit 469661d
Show file tree
Hide file tree
Showing 15 changed files with 8 additions and 158 deletions.
3 changes: 0 additions & 3 deletions core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
let port = socket.local_addr().unwrap().port();
contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap();
contact_info
.set_tvu_forwards(contact_info.tvu(Protocol::UDP).unwrap())
.unwrap();
info!("local: {:?}", contact_info.tvu(Protocol::UDP).unwrap());
cluster_info.insert_info(contact_info);
socket.set_nonblocking(true).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
peers.iter().enumerate().for_each(|(i, peer)| {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
if let Ok(addr) = peer.tvu_forwards() {
if let Ok(addr) = peer.tvu(Protocol::UDP) {
data_shreds.iter().for_each(|b| {
sock.send_to(b.payload(), addr).unwrap();
});
Expand Down
2 changes: 0 additions & 2 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,6 @@ mod tests {
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_forwards((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();
Expand Down Expand Up @@ -1960,7 +1959,6 @@ mod tests {
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_forwards((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();
Expand Down
16 changes: 0 additions & 16 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl ShredFetchStage {
pub(crate) fn new(
sockets: Vec<Arc<UdpSocket>>,
quic_socket: UdpSocket,
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: Sender<PacketBatch>,
shred_version: u16,
Expand All @@ -196,19 +195,6 @@ impl ShredFetchStage {
turbine_disabled.clone(),
);

let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
forward_sockets,
exit.clone(),
sender.clone(),
recycler.clone(),
bank_forks.clone(),
shred_version,
"shred_fetch_tvu_forwards",
PacketFlags::FORWARDED,
None, // repair_context
turbine_disabled.clone(),
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
vec![repair_socket.clone()],
exit.clone(),
Expand All @@ -222,10 +208,8 @@ impl ShredFetchStage {
turbine_disabled.clone(),
);

tvu_threads.extend(tvu_forwards_threads.into_iter());
tvu_threads.extend(repair_receiver.into_iter());
tvu_threads.push(tvu_filter);
tvu_threads.push(fwd_thread_hdl);
tvu_threads.push(repair_handler);

let keypair = cluster_info.keypair().clone();
Expand Down
6 changes: 0 additions & 6 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub struct TvuSockets {
pub(crate) fetch_quic: UdpSocket,
pub repair: UdpSocket,
pub retransmit: Vec<UdpSocket>,
pub forwards: Vec<UdpSocket>,
pub ancestor_hashes_requests: UdpSocket,
}

Expand Down Expand Up @@ -149,7 +148,6 @@ impl Tvu {
fetch: fetch_sockets,
fetch_quic: fetch_quic_socket,
retransmit: retransmit_sockets,
forwards: tvu_forward_sockets,
ancestor_hashes_requests: ancestor_hashes_socket,
} = sockets;

Expand All @@ -158,12 +156,9 @@ impl Tvu {
let repair_socket = Arc::new(repair_socket);
let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
let forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = ShredFetchStage::new(
fetch_sockets,
fetch_quic_socket,
forward_sockets,
repair_socket.clone(),
fetch_sender,
tvu_config.shred_version,
Expand Down Expand Up @@ -463,7 +458,6 @@ pub mod tests {
retransmit: target1.sockets.retransmit_sockets,
fetch: target1.sockets.tvu,
fetch_quic: target1.sockets.tvu_quic,
forwards: target1.sockets.tvu_forwards,
ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
}
},
Expand Down
1 change: 0 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,6 @@ impl Validator {
retransmit: node.sockets.retransmit_sockets,
fetch: node.sockets.tvu,
fetch_quic: node.sockets.tvu_quic,
forwards: node.sockets.tvu_forwards,
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
},
blockstore.clone(),
Expand Down
1 change: 0 additions & 1 deletion dos/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ pub struct TransactionParams {
pub enum Mode {
Gossip,
Tvu,
TvuForwards,
Tpu,
TpuForwards,
Repair,
Expand Down
1 change: 0 additions & 1 deletion dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ fn get_target(
target = match mode {
Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())),
Mode::Tvu => Some((*node.pubkey(), node.tvu(Protocol::UDP).unwrap())),
Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())),
Mode::Tpu => Some((*node.pubkey(), node.tpu(protocol).unwrap())),
Mode::TpuForwards => {
Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap()))
Expand Down
112 changes: 4 additions & 108 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ impl ClusterInfo {
}
let ip_addr = node.gossip().as_ref().map(SocketAddr::ip).ok();
Some(format!(
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
node.gossip()
.ok()
.filter(|addr| self.socket_addr_space.check(addr))
Expand All @@ -848,7 +848,6 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()),
self.addr_to_string(&ip_addr, &node.repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
node.shred_version(),
Expand All @@ -859,9 +858,9 @@ impl ClusterInfo {

format!(
"IP Address |Age(ms)| Node identifier \
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer\n\
------------------+-------+----------------------------------------------\
+---------+------+-------+------+------+------+------+------+------+--------\n\
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |Repair|ServeR|ShredVer\n\
------------------+-------+---------------------------------------\
+---------+------+-------+------+------+------+------+------+--------\n\
{}\
Nodes: {}{}{}",
nodes.join(""),
Expand Down Expand Up @@ -2798,7 +2797,6 @@ pub struct Sockets {
pub gossip: UdpSocket,
pub ip_echo: Option<TcpListener>,
pub tvu: Vec<UdpSocket>,
pub tvu_forwards: Vec<UdpSocket>,
pub tvu_quic: UdpSocket,
pub tpu: Vec<UdpSocket>,
pub tpu_forwards: Vec<UdpSocket>,
Expand Down Expand Up @@ -2836,7 +2834,6 @@ impl Node {
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let ((_tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let tvu_forwards = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
Expand Down Expand Up @@ -2865,11 +2862,6 @@ impl Node {
}
set_socket!(set_gossip, gossip_addr, "gossip");
set_socket!(set_tvu, tvu.local_addr().unwrap(), "TVU");
set_socket!(
set_tvu_forwards,
tvu_forwards.local_addr().unwrap(),
"TVU-forwards"
);
set_socket!(set_repair, repair.local_addr().unwrap(), "repair");
set_socket!(set_tpu, tpu.local_addr().unwrap(), "TPU");
set_socket!(
Expand All @@ -2891,7 +2883,6 @@ impl Node {
gossip,
ip_echo: Some(ip_echo),
tvu: vec![tvu],
tvu_forwards: vec![tvu_forwards],
tvu_quic,
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
Expand Down Expand Up @@ -2937,7 +2928,6 @@ impl Node {
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
let ((tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
Expand Down Expand Up @@ -2968,7 +2958,6 @@ impl Node {
}
set_socket!(set_gossip, gossip_port, "gossip");
set_socket!(set_tvu, tvu_port, "TVU");
set_socket!(set_tvu_forwards, tvu_forwards_port, "TVU-forwards");
set_socket!(set_repair, repair_port, "repair");
set_socket!(set_tpu, tpu_port, "TPU");
set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards");
Expand All @@ -2984,7 +2973,6 @@ impl Node {
gossip,
ip_echo: Some(ip_echo),
tvu: vec![tvu],
tvu_forwards: vec![tvu_forwards],
tvu_quic,
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
Expand Down Expand Up @@ -3017,9 +3005,6 @@ impl Node {
bind_ip_addr,
(tvu_port + QUIC_PORT_OFFSET, tvu_port + QUIC_PORT_OFFSET + 1),
);
let (tvu_forwards_port, tvu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu_forwards multi_bind");

let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");

Expand Down Expand Up @@ -3061,7 +3046,6 @@ impl Node {
let addr = gossip_addr.ip();
let _ = info.set_gossip((addr, gossip_port));
let _ = info.set_tvu((addr, tvu_port));
let _ = info.set_tvu_forwards((addr, tvu_forwards_port));
let _ = info.set_repair((addr, repair_port));
let _ = info.set_tpu(public_tpu_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_port)));
let _ = info.set_tpu_forwards(
Expand All @@ -3076,7 +3060,6 @@ impl Node {
sockets: Sockets {
gossip,
tvu: tvu_sockets,
tvu_forwards: tvu_forwards_sockets,
tvu_quic,
tpu: tpu_sockets,
tpu_forwards: tpu_forwards_sockets,
Expand Down Expand Up @@ -3193,7 +3176,6 @@ mod tests {
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
},
itertools::izip,
regex::Regex,
solana_ledger::shred::Shredder,
solana_net_utils::MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
solana_sdk::signature::{Keypair, Signer},
Expand Down Expand Up @@ -3224,92 +3206,6 @@ mod tests {
));
}

#[test]
fn test_cluster_info_trace() {
solana_logger::setup();
let keypair = Keypair::from_base58_string("3jATNWfbii1btv6nCpToAXAJz6a4km5HsLSWiwLfNvHNQAmvksLFVAKGUz286bXb9N4ivXx8nuwkn91PFDTyoFEp");

let node = {
let tpu = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8900);
let _tpu_quic = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8901);

let gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8888);
let tvu = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8902);
let tvu_forwards = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8903);
let tpu_forwards = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8904);

let tpu_vote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8906);
let repair = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8907);
let rpc = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8908);
let rpc_pubsub = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8909);

let serve_repair = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8910);

let mut info = ContactInfo::new(
keypair.pubkey(),
timestamp(), // wallclock
0u16, // shred_version
);
info.set_gossip(gossip).unwrap();
info.set_tvu(tvu).unwrap();
info.set_tvu_forwards(tvu_forwards).unwrap();
info.set_repair(repair).unwrap();
info.set_tpu(tpu).unwrap();
info.set_tpu_forwards(tpu_forwards).unwrap();
info.set_tpu_vote(tpu_vote).unwrap();
info.set_rpc(rpc).unwrap();
info.set_rpc_pubsub(rpc_pubsub).unwrap();
info.set_serve_repair(serve_repair).unwrap();
Node {
info,
sockets: Sockets {
gossip: UdpSocket::bind("0.0.0.0:0").unwrap(),
ip_echo: None,
tvu: vec![],
tvu_forwards: vec![],
tvu_quic: UdpSocket::bind("0.0.0.0:0").unwrap(),
tpu: vec![],
tpu_forwards: vec![],
tpu_vote: vec![],
broadcast: vec![],
repair: UdpSocket::bind("0.0.0.0:0").unwrap(),
retransmit_sockets: vec![],
serve_repair: UdpSocket::bind("0.0.0.0:0").unwrap(),
ancestor_hashes_requests: UdpSocket::bind("0.0.0.0:0").unwrap(),
tpu_quic: UdpSocket::bind("0.0.0.0:0").unwrap(),
tpu_forwards_quic: UdpSocket::bind("0.0.0.0:0").unwrap(),
},
}
};

let cluster_info = Arc::new(ClusterInfo::new(
node.info,
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));

let golden = r#"
IP Address |Age(ms)| Node identifier | Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVUfwd|Repair|ServeR|ShredVer
------------------+-------+----------------------------------------------+---------+------+-------+------+------+------+------+------+------+--------
127.0.0.1 me| \d | 7fGBVaezz2YrTxAkwvLjBZpxrGEfNsd14Jxw9W5Df5zY | - | 8888 | 8906 | 8900 | 8904 | 8902 | 8903 | 8907 | 8910 | 0
Nodes: 1
RPC Address |Age(ms)| Node identifier | Version | RPC |PubSub|ShredVer
------------------+-------+----------------------------------------------+---------+------+------+--------
127.0.0.1 me| \d | 7fGBVaezz2YrTxAkwvLjBZpxrGEfNsd14Jxw9W5Df5zY | - | 8908 | 8909 | 0
RPC Enabled Nodes: 1"#;

let re = Regex::new(golden).unwrap();

let output = format!(
"\n{}\n\n{}",
cluster_info.contact_info_trace(),
cluster_info.rpc_info_trace()
);

assert!(re.is_match(&output));
}

#[test]
fn test_handle_pull() {
solana_logger::setup();
Expand Down
Loading

0 comments on commit 469661d

Please sign in to comment.