From 1afd1db4fc28a49d2065dc7c0ced0a3bda722be4 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 1 Sep 2020 01:21:57 +0000 Subject: [PATCH] generate_pull_response optimization (bp #11597) (#11954) * generate_pull_response optimization (#11597) (cherry picked from commit f519fdecc2a958d6f5a2ccd6bbd656ffd0bf8959) # Conflicts: # core/src/crds_gossip_pull.rs * Fix merge conflicts Co-authored-by: sakridge Co-authored-by: Carl --- core/src/crds.rs | 9 +++++++ core/src/crds_gossip_pull.rs | 46 ++++++++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/core/src/crds.rs b/core/src/crds.rs index 1f6cfe3c6f494d..8c25c2b059f6e8 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -24,6 +24,7 @@ //! A value is updated to a new version if the labels match, and the value //! wallclock is later, or the value hash is greater. +use crate::crds_gossip_pull::CrdsFilter; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use bincode::serialize; use indexmap::map::IndexMap; @@ -37,6 +38,8 @@ pub struct Crds { /// Stores the map of labels and values pub table: IndexMap, pub num_inserts: usize, + + pub masks: IndexMap, } #[derive(PartialEq, Debug)] @@ -86,6 +89,7 @@ impl Default for Crds { Crds { table: IndexMap::new(), num_inserts: 0, + masks: IndexMap::new(), } } } @@ -126,6 +130,10 @@ impl Crds { .map(|current| new_value > *current) .unwrap_or(true); if do_insert { + self.masks.insert( + label.clone(), + CrdsFilter::hash_as_u64(&new_value.value_hash), + ); let old = self.table.insert(label, new_value); self.num_inserts += 1; Ok(old) @@ -193,6 +201,7 @@ impl Crds { pub fn remove(&mut self, key: &CrdsValueLabel) { self.table.swap_remove(key); + self.masks.swap_remove(key); } } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 73b81bec98058c..1a5c65b6d4f97c 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -91,7 +91,7 @@ impl CrdsFilter { // for small ratios this can result in a negative number, ensure it returns 0 instead ((num_items / max_items).log2().ceil()).max(0.0) as u32 } - fn hash_as_u64(item: &Hash) -> u64 { + pub fn hash_as_u64(item: &Hash) -> u64 { let arr = item.as_ref(); let mut accum = 0; for (i, val) in arr.iter().enumerate().take(8) { @@ -99,6 +99,10 @@ impl CrdsFilter { } accum } + pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool { + let bits = item | ones; + bits == self.mask + } pub fn test_mask(&self, item: &Hash) -> bool { // only consider the highest mask_bits bits from the hash and set the rest to 1. let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64); @@ -116,6 +120,9 @@ impl CrdsFilter { } self.filter.contains(item) } + pub fn filter_contains(&self, item: &Hash) -> bool { + self.filter.contains(item) + } } #[derive(Default)] @@ -396,18 +403,31 @@ impl CrdsGossipPull { return ret; } let mut total_skipped = 0; - for v in crds.table.values() { - recent.iter().for_each(|(i, (caller, filter))| { - //skip values that are too new - if v.value.wallclock() > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0) - { - total_skipped += 1; - return; - } - if !filter.contains(&v.value_hash) { - ret[*i].push(v.value.clone()); - } - }); + let mask_ones: Vec<_> = recent + .iter() + .map(|(_i, (_caller, filter))| (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64)) + .collect(); + for (label, mask) in crds.masks.iter() { + recent + .iter() + .zip(mask_ones.iter()) + .for_each(|((i, (caller, filter)), mask_ones)| { + if filter.test_mask_u64(*mask, *mask_ones) { + let item = crds.table.get(label).unwrap(); + + //skip values that are too new + if item.value.wallclock() + > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0) + { + total_skipped += 1; + return; + } + + if !filter.filter_contains(&item.value_hash) { + ret[*i].push(item.value.clone()); + } + } + }); } inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped); ret