Skip to content

Commit

Permalink
Fix a bug that can create oversized frequency sketch when weigher is set
Browse files Browse the repository at this point in the history
Relates to #72.
  • Loading branch information
tatsuya6502 committed Feb 1, 2022
1 parent a261146 commit 10648e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
47 changes: 31 additions & 16 deletions src/sync/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ where
}
}
// Enable the frequency sketch.
self.inner.enable_frequency_sketch();
self.inner.enable_frequency_sketch_for_testing();
}

pub(crate) fn set_expiration_clock(&self, clock: Option<Clock>) {
Expand Down Expand Up @@ -472,6 +472,7 @@ pub(crate) struct Inner<K, V, S> {
value_entry_builder: ValueEntryBuilder,
deques: Mutex<Deques<K>>,
frequency_sketch: RwLock<FrequencySketch>,
frequency_sketch_enabled: AtomicBool,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
time_to_live: Option<Duration>,
Expand Down Expand Up @@ -527,6 +528,7 @@ where
value_entry_builder,
deques: Mutex::new(Default::default()),
frequency_sketch: RwLock::new(Default::default()),
frequency_sketch_enabled: Default::default(),
read_op_ch,
write_op_ch,
time_to_live,
Expand Down Expand Up @@ -723,7 +725,9 @@ where
self.apply_writes(&mut deqs, w_len, &mut counters);
}

self.enable_frequency_sketch_if_needed(&counters);
if self.should_enable_frequency_sketch(&counters) {
self.enable_frequency_sketch(&counters);
}

calls += 1;
should_sync = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
Expand Down Expand Up @@ -796,30 +800,41 @@ where
}

#[inline]
fn enable_frequency_sketch_if_needed(&self, counters: &EvictionCounters) {
fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
if self.frequency_sketch_enabled.load(Ordering::Acquire) {
false
} else if let Some(max_cap) = self.max_capacity {
counters.weighted_size >= max_cap / 2
} else {
false
}
}

#[inline]
fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
if let Some(max_cap) = self.max_capacity {
if counters.weighted_size >= max_cap / 2 {
self.do_enable_frequency_sketch(counters.entry_count, max_cap);
}
let c = counters;
let cap = if self.weigher.is_none() {
max_cap
} else {
(c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
};
self.do_enable_frequency_sketch(cap);
}
}

#[cfg(test)]
fn enable_frequency_sketch(&self) {
fn enable_frequency_sketch_for_testing(&self) {
if let Some(max_cap) = self.max_capacity {
self.do_enable_frequency_sketch(self.entry_count.load(), max_cap);
self.do_enable_frequency_sketch(max_cap);
}
}

#[inline]
fn do_enable_frequency_sketch(&self, entry_count: u64, max_capacity: u64) {
let num_entries = if self.weigher.is_some() {
entry_count * 2
} else {
max_capacity
};
let skt_capacity = common::sketch_capacity(num_entries);
fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
let skt_capacity = common::sketch_capacity(cache_capacity);
self.frequency_sketch.write().ensure_capacity(skt_capacity);
self.frequency_sketch_enabled.store(true, Ordering::Release);
}

fn apply_reads(&self, deqs: &mut Deques<K>, count: usize) {
Expand Down Expand Up @@ -1454,7 +1469,7 @@ mod tests {
None,
false,
);
cache.inner.enable_frequency_sketch();
cache.inner.enable_frequency_sketch_for_testing();
assert_eq!(
cache.inner.frequency_sketch.read().table_len(),
len as usize,
Expand Down
43 changes: 30 additions & 13 deletions src/unsync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub struct Cache<K, V, S = RandomState> {
weigher: Option<Weigher<K, V>>,
deques: Deques<K>,
frequency_sketch: FrequencySketch,
frequency_sketch_enabled: bool,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
expiration_clock: Option<Clock>,
Expand Down Expand Up @@ -223,6 +224,7 @@ where
weigher,
deques: Default::default(),
frequency_sketch: Default::default(),
frequency_sketch_enabled: false,
time_to_live,
time_to_idle,
expiration_clock: None,
Expand Down Expand Up @@ -469,7 +471,9 @@ where

#[inline]
fn should_enable_frequency_sketch(&self) -> bool {
if let Some(max_cap) = self.max_capacity {
if self.frequency_sketch_enabled {
false
} else if let Some(max_cap) = self.max_capacity {
self.weighted_size >= max_cap / 2
} else {
false
Expand All @@ -479,16 +483,29 @@ where
#[inline]
fn enable_frequency_sketch(&mut self) {
if let Some(max_cap) = self.max_capacity {
let num_entries = if self.weigher.is_some() {
self.entry_count
} else {
let cap = if self.weigher.is_none() {
max_cap
} else {
(self.entry_count as f64 * (self.weighted_size as f64 / max_cap as f64)) as u64
};
let skt_capacity = common::sketch_capacity(num_entries);
self.frequency_sketch.ensure_capacity(skt_capacity);
self.do_enable_frequency_sketch(cap);
}
}

#[cfg(test)]
fn enable_frequency_sketch_for_testing(&mut self) {
if let Some(max_cap) = self.max_capacity {
self.do_enable_frequency_sketch(max_cap);
}
}

#[inline]
fn do_enable_frequency_sketch(&mut self, cache_capacity: u64) {
let skt_capacity = common::sketch_capacity(cache_capacity);
self.frequency_sketch.ensure_capacity(skt_capacity);
self.frequency_sketch_enabled = true;
}

fn saturating_add_to_total_weight(&mut self, weight: u64) {
let total = &mut self.weighted_size;
*total = total.saturating_add(weight as u64);
Expand Down Expand Up @@ -919,7 +936,7 @@ mod tests {
#[test]
fn basic_single_thread() {
let mut cache = Cache::new(3);
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

cache.insert("a", "alice");
cache.insert("b", "bob");
Expand Down Expand Up @@ -966,7 +983,7 @@ mod tests {
let dennis = ("dennis", 15);

let mut cache = Cache::builder().max_capacity(31).weigher(weigher).build();
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

cache.insert("a", alice);
cache.insert("b", bob);
Expand Down Expand Up @@ -1024,7 +1041,7 @@ mod tests {
#[test]
fn invalidate_all() {
let mut cache = Cache::new(100);
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

cache.insert("a", "alice");
cache.insert("b", "bob");
Expand All @@ -1048,7 +1065,7 @@ mod tests {
use std::collections::HashSet;

let mut cache = Cache::new(100);
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
Expand Down Expand Up @@ -1092,7 +1109,7 @@ mod tests {
let mut cache = CacheBuilder::new(100)
.time_to_live(Duration::from_secs(10))
.build();
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
Expand Down Expand Up @@ -1136,7 +1153,7 @@ mod tests {
let mut cache = CacheBuilder::new(100)
.time_to_idle(Duration::from_secs(10))
.build();
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();

let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock));
Expand Down Expand Up @@ -1174,7 +1191,7 @@ mod tests {

let ensure_sketch_len = |max_capacity, len, name| {
let mut cache = Cache::<u8, u8>::new(max_capacity);
cache.enable_frequency_sketch();
cache.enable_frequency_sketch_for_testing();
assert_eq!(cache.frequency_sketch.table_len(), len as usize, "{}", name);
};

Expand Down

0 comments on commit 10648e9

Please sign in to comment.