From 9f70456a5c6bc8cf743cdda723b85920a42b354f Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 12 Dec 2024 18:11:18 +0000 Subject: [PATCH] uses SipHasher24 for gossip ping tokens (#3974) The commit uses SipHasher24 for gossip ping tokens where keys are refreshed every 1 minute. --- Cargo.lock | 2 + core/src/banking_trace.rs | 5 - core/src/repair/ancestor_hashes_service.rs | 9 +- core/src/repair/serve_repair.rs | 45 ++--- gossip/Cargo.toml | 2 + gossip/src/cluster_info.rs | 42 ++-- gossip/src/crds_gossip.rs | 8 +- gossip/src/crds_gossip_pull.rs | 37 ++-- gossip/src/crds_gossip_push.rs | 5 +- gossip/src/ping_pong.rs | 224 +++++++++++---------- gossip/src/protocol.rs | 8 +- gossip/tests/crds_gossip.rs | 5 +- programs/sbf/Cargo.lock | 2 + svm/examples/Cargo.lock | 2 + 14 files changed, 195 insertions(+), 201 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37dd503a7e6ea3..042ea8f0e60bca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7299,9 +7299,11 @@ dependencies = [ "rand_chacha 0.3.1", "rayon", "serde", + "serde-big-array", "serde_bytes", "serde_derive", "serial_test", + "siphasher", "solana-bloom", "solana-clap-utils", "solana-client", diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index 6e0797c8c3842f..f9d560f5d17087 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -62,11 +62,6 @@ pub struct BankingTracer { active_tracer: Option, } -#[cfg_attr( - feature = "frozen-abi", - derive(AbiExample), - frozen_abi(digest = "6PCDw6YSEivfbwhbPmE4NAsXb88ZX6hkFnruP8B38nma") -)] #[derive(Serialize, Deserialize, Debug)] pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent); diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index 4ca7f1bae080f4..97b8cd865adbc6 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -15,7 +15,6 @@ use { replay_stage::DUPLICATE_THRESHOLD, shred_fetch_stage::receive_quic_datagrams, }, - bincode::serialize, bytes::Bytes, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, dashmap::{mapref::entry::Entry::Occupied, DashMap}, @@ -454,11 +453,9 @@ impl AncestorHashesService { return None; } stats.ping_count += 1; - if let Ok(pong) = Pong::new(&ping, keypair) { - let pong = RepairProtocol::Pong(pong); - if let Ok(pong_bytes) = serialize(&pong) { - let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr); - } + let pong = RepairProtocol::Pong(Pong::new(&ping, keypair)); + if let Ok(pong) = bincode::serialize(&pong) { + let _ = ancestor_socket.send_to(&pong, from_addr); } None } diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 1e6b7635a0f5a6..8271e670d9ace0 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -21,7 +21,7 @@ use { solana_gossip::{ cluster_info::{ClusterInfo, ClusterInfoError}, contact_info::{ContactInfo, Protocol}, - ping_pong::{self, PingCache, Pong}, + ping_pong::{self, Pong}, weighted_shuffle::WeightedShuffle, }, solana_ledger::{ @@ -81,7 +81,7 @@ pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize = pub const MAX_ANCESTOR_RESPONSES: usize = MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>(); /// Number of bytes in the randomly generated token sent with ping messages. -pub(crate) const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES; +const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES; pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536; pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280); const REPAIR_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(2); @@ -141,11 +141,6 @@ impl AncestorHashesRepairType { } } -#[cfg_attr( - feature = "frozen-abi", - derive(AbiEnumVisitor, AbiExample), - frozen_abi(digest = "GPS6e6pgUdbXLwXN6XHTqrUVMwAL2YKLPDawgMi5hHzi") -)] #[derive(Debug, Deserialize, Serialize)] pub enum AncestorHashesResponse { Hashes(Vec<(Slot, Hash)>), @@ -219,7 +214,8 @@ impl RepairRequestHeader { } } -pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>; +type Ping = ping_pong::Ping; +type PingCache = ping_pong::PingCache; /// Window protocol messages #[cfg_attr( @@ -270,11 +266,6 @@ fn discard_malformed_repair_requests( requests.len() } -#[cfg_attr( - feature = "frozen-abi", - derive(AbiEnumVisitor, AbiExample), - frozen_abi(digest = "9A6ae44qpdT7PaxiDZbybMM2mewnSnPs3C4CxhpbbYuV") -)] #[derive(Debug, Deserialize, Serialize)] pub(crate) enum RepairResponse { Ping(Ping), @@ -824,6 +815,8 @@ impl ServeRepair { assert!(REPAIR_PING_CACHE_RATE_LIMIT_DELAY > Duration::from_millis(REPAIR_MS)); let mut ping_cache = PingCache::new( + &mut rand::thread_rng(), + Instant::now(), REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_RATE_LIMIT_DELAY, REPAIR_PING_CACHE_CAPACITY, @@ -924,10 +917,16 @@ impl ServeRepair { identity_keypair: &Keypair, ) -> (bool, Option) { let mut rng = rand::thread_rng(); - let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok(); let (check, ping) = request .sender() - .map(|&sender| ping_cache.check(Instant::now(), (sender, *from_addr), &mut pingf)) + .map(|&sender| { + ping_cache.check( + &mut rng, + identity_keypair, + Instant::now(), + (sender, *from_addr), + ) + }) .unwrap_or_default(); let ping_pkt = if let Some(ping) = ping { match request { @@ -1232,12 +1231,10 @@ impl ServeRepair { } packet.meta_mut().set_discard(true); stats.ping_count += 1; - if let Ok(pong) = Pong::new(&ping, keypair) { - let pong = RepairProtocol::Pong(pong); - if let Ok(pong_bytes) = serialize(&pong) { - let from_addr = packet.meta().socket_addr(); - pending_pongs.push((pong_bytes, from_addr)); - } + let pong = RepairProtocol::Pong(Pong::new(&ping, keypair)); + if let Ok(pong) = bincode::serialize(&pong) { + let from_addr = packet.meta().socket_addr(); + pending_pongs.push((pong, from_addr)); } } } @@ -1462,7 +1459,7 @@ mod tests { fn test_serialized_ping_size() { let mut rng = rand::thread_rng(); let keypair = Keypair::new(); - let ping = Ping::new_rand(&mut rng, &keypair).unwrap(); + let ping = Ping::new(rng.gen(), &keypair); let ping = RepairResponse::Ping(ping); let pkt = Packet::from_data(None, ping).unwrap(); assert_eq!(pkt.meta().size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES); @@ -1516,8 +1513,8 @@ mod tests { fn test_check_well_formed_repair_request() { let mut rng = rand::thread_rng(); let keypair = Keypair::new(); - let ping = ping_pong::Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap(); - let pong = Pong::new(&ping, &keypair).unwrap(); + let ping = Ping::new(rng.gen(), &keypair); + let pong = Pong::new(&ping, &keypair); let request = RepairProtocol::Pong(pong); let mut pkt = Packet::from_data(None, request).unwrap(); let mut batch = vec![make_remote_request(&pkt)]; diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 126d0bbe7efc4d..9d2a0adbe5b39e 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -25,8 +25,10 @@ rand = { workspace = true } rand_chacha = { workspace = true } rayon = { workspace = true } serde = { workspace = true } +serde-big-array = { workspace = true } serde_bytes = { workspace = true } serde_derive = { workspace = true } +siphasher = { workspace = true } solana-bloom = { workspace = true } solana-clap-utils = { workspace = true } solana-client = { workspace = true } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a1ec2f21915d90..9f23169c55f512 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -35,11 +35,11 @@ use { epoch_slots::EpochSlots, gossip_error::GossipError, legacy_contact_info::LegacyContactInfo, - ping_pong::{PingCache, Pong}, + ping_pong::Pong, protocol::{ - split_gossip_messages, Ping, Protocol, PruneData, DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, - MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_PRUNE_DATA_NODES, - PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE, + split_gossip_messages, Ping, PingCache, Protocol, PruneData, + DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, MAX_INCREMENTAL_SNAPSHOT_HASHES, + MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE, }, restart_crds_values::{ RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError, @@ -217,6 +217,8 @@ impl ClusterInfo { outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), ping_cache: Mutex::new(PingCache::new( + &mut rand::thread_rng(), + Instant::now(), GOSSIP_PING_CACHE_TTL, GOSSIP_PING_CACHE_RATE_LIMIT_DELAY, GOSSIP_PING_CACHE_CAPACITY, @@ -1729,24 +1731,19 @@ impl ClusterInfo { // Returns a predicate checking if the pull request is from a valid // address, and if the address have responded to a ping request. Also // appends ping packets for the addresses which need to be (re)verified. - // - // allow lint false positive trait bound requirement (`CryptoRng` only - // implemented on `&'a mut T` - #[allow(clippy::needless_pass_by_ref_mut)] fn check_pull_request<'a, R>( &'a self, now: Instant, - mut rng: &'a mut R, + rng: &'a mut R, packet_batch: &'a mut PacketBatch, ) -> impl FnMut(&PullData) -> bool + 'a where R: Rng + CryptoRng, { let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); - let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair()).ok(); let mut ping_cache = self.ping_cache.lock().unwrap(); let mut hard_check = move |node| { - let (check, ping) = ping_cache.check(now, node, &mut pingf); + let (check, ping) = ping_cache.check(rng, &self.keypair(), now, node); if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); match Packet::from_data(Some(&node.1), ping) { @@ -1964,10 +1961,9 @@ impl ClusterInfo { let keypair = self.keypair(); let pongs_and_dests: Vec<_> = pings .into_iter() - .filter_map(|(addr, ping)| { - let pong = Pong::new(&ping, &keypair).ok()?; - let pong = Protocol::PongMessage(pong); - Some((addr, pong)) + .map(|(addr, ping)| { + let pong = Pong::new(&ping, &keypair); + (addr, Protocol::PongMessage(pong)) }) .collect(); if pongs_and_dests.is_empty() { @@ -3110,9 +3106,8 @@ fn verify_gossip_addr( }; let (out, ping) = { let node = (*pubkey, addr); - let mut pingf = move || Ping::new_rand(rng, keypair).ok(); let mut ping_cache = ping_cache.lock().unwrap(); - ping_cache.check(Instant::now(), node, &mut pingf) + ping_cache.check(rng, keypair, Instant::now(), node) }; if let Some(ping) = ping { pings.push((addr, Protocol::PingMessage(ping))); @@ -3209,12 +3204,11 @@ mod tests { .collect(); let pings: Vec<_> = { let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); - let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok(); remote_nodes .iter() .map(|(keypair, socket)| { let node = (keypair.pubkey(), *socket); - let (check, ping) = ping_cache.check(now, node, &mut pingf); + let (check, ping) = ping_cache.check(&mut rng, &this_node, now, node); // Assert that initially remote nodes will not pass the // ping/pong check. assert!(!check); @@ -3225,7 +3219,7 @@ mod tests { let pongs: Vec<(SocketAddr, Pong)> = pings .iter() .zip(&remote_nodes) - .map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap())) + .map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair))) .collect(); let now = now + Duration::from_millis(1); cluster_info.handle_batch_pong_messages(pongs, now); @@ -3234,7 +3228,7 @@ mod tests { let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let (check, _) = ping_cache.check(now, node, || -> Option { None }); + let (check, _) = ping_cache.check(&mut rng, &this_node, now, node); assert!(check); } } @@ -3243,7 +3237,7 @@ mod tests { let mut ping_cache = cluster_info.ping_cache.lock().unwrap(); let (keypair, socket) = new_rand_remote_node(&mut rng); let node = (keypair.pubkey(), socket); - let (check, _) = ping_cache.check(now, node, || -> Option { None }); + let (check, _) = ping_cache.check(&mut rng, &this_node, now, node); assert!(!check); } } @@ -3263,11 +3257,11 @@ mod tests { .collect(); let pings: Vec<_> = remote_nodes .iter() - .map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap()) + .map(|(keypair, _)| Ping::new(rng.gen(), keypair)) .collect(); let pongs: Vec<_> = pings .iter() - .map(|ping| Pong::new(ping, &this_node).unwrap()) + .map(|ping| Pong::new(ping, &this_node)) .collect(); let recycler = PacketBatchRecycler::default(); let packets = cluster_info diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 40ecb65771184d..36795f6720d04d 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -15,8 +15,7 @@ use { crds_gossip_push::CrdsGossipPush, crds_value::CrdsValue, duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, - ping_pong::PingCache, - protocol::Ping, + protocol::{Ping, PingCache}, }, itertools::Itertools, rand::{CryptoRng, Rng}, @@ -386,7 +385,6 @@ pub(crate) fn maybe_ping_gossip_addresses( pings: &mut Vec<(SocketAddr, Ping)>, ) -> Vec { let mut ping_cache = ping_cache.lock().unwrap(); - let mut pingf = move || Ping::new_rand(rng, keypair).ok(); let now = Instant::now(); nodes .into_iter() @@ -396,7 +394,7 @@ pub(crate) fn maybe_ping_gossip_addresses( }; let (check, ping) = { let node = (*node.pubkey(), node_gossip); - ping_cache.check(now, node, &mut pingf) + ping_cache.check(rng, keypair, now, node) }; if let Some(ping) = ping { pings.push((node_gossip, ping)); @@ -431,6 +429,8 @@ mod test { ) .unwrap(); let ping_cache = PingCache::new( + &mut rand::thread_rng(), + Instant::now(), Duration::from_secs(20 * 60), // ttl Duration::from_secs(20 * 60) / 64, // rate_limit_delay 128, // capacity diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index f650d91497fbb5..1f6ad65b432c8f 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -19,8 +19,7 @@ use { crds_gossip, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, - ping_pong::PingCache, - protocol::Ping, + protocol::{Ping, PingCache}, }, itertools::Itertools, rand::{ @@ -679,6 +678,16 @@ pub(crate) mod tests { #[cfg(not(debug_assertions))] pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 64; + fn new_ping_cache() -> PingCache { + PingCache::new( + &mut rand::thread_rng(), + Instant::now(), + Duration::from_secs(20 * 60), // ttl + Duration::from_secs(20 * 60) / 64, // rate_limit_delay + 128, // capacity + ) + } + #[test] fn test_hash_as_u64() { let arr: [u8; HASH_BYTES] = std::array::from_fn(|i| i as u8 + 1); @@ -851,11 +860,7 @@ pub(crate) mod tests { ))); let node = CrdsGossipPull::default(); let mut pings = Vec::new(); - let ping_cache = Mutex::new(PingCache::new( - Duration::from_secs(20 * 60), // ttl - Duration::from_secs(20 * 60) / 64, // rate_limit_delay - 128, // capacity - )); + let ping_cache = Mutex::new(new_ping_cache()); assert_eq!( node.new_pull_request( &thread_pool, @@ -949,11 +954,7 @@ pub(crate) mod tests { fn test_new_mark_creation_time() { let now: u64 = 1_605_127_770_789; let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - let mut ping_cache = PingCache::new( - Duration::from_secs(20 * 60), // ttl - Duration::from_secs(20 * 60) / 64, // rate_limit_delay - 128, // capacity - ); + let mut ping_cache = new_ping_cache(); let mut crds = Crds::default(); let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -1011,11 +1012,7 @@ pub(crate) mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let node_keypair = Keypair::new(); let mut node_crds = Crds::default(); - let mut ping_cache = PingCache::new( - Duration::from_secs(20 * 60), // ttl - Duration::from_secs(20 * 60) / 64, // rate_limit_delay - 128, // capacity - ); + let mut ping_cache = new_ping_cache(); let now = timestamp(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &node_keypair.pubkey(), @@ -1125,11 +1122,7 @@ pub(crate) mod tests { node_crds .insert(entry, 0, GossipRoute::LocalMessage) .unwrap(); - let mut ping_cache = PingCache::new( - Duration::from_secs(20 * 60), // ttl - Duration::from_secs(20 * 60) / 64, // rate_limit_delay - 128, // capacity - ); + let mut ping_cache = new_ping_cache(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 4dd0f4e06cc4c1..43c500cbdcfd4d 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -17,8 +17,7 @@ use { crds::{Crds, CrdsError, Cursor, GossipRoute}, crds_gossip, crds_value::CrdsValue, - ping_pong::PingCache, - protocol::Ping, + protocol::{Ping, PingCache}, push_active_set::PushActiveSet, received_cache::ReceivedCache, }, @@ -287,6 +286,8 @@ mod tests { fn new_ping_cache() -> PingCache { PingCache::new( + &mut rand::thread_rng(), + Instant::now(), Duration::from_secs(20 * 60), // ttl Duration::from_secs(20 * 60) / 64, // rate_limit_delay 128, // capacity diff --git a/gossip/src/ping_pong.rs b/gossip/src/ping_pong.rs index 56b75862927e80..7fef7c93918d81 100644 --- a/gossip/src/ping_pong.rs +++ b/gossip/src/ping_pong.rs @@ -1,28 +1,34 @@ use { - bincode::{serialize, Error}, lru::LruCache, - rand::{CryptoRng, Fill, Rng}, - serde::Serialize, + rand::{CryptoRng, Rng}, + serde_big_array::BigArray, + siphasher::sip::SipHasher24, solana_sanitize::{Sanitize, SanitizeError}, solana_sdk::{ - hash::{self, Hash}, + hash::Hash, pubkey::Pubkey, signature::{Keypair, Signable, Signature, Signer}, }, std::{ borrow::Cow, + hash::{Hash as _, Hasher}, net::SocketAddr, time::{Duration, Instant}, }, }; +const KEY_REFRESH_CADENCE: Duration = Duration::from_secs(60); const PING_PONG_HASH_PREFIX: &[u8] = "SOLANA_PING_PONG".as_bytes(); +// For backward compatibility we are using a const generic parameter here. +// N should always be >= 8 and only the first 8 bytes are used. So the new code +// should only use N == 8. #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] #[derive(Debug, Deserialize, Serialize)] -pub struct Ping { +pub struct Ping { from: Pubkey, - token: T, + #[serde(with = "BigArray")] + token: [u8; N], signature: Signature, } @@ -37,48 +43,37 @@ pub struct Pong { /// Maintains records of remote nodes which have returned a valid response to a /// ping message, and on-the-fly ping messages pending a pong response from the /// remote node. -pub struct PingCache { +/// Const generic parameter N corresponds to token size in Ping type. +pub struct PingCache { // Time-to-live of received pong messages. ttl: Duration, // Rate limit delay to generate pings for a given address rate_limit_delay: Duration, + // Hashers initialized with random keys, rotated at KEY_REFRESH_CADENCE. + // Because at the moment that the keys are rotated some pings might already + // be in the flight, we need to keep the two most recent hashers. + hashers: [SipHasher24; 2], + // When hashers were last refreshed. + key_refresh: Instant, // Timestamp of last ping message sent to a remote node. // Used to rate limit pings to remote nodes. pings: LruCache<(Pubkey, SocketAddr), Instant>, // Verified pong responses from remote nodes. pongs: LruCache<(Pubkey, SocketAddr), Instant>, - // Hash of ping tokens sent out to remote nodes, - // pending a pong response back. - pending_cache: LruCache, } -impl Ping { - pub fn new(token: T, keypair: &Keypair) -> Result { - let signature = keypair.sign_message(&serialize(&token)?); - let ping = Ping { +impl Ping { + pub fn new(token: [u8; N], keypair: &Keypair) -> Self { + let signature = keypair.sign_message(&token); + Ping { from: keypair.pubkey(), token, signature, - }; - Ok(ping) - } -} - -impl Ping -where - T: Serialize + Fill + Default, -{ - pub fn new_rand(rng: &mut R, keypair: &Keypair) -> Result - where - R: Rng + CryptoRng, - { - let mut token = T::default(); - rng.fill(&mut token); - Ping::new(token, keypair) + } } } -impl Sanitize for Ping { +impl Sanitize for Ping { fn sanitize(&self) -> Result<(), SanitizeError> { self.from.sanitize()?; // TODO Add self.token.sanitize()?; when rust's @@ -87,15 +82,18 @@ impl Sanitize for Ping { } } -impl Signable for Ping { +impl Signable for Ping { + #[inline] fn pubkey(&self) -> Pubkey { self.from } + #[inline] fn signable_data(&self) -> Cow<[u8]> { - Cow::Owned(serialize(&self.token).unwrap()) + Cow::Borrowed(&self.token) } + #[inline] fn get_signature(&self) -> Signature { self.signature } @@ -106,15 +104,13 @@ impl Signable for Ping { } impl Pong { - pub fn new(ping: &Ping, keypair: &Keypair) -> Result { - let token = serialize(&ping.token)?; - let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]); - let pong = Pong { + pub fn new(ping: &Ping, keypair: &Keypair) -> Self { + let hash = hash_ping_token(&ping.token); + Pong { from: keypair.pubkey(), hash, signature: keypair.sign_message(hash.as_ref()), - }; - Ok(pong) + } } pub fn from(&self) -> &Pubkey { @@ -148,16 +144,23 @@ impl Signable for Pong { } } -impl PingCache { - pub fn new(ttl: Duration, rate_limit_delay: Duration, cap: usize) -> Self { +impl PingCache { + pub fn new( + rng: &mut R, + now: Instant, + ttl: Duration, + rate_limit_delay: Duration, + cap: usize, + ) -> Self { // Sanity check ttl/rate_limit_delay assert!(rate_limit_delay <= ttl / 2); Self { ttl, rate_limit_delay, + hashers: std::array::from_fn(|_| SipHasher24::new_with_key(&rng.gen())), + key_refresh: now, pings: LruCache::new(cap), pongs: LruCache::new(cap), - pending_cache: LruCache::new(cap), } } @@ -166,43 +169,37 @@ impl PingCache { /// returns true. /// Note: Does not verify the signature. pub fn add(&mut self, pong: &Pong, socket: SocketAddr, now: Instant) -> bool { - let node = (pong.pubkey(), socket); - match self.pending_cache.peek(&pong.hash) { - Some(value) if *value == node => { - self.pings.pop(&node); - self.pongs.put(node, now); - self.pending_cache.pop(&pong.hash); - true - } - _ => false, - } + let remote_node = (pong.pubkey(), socket); + if !self.hashers.iter().copied().any(|hasher| { + let token = make_ping_token::(hasher, &remote_node); + hash_ping_token(&token) == pong.hash + }) { + return false; + }; + self.pongs.put(remote_node, now); + true } /// Checks if the remote node has been pinged recently. If not, calls the /// given function to generates a new ping message, records current /// timestamp and hash of ping token, and returns the ping message. - fn maybe_ping( + fn maybe_ping( &mut self, + rng: &mut R, + keypair: &Keypair, now: Instant, - node: (Pubkey, SocketAddr), - mut pingf: F, - ) -> Option> - where - T: Serialize, - F: FnMut() -> Option>, - { - match self.pings.peek(&node) { - // Rate limit consecutive pings sent to a remote node. - Some(t) if now.saturating_duration_since(*t) < self.rate_limit_delay => None, - _ => { - let ping = pingf()?; - let token = serialize(&ping.token).ok()?; - let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]); - self.pending_cache.put(hash, node); - self.pings.put(node, now); - Some(ping) - } + remote_node: (Pubkey, SocketAddr), + ) -> Option> { + // Rate limit consecutive pings sent to a remote node. + if matches!(self.pings.peek(&remote_node), + Some(&t) if now.saturating_duration_since(t) < self.rate_limit_delay) + { + return None; } + self.pings.put(remote_node, now); + self.maybe_refresh_key(rng, now); + let token = make_ping_token::(self.hashers[0], &remote_node); + Some(Ping::new(token, keypair)) } /// Returns true if the remote node has responded to a ping message. @@ -213,43 +210,63 @@ impl PingCache { /// the ping message. /// Caller should verify if the socket address is valid. (e.g. by using /// ContactInfo::is_valid_address). - pub fn check( + pub fn check( &mut self, + rng: &mut R, + keypair: &Keypair, now: Instant, - node: (Pubkey, SocketAddr), - pingf: F, - ) -> (bool, Option>) - where - T: Serialize, - F: FnMut() -> Option>, - { - let (check, should_ping) = match self.pongs.get(&node) { + remote_node: (Pubkey, SocketAddr), + ) -> (bool, Option>) { + let (check, should_ping) = match self.pongs.get(&remote_node) { None => (false, true), Some(t) => { let age = now.saturating_duration_since(*t); // Pop if the pong message has expired. if age > self.ttl { - self.pongs.pop(&node); + self.pongs.pop(&remote_node); } // If the pong message is not too recent, generate a new ping // message to extend remote node verification. (true, age > self.ttl / 8) } }; - let ping = if should_ping { - self.maybe_ping(now, node, pingf) - } else { - None - }; + let ping = should_ping + .then(|| self.maybe_ping(rng, keypair, now, remote_node)) + .flatten(); (check, ping) } + fn maybe_refresh_key(&mut self, rng: &mut R, now: Instant) { + if now.checked_duration_since(self.key_refresh) > Some(KEY_REFRESH_CADENCE) { + let hasher = SipHasher24::new_with_key(&rng.gen()); + self.hashers[1] = std::mem::replace(&mut self.hashers[0], hasher); + self.key_refresh = now; + } + } + /// Only for tests and simulations. pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) { self.pongs.put((node, socket), now); } } +fn make_ping_token( + mut hasher: SipHasher24, + remote_node: &(Pubkey, SocketAddr), +) -> [u8; N] { + // TODO: Consider including local node's (pubkey, socket-addr). + remote_node.hash(&mut hasher); + let hash = hasher.finish().to_le_bytes(); + debug_assert!(N >= std::mem::size_of::()); + let mut token = [0u8; N]; + token[..std::mem::size_of::()].copy_from_slice(&hash); + token +} + +fn hash_ping_token(token: &[u8; N]) -> Hash { + solana_sdk::hash::hashv(&[PING_PONG_HASH_PREFIX, token]) +} + #[cfg(test)] mod tests { use { @@ -261,21 +278,19 @@ mod tests { }, }; - type Token = [u8; 32]; - #[test] fn test_ping_pong() { let mut rng = rand::thread_rng(); let keypair = Keypair::new(); - let ping = Ping::::new_rand(&mut rng, &keypair).unwrap(); + let ping = Ping::<32>::new(rng.gen(), &keypair); assert!(ping.verify()); assert!(ping.sanitize().is_ok()); - let pong = Pong::new(&ping, &keypair).unwrap(); + let pong = Pong::new(&ping, &keypair); assert!(pong.verify()); assert!(pong.sanitize().is_ok()); assert_eq!( - hash::hashv(&[PING_PONG_HASH_PREFIX, &ping.token]), + solana_sdk::hash::hashv(&[PING_PONG_HASH_PREFIX, &ping.token]), pong.hash ); } @@ -286,7 +301,7 @@ mod tests { let mut rng = rand::thread_rng(); let ttl = Duration::from_millis(256); let delay = ttl / 64; - let mut cache = PingCache::new(ttl, delay, /*cap=*/ 1000); + let mut cache = PingCache::new(&mut rng, Instant::now(), ttl, delay, /*cap=*/ 1000); let this_node = Keypair::new(); let keypairs: Vec<_> = repeat_with(Keypair::new).take(8).collect(); let sockets: Vec<_> = repeat_with(|| { @@ -308,12 +323,11 @@ mod tests { // Initially all checks should fail. The first observation of each node // should create a ping packet. let mut seen_nodes = HashSet::<(Pubkey, SocketAddr)>::new(); - let pings: Vec>> = remote_nodes + let pings: Vec>> = remote_nodes .iter() .map(|(keypair, socket)| { let node = (keypair.pubkey(), *socket); - let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); - let (check, ping) = cache.check(now, node, pingf); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(!check); assert_eq!(seen_nodes.insert(node), ping.is_some()); ping @@ -321,19 +335,18 @@ mod tests { .collect(); let now = now + Duration::from_millis(1); - let panic_ping = || -> Option> { panic!("this should not happen!") }; for ((keypair, socket), ping) in remote_nodes.iter().zip(&pings) { match ping { None => { // Already have a recent ping packets for nodes, so no new // ping packet will be generated. let node = (keypair.pubkey(), *socket); - let (check, ping) = cache.check(now, node, panic_ping); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(check); assert!(ping.is_none()); } Some(ping) => { - let pong = Pong::new(ping, keypair).unwrap(); + let pong = Pong::new(ping, keypair); assert!(cache.add(&pong, *socket, now)); } } @@ -343,7 +356,7 @@ mod tests { // All nodes now have a recent pong packet. for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let (check, ping) = cache.check(now, node, panic_ping); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(check); assert!(ping.is_none()); } @@ -354,8 +367,7 @@ mod tests { seen_nodes.clear(); for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); - let (check, ping) = cache.check(now, node, pingf); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(check); assert_eq!(seen_nodes.insert(node), ping.is_some()); } @@ -365,7 +377,7 @@ mod tests { // packet pending response. So no new ping packet will be created. for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let (check, ping) = cache.check(now, node, panic_ping); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(check); assert!(ping.is_none()); } @@ -377,8 +389,7 @@ mod tests { seen_nodes.clear(); for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); - let (check, ping) = cache.check(now, node, pingf); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); if seen_nodes.insert(node) { assert!(check); assert!(ping.is_some()); @@ -393,7 +404,7 @@ mod tests { // created, so no new one will be created. for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let (check, ping) = cache.check(now, node, panic_ping); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(!check); assert!(ping.is_none()); } @@ -404,8 +415,7 @@ mod tests { seen_nodes.clear(); for (keypair, socket) in &remote_nodes { let node = (keypair.pubkey(), *socket); - let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); - let (check, ping) = cache.check(now, node, pingf); + let (check, ping) = cache.check(&mut rng, &this_node, now, node); assert!(!check); assert_eq!(seen_nodes.insert(node), ping.is_some()); } diff --git a/gossip/src/protocol.rs b/gossip/src/protocol.rs index b4acd831627cad..61ee47750e04eb 100644 --- a/gossip/src/protocol.rs +++ b/gossip/src/protocol.rs @@ -43,11 +43,6 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32; pub(crate) const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; // TODO These messages should go through the gpu pipeline for spam filtering -#[cfg_attr( - feature = "frozen-abi", - derive(AbiExample, AbiEnumVisitor), - frozen_abi(digest = "CBR9G92mpd1WSXEmiH6dAKHziLjJky9aYWPw6S5WmJkG") -)] #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -63,7 +58,8 @@ pub(crate) enum Protocol { // Update count_packets_received if new variants are added here. } -pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; +pub(crate) type Ping = ping_pong::Ping; +pub(crate) type PingCache = ping_pong::PingCache; #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index eb2c7517f1eac4..4bf34b98bdbae0 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -14,7 +14,6 @@ use { crds_gossip_pull::{CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, crds_value::{CrdsValue, CrdsValueLabel}, - ping_pong::PingCache, }, solana_rayon_threadlimit::get_thread_count, solana_sdk::{ @@ -33,6 +32,8 @@ use { }, }; +type PingCache = solana_gossip::ping_pong::PingCache<32>; + #[derive(Clone)] struct Node { keypair: Arc, @@ -650,6 +651,8 @@ fn build_gossip_thread_pool() -> ThreadPool { fn new_ping_cache() -> Mutex { let ping_cache = PingCache::new( + &mut rand::thread_rng(), + Instant::now(), Duration::from_secs(20 * 60), // ttl Duration::from_secs(20 * 60) / 64, // rate_limit_delay 2048, // capacity diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 37c990d54389c7..4e80af8676fad5 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5818,8 +5818,10 @@ dependencies = [ "rand_chacha 0.3.1", "rayon", "serde", + "serde-big-array", "serde_bytes", "serde_derive", + "siphasher", "solana-bloom", "solana-clap-utils", "solana-client", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 7170a61237fe44..170f59f40311d4 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5638,8 +5638,10 @@ dependencies = [ "rand_chacha 0.3.1", "rayon", "serde", + "serde-big-array", "serde_bytes", "serde_derive", + "siphasher", "solana-bloom", "solana-clap-utils", "solana-client",