Skip to content

Commit

Permalink
stops refreshing legacy gossip NodeInstance and Version (solana-labs#…
Browse files Browse the repository at this point in the history
…4677)

Mainnet nodes are running v2.0.15 and v2.1.5 at a minimum, both of which
are using the new ContactInfo to look up version:
https://github.com/anza-xyz/agave/blob/v2.0.15/gossip/src/cluster_info.rs#L1336-L1338
https://github.com/anza-xyz/agave/blob/v2.1.5/gossip/src/cluster_info.rs#L1316-L1317

and to check for duplicate instance:
https://github.com/anza-xyz/agave/blob/v2.0.15/gossip/src/cluster_info.rs#L2504-L2505
https://github.com/anza-xyz/agave/blob/v2.1.5/gossip/src/cluster_info.rs#L2481-L2482

So we no longer need to refresh and push legacy gossip NodeInstance and
Version.
  • Loading branch information
behzadnouri authored Jan 29, 2025
1 parent 6719b90 commit 8889b4a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 105 deletions.
70 changes: 19 additions & 51 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use {
},
contact_info::{self, ContactInfo, ContactInfoQuery, Error as ContactInfoError},
crds::{Crds, Cursor, GossipRoute},
crds_data::{
self, CrdsData, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHashes, Version,
Vote,
},
crds_data::{self, CrdsData, EpochSlotsIndex, LowestSlot, SnapshotHashes, Vote},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{
Expand All @@ -48,7 +45,7 @@ use {
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng, CryptoRng, Rng},
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
Expand Down Expand Up @@ -157,7 +154,6 @@ pub struct ClusterInfo {
local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
contact_debug_interval: u64, // milliseconds, 0 = disabled
contact_save_interval: u64, // milliseconds, 0 = disabled
instance: RwLock<NodeInstance>,
contact_info_path: PathBuf,
socket_addr_space: SocketAddrSpace,
}
Expand Down Expand Up @@ -211,7 +207,6 @@ impl ClusterInfo {
socket_addr_space: SocketAddrSpace,
) -> Self {
assert_eq!(contact_info.pubkey(), &keypair.pubkey());
let id = *contact_info.pubkey();
let me = Self {
gossip: CrdsGossip::default(),
keypair: RwLock::new(keypair),
Expand All @@ -228,7 +223,6 @@ impl ClusterInfo {
stats: GossipStats::default(),
local_message_pending_push_queue: Mutex::default(),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
contact_info_path: PathBuf::default(),
contact_save_interval: 0, // disabled
socket_addr_space,
Expand Down Expand Up @@ -431,18 +425,10 @@ impl ClusterInfo {

pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
let id = new_keypair.pubkey();
{
let mut instance = self.instance.write().unwrap();
*instance = NodeInstance::new(&mut thread_rng(), id, timestamp());
}
*self.keypair.write().unwrap() = new_keypair;
self.my_contact_info.write().unwrap().hot_swap_pubkey(id);

self.refresh_my_gossip_contact_info();
self.push_message(CrdsValue::new(
CrdsData::Version(Version::new(self.id())),
&self.keypair(),
));
}

pub fn set_tpu(&self, tpu_addr: SocketAddr) -> Result<(), ContactInfoError> {
Expand Down Expand Up @@ -1174,24 +1160,17 @@ impl ClusterInfo {

fn refresh_my_gossip_contact_info(&self) {
let keypair: Arc<Keypair> = self.keypair().clone();
let instance = self.instance.read().unwrap().with_wallclock(timestamp());
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(timestamp());
node.clone()
};
let entries: Vec<_> = [
CrdsData::ContactInfo(node),
CrdsData::NodeInstance(instance),
]
.into_iter()
.map(|entry| CrdsValue::new(entry, &keypair))
.collect();
let mut gossip_crds = self.gossip.crds.write().unwrap();
for entry in entries {
if let Err(err) = gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage) {
error!("Insert self failed: {err:?}");
}
let node = CrdsValue::new(CrdsData::ContactInfo(node), &keypair);
if let Err(err) = {
let mut gossip_crds = self.gossip.crds.write().unwrap();
gossip_crds.insert(node, timestamp(), GossipRoute::LocalMessage)
} {
error!("refresh_my_gossip_contact_info failed: {err:?}");
}
}

Expand Down Expand Up @@ -1528,16 +1507,6 @@ impl ClusterInfo {
let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
let recycler = PacketBatchRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(
self.instance.read().unwrap().with_wallclock(timestamp()),
),
];
for value in crds_data {
let value = CrdsValue::new(value, &self.keypair());
self.push_message(value);
}
let mut generate_pull_requests = true;
while !exit.load(Ordering::Relaxed) {
let start = timestamp();
Expand Down Expand Up @@ -2137,19 +2106,14 @@ impl ClusterInfo {
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = {
let instance = self.instance.read().unwrap();
let my_contact_info = self.my_contact_info();
move |values: &[CrdsValue]| {
if should_check_duplicate_instance
&& values.iter().any(|value| match value.data() {
CrdsData::ContactInfo(other) => my_contact_info.check_duplicate(other),
CrdsData::NodeInstance(other) => instance.check_duplicate(other),
_ => false,
})
{
return Err(GossipError::DuplicateNodeInstance);
let mut nodes = values.iter().filter_map(CrdsValue::contact_info);
if nodes.any(|other| my_contact_info.check_duplicate(other)) {
Err(GossipError::DuplicateNodeInstance)
} else {
Ok(())
}
Ok(())
}
};
let mut pings = Vec::new();
Expand Down Expand Up @@ -2186,14 +2150,18 @@ impl ClusterInfo {
}
}
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
if should_check_duplicate_instance {
check_duplicate_instance(&data)?;
}
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
pull_responses.append(&mut data);
}
}
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
if should_check_duplicate_instance {
check_duplicate_instance(&data)?;
}
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
push_messages.push((from, data));
Expand Down
62 changes: 8 additions & 54 deletions gossip/src/crds_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
legacy_contact_info::LegacyContactInfo,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
rand::{CryptoRng, Rng},
rand::Rng,
serde::de::{Deserialize, Deserializer},
solana_sanitize::{Sanitize, SanitizeError},
solana_sdk::{
Expand Down Expand Up @@ -116,7 +116,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..9);
let kind = rng.gen_range(0..8);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -126,12 +126,11 @@ impl CrdsData {
1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::LegacySnapshotHashes(LegacySnapshotHashes::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
4 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
5 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
6 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -437,31 +436,6 @@ impl Sanitize for Version {
}
}

impl Version {
pub(crate) fn new(from: Pubkey) -> Self {
Self {
from,
wallclock: timestamp(),
version: solana_version::LegacyVersion2::default(),
}
}

/// New random Version for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
wallclock: new_rand_timestamp(rng),
version: solana_version::LegacyVersion2 {
major: rng.gen(),
minor: rng.gen(),
patch: rng.gen(),
commit: Some(rng.gen()),
feature_set: rng.gen(),
},
}
}
}

#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub(crate) struct NodeInstance {
Expand All @@ -472,9 +446,10 @@ pub(crate) struct NodeInstance {
}

impl NodeInstance {
#[cfg(test)]
pub(crate) fn new<R>(rng: &mut R, from: Pubkey, now: u64) -> Self
where
R: Rng + CryptoRng,
R: Rng + rand::CryptoRng,
{
Self {
from,
Expand All @@ -484,21 +459,12 @@ impl NodeInstance {
}
}

#[cfg(test)]
// Clones the value with an updated wallclock.
pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self {
Self { wallclock, ..*self }
}

// Returns true if the crds-value is a duplicate instance of this node,
// with a more recent timestamp.
// The older instance is considered the duplicate instance. If a staked
// node is restarted it will receive its old instance value from gossip.
// Considering the new instance as the duplicate would prevent the node
// from restarting.
pub(crate) fn check_duplicate(&self, other: &NodeInstance) -> bool {
self.token != other.token && self.timestamp <= other.timestamp && self.from == other.from
}

// Returns None if tokens are the same or other is not a node-instance from
// the same owner. Otherwise returns true if self has more recent timestamp
// than other, and so overrides it.
Expand Down Expand Up @@ -686,8 +652,6 @@ mod test {
timestamp: now + 1,
token: node.token,
};
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
// Older timestamp is not a duplicate.
Expand All @@ -697,8 +661,6 @@ mod test {
timestamp: now - 1,
token: rng.gen(),
};
assert!(!node.check_duplicate(&other));
assert!(other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(true));
assert_eq!(other.overrides(&node), Some(false));
// Updated wallclock is not a duplicate.
Expand All @@ -712,8 +674,6 @@ mod test {
token: node.token,
}
);
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
// Duplicate instance; tied timestamp.
Expand All @@ -724,8 +684,6 @@ mod test {
timestamp: now,
token: rng.gen(),
};
assert!(node.check_duplicate(&other));
assert!(other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(other.token < node.token));
assert_eq!(other.overrides(&node), Some(node.token < other.token));
}
Expand All @@ -737,8 +695,6 @@ mod test {
timestamp: now + 1,
token: rng.gen(),
};
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
}
Expand All @@ -749,8 +705,6 @@ mod test {
timestamp: now + 1,
token: rng.gen(),
};
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
Expand Down

0 comments on commit 8889b4a

Please sign in to comment.