Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

v1.17: down samples outgoing gossip pull requests (backport of #33719) #33752

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gossip/benches/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
let crds = RwLock::new(crds);
bencher.iter(|| {
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), 128);
assert_eq!(filters.len(), 16);
});
}
96 changes: 59 additions & 37 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ use {
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
// Maximum number of pull requests to send out each time around.
const MAX_NUM_PULL_REQUESTS: usize = 1024;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;

Expand Down Expand Up @@ -143,19 +141,26 @@ impl CrdsFilter {

/// A vector of crds filters that together hold a complete set of Hashes.
struct CrdsFilterSet {
filters: Vec<AtomicBloom<Hash>>,
filters: Vec<Option<AtomicBloom<Hash>>>,
mask_bits: u32,
}

impl CrdsFilterSet {
fn new(num_items: usize, max_bytes: usize) -> Self {
fn new<R: Rng>(rng: &mut R, num_items: usize, max_bytes: usize) -> Self {
const SAMPLE_RATE: usize = 8;
const MAX_NUM_FILTERS: usize = 1024;
let max_bits = (max_bytes * 8) as f64;
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
let filters =
repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into())
.take(1 << mask_bits)
.collect();
let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect();
let mut indices: Vec<_> = (0..filters.len()).collect();
let size = (filters.len() + SAMPLE_RATE - 1) / SAMPLE_RATE;
for _ in 0..MAX_NUM_FILTERS.min(size) {
let k = rng.gen_range(0..indices.len());
let k = indices.swap_remove(k);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
filters[k] = Some(AtomicBloom::<Hash>::from(filter));
}
Self { filters, mask_bits }
}

Expand All @@ -167,7 +172,9 @@ impl CrdsFilterSet {
.unwrap_or_default(),
)
.unwrap();
self.filters[index].add(&hash_value);
if let Some(filter) = &self.filters[index] {
filter.add(&hash_value);
}
}
}

Expand All @@ -177,10 +184,12 @@ impl From<CrdsFilterSet> for Vec<CrdsFilter> {
cfs.filters
.into_iter()
.enumerate()
.map(|(seed, filter)| CrdsFilter {
filter: filter.into(),
mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
mask_bits,
.filter_map(|(seed, filter)| {
Some(CrdsFilter {
filter: Bloom::<Hash>::from(filter?),
mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
mask_bits,
})
})
.collect()
}
Expand Down Expand Up @@ -269,14 +278,7 @@ impl CrdsGossipPull {
if nodes.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size);
if filters.len() > MAX_NUM_PULL_REQUESTS {
for i in 0..MAX_NUM_PULL_REQUESTS {
let j = rng.gen_range(i..filters.len());
filters.swap(i, j);
}
filters.truncate(MAX_NUM_PULL_REQUESTS);
}
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
// Associate each pull-request filter with a randomly selected peer.
let dist = WeightedIndex::new(weights).unwrap();
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
Expand Down Expand Up @@ -425,7 +427,7 @@ impl CrdsGossipPull {
let crds = crds.read().unwrap();
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(num_items, bloom_size);
let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size);
thread_pool.install(|| {
crds.par_values()
.with_min_len(PAR_MIN_LENGTH)
Expand Down Expand Up @@ -669,45 +671,61 @@ pub(crate) mod tests {

#[test]
fn test_crds_filter_set_add() {
let crds_filter_set =
CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
let hash_values: Vec<_> = repeat_with(Hash::new_unique).take(1024).collect();
let mut rng = rand::thread_rng();
let crds_filter_set = CrdsFilterSet::new(
&mut rng, /*num_items=*/ 59672788, /*max_bytes=*/ 8196,
);
let hash_values: Vec<_> = repeat_with(|| {
let buf: [u8; 32] = rng.gen();
solana_sdk::hash::hashv(&[&buf])
})
.take(1024)
.collect();
assert_eq!(crds_filter_set.filters.len(), 8192);
for hash_value in &hash_values {
crds_filter_set.add(*hash_value);
}
let filters: Vec<CrdsFilter> = crds_filter_set.into();
let mut num_hits = 0;
assert_eq!(filters.len(), 1024);
for hash_value in hash_values {
let mut num_hits = 0;
let mut hit = false;
let mut false_positives = 0;
for filter in &filters {
if filter.test_mask(&hash_value) {
num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) {
false_positives += 1;
}
}
assert_eq!(num_hits, 1);
assert!(false_positives < 5);
}
assert!(num_hits > 96, "num_hits: {num_hits}");
}

#[test]
fn test_crds_filter_set_new() {
// Validates invariances required by CrdsFilterSet::get in the
// vector of filters generated by CrdsFilterSet::new.
let filters: Vec<CrdsFilter> =
CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into();
assert_eq!(filters.len(), 16384);
let filters = CrdsFilterSet::new(
&mut rand::thread_rng(),
55345017, // num_items
4098, // max_bytes
);
assert_eq!(filters.filters.len(), 16384);
let filters = Vec::<CrdsFilter>::from(filters);
assert_eq!(filters.len(), 1024);
let mask_bits = filters[0].mask_bits;
let right_shift = 64 - mask_bits;
let ones = !0u64 >> mask_bits;
for (i, filter) in filters.iter().enumerate() {
for filter in &filters {
// Check that all mask_bits are equal.
assert_eq!(mask_bits, filter.mask_bits);
assert_eq!(i as u64, filter.mask >> right_shift);
assert!((0..16384).contains(&(filter.mask >> right_shift)));
assert_eq!(ones, ones & filter.mask);
}
}
Expand Down Expand Up @@ -740,7 +758,7 @@ pub(crate) mod tests {
let crds = RwLock::new(crds);
assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32));
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4));
let crds = crds.read().unwrap();
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
Expand All @@ -751,21 +769,24 @@ pub(crate) mod tests {
"hash_values.len(): {}",
hash_values.len()
);
let mut num_hits = 0;
let mut false_positives = 0;
for hash_value in hash_values {
let mut num_hits = 0;
let mut hit = false;
for filter in &filters {
if filter.test_mask(&hash_value) {
num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) {
false_positives += 1;
}
}
assert_eq!(num_hits, 1);
}
assert!(false_positives < 150_000, "fp: {false_positives}");
assert!(num_hits > 4000, "num_hits: {num_hits}");
assert!(false_positives < 20_000, "fp: {false_positives}");
}

#[test]
Expand Down Expand Up @@ -1308,7 +1329,8 @@ pub(crate) mod tests {
}
#[test]
fn test_crds_filter_complete_set_add_mask() {
let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into();
let mut filters =
Vec::<CrdsFilter>::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10));
assert!(filters.iter().all(|f| f.mask_bits > 0));
let mut h: Hash = Hash::default();
// rev to make the hash::default() miss on the first few test_masks
Expand Down