Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
dedups gossip addresses, keeping only the one with highest weight
Browse files Browse the repository at this point in the history
In order to avoid traffic congestion or sending duplicate packets, when
sampling gossip nodes, if several nodes have the same gossip address
because they are behind a relayer or whatever, they need to be
deduplicated into one.
  • Loading branch information
behzadnouri committed Dec 27, 2022
1 parent 7429df5 commit 2ea3212
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 18 deletions.
4 changes: 3 additions & 1 deletion gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ impl ContactInfo {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
ContactInfo::new_localhost(&pubkey, now)
let mut node = ContactInfo::new_localhost(&pubkey, now);
node.gossip.set_port(rng.gen());
node
}

#[cfg(test)]
Expand Down
18 changes: 14 additions & 4 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,22 +244,32 @@ impl CrdsGossipPull {
);
// Check for nodes which have responded to ping messages.
let mut rng = rand::thread_rng();
let (weights, peers): (Vec<_>, Vec<_>) = {
let peers: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter_map(|(weight, peer)| {
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check.then_some((weight, peer))
check
})
.unzip()
.collect()
};
// Dedup gossip addresses.
let (weights, peers): (Vec<_>, Vec<_>) = peers
.into_iter()
.into_grouping_map_by(|(_weight, node)| node.gossip)
.aggregate(|acc, _node_gossip, (weight, node)| match acc {
Some((w, _)) if w >= weight => acc,
Some(_) | None => Some((weight, node)),
})
.into_values()
.unzip();
if peers.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
Expand Down
26 changes: 19 additions & 7 deletions gossip/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,22 +299,32 @@ impl CrdsGossipPush {
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let (weights, peers): (Vec<_>, Vec<_>) = {
let peers: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter_map(|(weight, peer)| {
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check.then_some((weight, peer.id))
check
})
.unzip()
.collect()
};
// Dedup gossip addresses.
let (weights, peers): (Vec<_>, Vec<_>) = peers
.into_iter()
.into_grouping_map_by(|(_weight, node)| node.gossip)
.aggregate(|acc, _node_gossip, (weight, node)| match acc {
Some((w, _)) if w >= weight => acc,
Some(_) | None => Some((weight, node.id)),
})
.into_values()
.unzip();
if peers.is_empty() {
return;
}
Expand Down Expand Up @@ -572,7 +582,8 @@ mod tests {

let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value1.label().pubkey()).is_some());
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1245);
ping_cache
.lock()
.unwrap()
Expand Down Expand Up @@ -608,8 +619,9 @@ mod tests {
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value2.label().pubkey()).is_some());
}
for _ in 0..push.num_active {
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
for k in 0..push.num_active {
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1246 + k as u16);
ping_cache
.lock()
.unwrap()
Expand Down
15 changes: 9 additions & 6 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ impl CrdsData {
// the mainnet crds table.
match kind {
0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)),
1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)),
// Index for LowestSlot is deprecated and should be zero.
1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::SnapshotHashes(SnapshotHashes::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(SnapshotHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
Expand Down Expand Up @@ -864,7 +865,7 @@ mod test {
let index = rng.gen_range(0, keys.len());
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
})
.take(2048)
.take(1 << 12)
.collect();
let mut currents = HashMap::new();
for value in filter_current(&values) {
Expand All @@ -888,10 +889,12 @@ mod test {
}
}
assert_eq!(count, currents.len());
// Currently CrdsData::new_rand is only implemented for 5 different
// kinds and excludes EpochSlots, and so the unique labels cannot be
// more than (5 + MAX_VOTES) times number of keys.
assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize));
// Currently CrdsData::new_rand is implemented for:
// AccountsHashes, ContactInfo, LowestSlot, SnapshotHashes, Version
// EpochSlots x MAX_EPOCH_SLOTS
// Vote x MAX_VOTES
let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize;
assert!(currents.len() <= keys.len() * num_kinds);
}

#[test]
Expand Down

0 comments on commit 2ea3212

Please sign in to comment.