diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 4f2107be..d1f6b753 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -225,6 +225,11 @@ where self.inner.flushers[id].submit(Submission::Deletion { hash }); } + async fn destroy(&self) -> Result<()> { + // TODO(MrCroxx): reset bloom filters + self.inner.set_manager.destroy().await + } + fn may_contains(&self, hash: u64) -> bool { let set_manager = self.inner.set_manager.clone(); let sid = set_manager.set_picker().sid(hash); @@ -277,7 +282,7 @@ where } async fn destroy(&self) -> Result<()> { - self.inner.set_manager.update_watermark().await + self.destroy().await } fn stats(&self) -> Arc { @@ -356,6 +361,17 @@ mod tests { store.enqueue(entry.clone(), estimated_size); } + async fn assert_some(store: &GenericSmallStorage, RandomState>, entry: &CacheEntry>) { + assert_eq!( + store.load(entry.hash()).await.unwrap().unwrap(), + (*entry.key(), entry.value().clone()) + ); + } + + async fn assert_none(store: &GenericSmallStorage, RandomState>, entry: &CacheEntry>) { + assert!(store.load(entry.hash()).await.unwrap().is_none()); + } + #[test_log::test(tokio::test)] async fn test_store_enqueue_lookup_destroy_recovery() { let dir = tempfile::tempdir().unwrap(); @@ -367,12 +383,29 @@ mod tests { enqueue(&store, &e1); store.wait().await; - let r1 = store.load(e1.hash()).await.unwrap().unwrap(); - assert_eq!(r1, (1, vec![1; 42])); + assert_some(&store, &e1).await; store.delete(e1.hash()); store.wait().await; - assert!(store.load(e1.hash()).await.unwrap().is_none()); + assert_none(&store, &e1).await; + + let e2 = memory.insert(2, vec![2; 192]); + let e3 = memory.insert(3, vec![3; 168]); + + enqueue(&store, &e1); + enqueue(&store, &e2); + enqueue(&store, &e3); + store.wait().await; + + assert_some(&store, &e1).await; + assert_some(&store, &e2).await; + assert_some(&store, &e3).await; + + store.destroy().await.unwrap(); + + assert_none(&store, &e1).await; + assert_none(&store, &e2).await; + assert_none(&store, &e3).await; } } diff --git a/foyer-storage/src/small/set.rs b/foyer-storage/src/small/set.rs index bc4316fa..2af18480 100644 --- a/foyer-storage/src/small/set.rs +++ b/foyer-storage/src/small/set.rs @@ -83,7 +83,7 @@ impl SetMut { /// # Format /// /// ```plain -/// | checksum (4B) | timestamp (8B) | len (4B) | +/// | checksum (4B) | ns timestamp (16B) | len (4B) | /// | bloom filter (4 * 8B = 32B) | /// ``` pub struct SetStorage { @@ -97,7 +97,7 @@ pub struct SetStorage { /// Set size. size: usize, /// Set last updated timestamp. - timestamp: u64, + timestamp: u128, /// Set bloom filter. bloom_filter: BloomFilterU64<4>, @@ -118,18 +118,18 @@ impl Debug for SetStorage { } impl SetStorage { - pub const SET_HEADER_SIZE: usize = 48; + pub const SET_HEADER_SIZE: usize = 56; /// Load the set storage from buffer. /// /// If `after` is set and the set storage is before the timestamp, load an empty set storage. - pub fn load(buffer: IoBytesMut, watermark: u64) -> Self { + pub fn load(buffer: IoBytesMut, watermark: u128) -> Self { assert!(buffer.len() >= Self::SET_HEADER_SIZE); let checksum = (&buffer[0..4]).get_u32(); - let timestamp = (&buffer[4..12]).get_u64(); - let len = (&buffer[12..16]).get_u32() as usize; - let bloom_filter = BloomFilterU64::read(&buffer[16..48]); + let timestamp = (&buffer[4..20]).get_u128(); + let len = (&buffer[20..24]).get_u32() as usize; + let bloom_filter = BloomFilterU64::read(&buffer[24..56]); let mut this = Self { checksum, @@ -141,25 +141,29 @@ impl SetStorage { buffer, }; - if Self::SET_HEADER_SIZE + this.len >= this.buffer.len() || this.timestamp < watermark { + this.verify(watermark); + + this + } + + fn verify(&mut self, watermark: u128) { + if Self::SET_HEADER_SIZE + self.len >= self.buffer.len() || self.timestamp < watermark { // invalid len - this.clear(); + self.clear(); } else { - let c = Checksummer::checksum32(&this.buffer[4..Self::SET_HEADER_SIZE + this.len]); - if c != checksum { + let c = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]); + if c != self.checksum { // checksum mismatch - this.clear(); + self.clear(); } } - - this } pub fn update(&mut self) { - self.bloom_filter.write(&mut self.buffer[16..48]); - (&mut self.buffer[12..16]).put_u32(self.len as _); + self.bloom_filter.write(&mut self.buffer[24..56]); + (&mut self.buffer[20..24]).put_u32(self.len as _); self.timestamp = SetTimestamp::current(); - (&mut self.buffer[4..12]).put_u64(self.timestamp); + (&mut self.buffer[4..20]).put_u128(self.timestamp); self.checksum = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]); (&mut self.buffer[0..4]).put_u32(self.checksum); } @@ -390,8 +394,8 @@ impl<'a> Iterator for SetIter<'a> { pub struct SetTimestamp; impl SetTimestamp { - pub fn current() -> u64 { - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as _ + pub fn current() -> u128 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() } } diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs index b354d538..9a1974cc 100644 --- a/foyer-storage/src/small/set_manager.rs +++ b/foyer-storage/src/small/set_manager.rs @@ -194,11 +194,17 @@ impl SetManager { &self.inner.set_picker } - pub async fn watermark(&self) -> u64 { + pub async fn watermark(&self) -> u128 { self.inner.metadata.read().await.watermark } - pub async fn update_watermark(&self) -> Result<()> { + pub async fn destroy(&self) -> Result<()> { + self.update_watermark().await?; + self.inner.cache.lock().await.clear(); + Ok(()) + } + + async fn update_watermark(&self) -> Result<()> { let mut metadata = self.inner.metadata.write().await; let watermark = SetTimestamp::current(); @@ -309,7 +315,7 @@ impl SetPicker { #[derive(Debug)] struct Metadata { /// watermark timestamp - watermark: u64, + watermark: u128, } impl Default for Metadata { @@ -322,16 +328,16 @@ impl Default for Metadata { impl Metadata { const MAGIC: u64 = 0x20230512deadbeef; - const SIZE: usize = 8 + 8; + const SIZE: usize = 8 + 16; fn write(&self, mut buf: impl BufMut) { buf.put_u64(Self::MAGIC); - buf.put_u64(self.watermark); + buf.put_u128(self.watermark); } fn read(mut buf: impl Buf) -> Self { let magic = buf.get_u64(); - let watermark = buf.get_u64(); + let watermark = buf.get_u128(); if magic != Self::MAGIC || watermark > SetTimestamp::current() { return Self::default();