From c2391dfefb137e6135fb3374890c2d9873d38e61 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 3 Jun 2021 12:50:57 -0400 Subject: [PATCH] removes port-based forwarding logic from turbine retransmit Turbine retransmit logic is based on which socket it received the packet from (i.e `packet.meta.forward`): https://github.com/solana-labs/solana/blob/708bbcb00/core/src/retransmit_stage.rs#L467-L470 This can leave the cluster vulnerable to spoofing and selective propagation of packets; see https://github.com/solana-labs/solana/issues/6672 https://github.com/solana-labs/solana/pull/7774 This commit identifies if the node is on the "critical path" based on its index in the shuffled cluster. If so, it forwards the packet to both neighbors and children; otherwise, the packet is only forwarded to the children. The metrics added in https://github.com/solana-labs/solana/pull/17351 shows that the number of times the index does not match the port is very rare, and therefore this change should be safe. --- core/benches/retransmit_stage.rs | 7 ++++++- core/src/retransmit_stage.rs | 28 ++++++++++++++++++++++------ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 5d225560718def..63f762a2fbf0c6 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -39,7 +39,12 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - let id = pubkey::new_rand(); + // This ensures that cluster_info.id() is the root of turbine + // retransmit tree and so the shreds are retransmited to all other + // nodes in the cluster. + let id = std::iter::repeat_with(pubkey::new_rand) + .find(|pk| cluster_info.id() < *pk) + .unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 137d0873aebf34..499a8f28aebdb1 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -428,7 +428,9 @@ fn retransmit( // neighborhood), then we expect that the packet arrives at tvu socket // as opposed to tvu-forwards. If this is not the case, then the // turbine broadcast/retransmit tree is mismatched across nodes. - if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + let anchor_node = my_index % DATA_PLANE_FANOUT == 0; + if packet.meta.forward == anchor_node { + // TODO: Consider forwarding the packet to the root node here. retransmit_tree_mismatch += 1; } peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); @@ -464,10 +466,19 @@ fn retransmit( .or_default() += 1; let mut retransmit_time = Measure::start("retransmit_to"); - if !packet.meta.forward { - ClusterInfo::retransmit_to(&neighbors, packet, sock, true); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), it should send the packet to tvu socket of its + // children and also tvu_forward socket of its neighbors. Otherwise it + // should only forward to tvu_forward socket of its children. + if anchor_node { + ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true); } - ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward); + ClusterInfo::retransmit_to( + &children, + packet, + sock, + !anchor_node, // send to forward socket! + ); retransmit_time.stop(); retransmit_total += retransmit_time.as_us(); } @@ -726,8 +737,13 @@ mod tests { .unwrap() .local_addr() .unwrap(); - - let other = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + // This fixes the order of nodes returned by shuffle_peers_and_index, + // and makes turbine retransmit tree deterministic for the purpose of + // the test. + let other = std::iter::repeat_with(solana_sdk::pubkey::new_rand) + .find(|pk| me.id < *pk) + .unwrap(); + let other = ContactInfo::new_localhost(&other, 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(other); cluster_info.insert_info(me);