This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit 599b22b

Partial shred deserialize cleanup and shred type differentiation (#14094) (#14138)

* Partial shred deserialize cleanup and shred type differentiation in retransmit

* consolidate packet hashing logic

(cherry picked from commit d4a174f)

Co-authored-by: sakridge <sakridge@gmail.com>
mergify[bot] and sakridge authored Dec 16, 2020
1 parent fdb1c5a commit 599b22b
Showing 5 changed files with 312 additions and 144 deletions.
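
In short, the change below replaces the ad-hoc seeded AHasher calls in retransmit_stage.rs with a reusable PacketHasher, and widens the retransmit duplicate-filter key from (slot, index) to (slot, index, is_data), so a data shred and a coding shred at the same position no longer suppress each other; at most MAX_DUPLICATE_COUNT distinct packet hashes are admitted per key. A minimal sketch of the key change, using simplified stand-in types rather than the real solana ones:

// Sketch only: simplified stand-ins for the types touched by this diff.
type Slot = u64;

// Before: data and coding shreds at the same (slot, index) shared one key.
type OldKey = (Slot, u32);

// After: the shred type (is_data) is part of the key, so each is tracked
// on its own; see ShredFilter in the retransmit_stage.rs hunk below.
type NewKey = (Slot, u32, bool);

fn main() {
    let _old: OldKey = (1, 5);
    let data_shred: NewKey = (1, 5, true);
    let coding_shred: NewKey = (1, 5, false);
    // Distinct keys: retransmit no longer drops a coding shred merely
    // because a data shred at the same (slot, index) was already seen.
    assert_ne!(data_shred, coding_shred);
}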
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -42,6 +42,7 @@ pub mod local_vote_signer_service;
pub mod non_circulating_supply;
pub mod optimistic_confirmation_verifier;
pub mod optimistically_confirmed_bank_tracker;
+pub mod packet_hasher;
pub mod ping_pong;
pub mod poh_recorder;
pub mod poh_service;
34 changes: 34 additions & 0 deletions core/src/packet_hasher.rs
@@ -0,0 +1,34 @@
+// Get a unique hash value for a packet
+// Used in retransmit and shred fetch to prevent DoS with same packet data.
+
+use ahash::AHasher;
+use rand::{thread_rng, Rng};
+use solana_perf::packet::Packet;
+use std::hash::Hasher;
+
+#[derive(Clone)]
+pub struct PacketHasher {
+    seed1: u128,
+    seed2: u128,
+}
+
+impl Default for PacketHasher {
+    fn default() -> Self {
+        Self {
+            seed1: thread_rng().gen::<u128>(),
+            seed2: thread_rng().gen::<u128>(),
+        }
+    }
+}
+
+impl PacketHasher {
+    pub fn hash_packet(&self, packet: &Packet) -> u64 {
+        let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
+        hasher.write(&packet.data[0..packet.meta.size]);
+        hasher.finish()
+    }
+
+    pub fn reset(&mut self) {
+        *self = Self::default();
+    }
+}
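
As a usage note: the random seeds make the per-packet hash unpredictable to senders, so an attacker cannot precompute payloads that collide in the duplicate filter, and reset() lets callers re-randomize periodically. A minimal, dependency-free sketch of that dedup pattern — the Packet stand-in and std's DefaultHasher below are illustrative substitutes for solana_perf's Packet and the keyed AHasher:

use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Sketch only: simplified stand-in for solana_perf::packet::Packet.
struct Packet {
    data: [u8; 1232], // buffer size is illustrative
    size: usize,      // stands in for packet.meta.size
}

// Stand-in for PacketHasher::hash_packet; the real code keys an
// ahash::AHasher with two random u128 seeds instead.
fn hash_packet(seed: u128, packet: &Packet) -> u64 {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    seed.hash(&mut hasher);
    packet.data[..packet.size].hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let seed = 42u128; // the real PacketHasher draws this from thread_rng
    let mut seen = HashSet::new();
    let mut packet = Packet { data: [0; 1232], size: 8 };
    packet.data[..8].copy_from_slice(b"shred001");
    // First sighting is retained; an identical payload hashes the same
    // and is treated as a duplicate.
    assert!(seen.insert(hash_packet(seed, &packet)));
    assert!(!seen.insert(hash_packet(seed, &packet)));
}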
136 changes: 97 additions & 39 deletions core/src/retransmit_stage.rs
@@ -1,7 +1,5 @@
//! The `retransmit_stage` retransmits shreds between validators
-use crate::shred_fetch_stage::ShredFetchStage;
-use crate::shred_fetch_stage::ShredFetchStats;
use crate::{
    cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
    cluster_info_vote_listener::VerifiedVoteReceiver,
@@ -14,25 +12,23 @@ use crate::{
    result::{Error, Result},
    window_service::{should_retransmit_and_persist, WindowService},
};
-use ahash::AHasher;
use crossbeam_channel::Receiver;
use lru::LruCache;
-use rand::{thread_rng, Rng};
+use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
use solana_ledger::{
    blockstore::{Blockstore, CompletedSlotsReceiver},
    leader_schedule_cache::LeaderScheduleCache,
    staking_utils,
};
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
-use solana_perf::packet::Packets;
+use solana_perf::packet::{Packet, Packets};
use solana_runtime::bank_forks::BankForks;
use solana_sdk::clock::{Epoch, Slot};
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
-use std::hash::Hasher;
use std::{
    cmp,
    collections::hash_set::HashSet,
@@ -189,7 +185,42 @@ struct EpochStakesCache
    stakes_and_index: Vec<(u64, usize)>,
}

-pub type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);
+use crate::packet_hasher::PacketHasher;
+// Map of shred (slot, index, is_data) => list of hash values seen for that key.
+pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
+
+pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
+
+// Return true if shred is already received and should skip retransmit
+fn check_if_already_received(
+    packet: &Packet,
+    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
+) -> bool {
+    match get_shred_slot_index_type(packet, &mut ShredFetchStats::default()) {
+        Some(slot_index) => {
+            let mut received = shreds_received.lock().unwrap();
+            let hasher = received.1.clone();
+            if let Some(sent) = received.0.get_mut(&slot_index) {
+                if sent.len() < MAX_DUPLICATE_COUNT {
+                    let hash = hasher.hash_packet(packet);
+                    if sent.contains(&hash) {
+                        return true;
+                    }
+
+                    sent.push(hash);
+                } else {
+                    return true;
+                }
+            } else {
+                let hash = hasher.hash_packet(&packet);
+                received.0.put(slot_index, vec![hash]);
+            }
+
+            false
+        }
+        None => true,
+    }
+}
+
#[allow(clippy::too_many_arguments)]
fn retransmit(
@@ -202,7 +233,7 @@ fn retransmit(
    stats: &Arc<RetransmitStats>,
    epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
    last_peer_update: &Arc<AtomicU64>,
-    shreds_received: &Arc<Mutex<ShredFilterAndSeeds>>,
+    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
) -> Result<()> {
    let timer = Duration::new(1, 0);
    let r_lock = r.lock().unwrap();
@@ -254,8 +285,7 @@
            {
                let mut sr = shreds_received.lock().unwrap();
                sr.0.clear();
-                sr.1 = thread_rng().gen::<u128>();
-                sr.2 = thread_rng().gen::<u128>();
+                sr.1.reset();
            }
        }
        let mut peers_len = 0;
@@ -282,33 +312,10 @@
            continue;
        }

-        match ShredFetchStage::get_slot_index(packet, &mut ShredFetchStats::default()) {
-            Some(slot_index) => {
-                let mut received = shreds_received.lock().unwrap();
-                let seed1 = received.1;
-                let seed2 = received.2;
-                if let Some(sent) = received.0.get_mut(&slot_index) {
-                    if sent.len() < MAX_DUPLICATE_COUNT {
-                        let mut hasher = AHasher::new_with_keys(seed1, seed2);
-                        hasher.write(&packet.data[0..packet.meta.size]);
-                        let hash = hasher.finish();
-                        if sent.contains(&hash) {
-                            continue;
-                        }
-
-                        sent.push(hash);
-                    } else {
-                        continue;
-                    }
-                } else {
-                    let mut hasher = AHasher::new_with_keys(seed1, seed2);
-                    hasher.write(&packet.data[0..packet.meta.size]);
-                    let hash = hasher.finish();
-                    received.0.put(slot_index, vec![hash]);
-                }
-            }
-            None => continue,
+        if check_if_already_received(packet, shreds_received) {
+            continue;
        }

        let mut compute_turbine_peers = Measure::start("turbine_start");
        let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
            &my_id,
@@ -397,7 +404,10 @@ pub fn retransmitter(
    r: Arc<Mutex<PacketReceiver>>,
) -> Vec<JoinHandle<()>> {
    let stats = Arc::new(RetransmitStats::default());
-    let shreds_received = Arc::new(Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), 0, 0)));
+    let shreds_received = Arc::new(Mutex::new((
+        LruCache::new(DEFAULT_LRU_SIZE),
+        PacketHasher::default(),
+    )));
    (0..sockets.len())
        .map(|s| {
            let sockets = sockets.clone();
@@ -551,6 +561,7 @@ mod tests {
    use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
    use solana_ledger::create_new_tmp_ledger;
    use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
+    use solana_ledger::shred::Shred;
    use solana_net_utils::find_available_port_in_range;
    use solana_perf::packet::{Packet, Packets};
    use std::net::{IpAddr, Ipv4Addr};
@@ -599,8 +610,7 @@
        );
        let _thread_hdls = vec![t_retransmit];

-        let mut shred =
-            solana_ledger::shred::Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
+        let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
        let mut packet = Packet::default();
        shred.copy_to_packet(&mut packet);

@@ -625,4 +635,52 @@
        assert_eq!(packets.packets.len(), 1);
        assert_eq!(packets.packets[0].meta.repair, false);
    }
+
+    #[test]
+    fn test_already_received() {
+        let mut packet = Packet::default();
+        let slot = 1;
+        let index = 5;
+        let version = 0x40;
+        let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
+        // unique shred for (1, 5) should pass
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // duplicate shred for (1, 5) blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        // first duplicate shred for (1, 5) passed
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // then blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
+        shred.copy_to_packet(&mut packet);
+        // 2nd duplicate shred for (1, 5) blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // Coding at (1, 5) passes
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // then blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // 2nd unique coding at (1, 5) passes
+        assert!(!check_if_already_received(&packet, &shreds_received));
+        // same again is blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+
+        let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
+        shred.copy_to_packet(&mut packet);
+        // Another unique coding at (1, 5) always blocked
+        assert!(check_if_already_received(&packet, &shreds_received));
+        assert!(check_if_already_received(&packet, &shreds_received));
+    }
}
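
The get_shred_slot_index_type helper used above is added to solana_ledger::shred in one of the two changed files not shown on this page. Per the commit title, its role is a partial deserialize: peek only at fixed-offset header fields to recover (slot, index, is_data) without decoding the whole shred, which is also why check_if_already_received treats a None result (unrecognized type byte) as "skip". A hedged sketch of that idea — the offsets and the two shred-type bytes below are assumptions about the shred wire format of this era, not taken from this diff:

// Sketch only: recover (slot, index, is_data) from a shred packet's
// common header without full deserialization. Offsets and the two
// shred-type bytes are assumptions, not confirmed by this diff.
use std::convert::TryInto;

type Slot = u64;

const OFFSET_OF_SHRED_TYPE: usize = 64; // right after the 64-byte signature
const DATA_SHRED: u8 = 0b1010_0101;
const CODING_SHRED: u8 = 0b0101_1010;

fn get_shred_slot_index_type(data: &[u8]) -> Option<(Slot, u32, bool)> {
    let is_data = match *data.get(OFFSET_OF_SHRED_TYPE)? {
        DATA_SHRED => true,
        CODING_SHRED => false,
        _ => return None, // unknown type: caller counts it as a bad packet
    };
    let slot = Slot::from_le_bytes(data.get(65..73)?.try_into().ok()?);
    let index = u32::from_le_bytes(data.get(73..77)?.try_into().ok()?);
    Some((slot, index, is_data))
}

fn main() {
    let mut buf = vec![0u8; 1232];
    buf[OFFSET_OF_SHRED_TYPE] = DATA_SHRED;
    buf[65..73].copy_from_slice(&7u64.to_le_bytes());
    buf[73..77].copy_from_slice(&5u32.to_le_bytes());
    // A data shred at slot 7, index 5 yields the dedup key (7, 5, true).
    assert_eq!(get_shred_slot_index_type(&buf), Some((7, 5, true)));
}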