Skip to content

Commit

Permalink
fix: clear cache after destroy, use u128 timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Sep 27, 2024
1 parent 5660e86 commit bae53d7
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 29 deletions.
41 changes: 37 additions & 4 deletions foyer-storage/src/small/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ where
self.inner.flushers[id].submit(Submission::Deletion { hash });
}

async fn destroy(&self) -> Result<()> {
// TODO(MrCroxx): reset bloom filters
self.inner.set_manager.destroy().await
}

fn may_contains(&self, hash: u64) -> bool {
let set_manager = self.inner.set_manager.clone();
let sid = set_manager.set_picker().sid(hash);

Check warning on line 235 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L235

Added line #L235 was not covered by tests
Expand Down Expand Up @@ -277,7 +282,7 @@ where
}

async fn destroy(&self) -> Result<()> {
self.inner.set_manager.update_watermark().await
self.destroy().await

Check warning on line 285 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L285

Added line #L285 was not covered by tests
}

fn stats(&self) -> Arc<DeviceStats> {
Expand Down Expand Up @@ -356,6 +361,17 @@ mod tests {
store.enqueue(entry.clone(), estimated_size);
}

async fn assert_some(store: &GenericSmallStorage<u64, Vec<u8>, RandomState>, entry: &CacheEntry<u64, Vec<u8>>) {
assert_eq!(
store.load(entry.hash()).await.unwrap().unwrap(),
(*entry.key(), entry.value().clone())
);
}

async fn assert_none(store: &GenericSmallStorage<u64, Vec<u8>, RandomState>, entry: &CacheEntry<u64, Vec<u8>>) {
assert!(store.load(entry.hash()).await.unwrap().is_none());
}

#[test_log::test(tokio::test)]
async fn test_store_enqueue_lookup_destroy_recovery() {
let dir = tempfile::tempdir().unwrap();
Expand All @@ -367,12 +383,29 @@ mod tests {
enqueue(&store, &e1);
store.wait().await;

let r1 = store.load(e1.hash()).await.unwrap().unwrap();
assert_eq!(r1, (1, vec![1; 42]));
assert_some(&store, &e1).await;

store.delete(e1.hash());
store.wait().await;

assert!(store.load(e1.hash()).await.unwrap().is_none());
assert_none(&store, &e1).await;

let e2 = memory.insert(2, vec![2; 192]);
let e3 = memory.insert(3, vec![3; 168]);

enqueue(&store, &e1);
enqueue(&store, &e2);
enqueue(&store, &e3);
store.wait().await;

assert_some(&store, &e1).await;
assert_some(&store, &e2).await;
assert_some(&store, &e3).await;

store.destroy().await.unwrap();

assert_none(&store, &e1).await;
assert_none(&store, &e2).await;
assert_none(&store, &e3).await;
}
}
42 changes: 23 additions & 19 deletions foyer-storage/src/small/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl SetMut {
/// # Format
///
/// ```plain
/// | checksum (4B) | timestamp (8B) | len (4B) |
/// | checksum (4B) | ns timestamp (16B) | len (4B) |
/// | bloom filter (4 * 8B = 32B) |
/// ```
pub struct SetStorage {
Expand All @@ -97,7 +97,7 @@ pub struct SetStorage {
/// Set size.
size: usize,
/// Set last updated timestamp.
timestamp: u64,
timestamp: u128,
/// Set bloom filter.
bloom_filter: BloomFilterU64<4>,

Expand All @@ -118,18 +118,18 @@ impl Debug for SetStorage {
}

impl SetStorage {
pub const SET_HEADER_SIZE: usize = 48;
pub const SET_HEADER_SIZE: usize = 56;

/// Load the set storage from buffer.
///
/// If `after` is set and the set storage is before the timestamp, load an empty set storage.
pub fn load(buffer: IoBytesMut, watermark: u64) -> Self {
pub fn load(buffer: IoBytesMut, watermark: u128) -> Self {
assert!(buffer.len() >= Self::SET_HEADER_SIZE);

let checksum = (&buffer[0..4]).get_u32();
let timestamp = (&buffer[4..12]).get_u64();
let len = (&buffer[12..16]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[16..48]);
let timestamp = (&buffer[4..20]).get_u128();
let len = (&buffer[20..24]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[24..56]);

let mut this = Self {
checksum,
Expand All @@ -141,25 +141,29 @@ impl SetStorage {
buffer,
};

if Self::SET_HEADER_SIZE + this.len >= this.buffer.len() || this.timestamp < watermark {
this.verify(watermark);

this
}

fn verify(&mut self, watermark: u128) {
if Self::SET_HEADER_SIZE + self.len >= self.buffer.len() || self.timestamp < watermark {
// invalid len
this.clear();
self.clear();
} else {
let c = Checksummer::checksum32(&this.buffer[4..Self::SET_HEADER_SIZE + this.len]);
if c != checksum {
let c = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
if c != self.checksum {
// checksum mismatch
this.clear();
self.clear();
}
}

this
}

pub fn update(&mut self) {
self.bloom_filter.write(&mut self.buffer[16..48]);
(&mut self.buffer[12..16]).put_u32(self.len as _);
self.bloom_filter.write(&mut self.buffer[24..56]);
(&mut self.buffer[20..24]).put_u32(self.len as _);
self.timestamp = SetTimestamp::current();
(&mut self.buffer[4..12]).put_u64(self.timestamp);
(&mut self.buffer[4..20]).put_u128(self.timestamp);
self.checksum = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
(&mut self.buffer[0..4]).put_u32(self.checksum);
}
Expand Down Expand Up @@ -390,8 +394,8 @@ impl<'a> Iterator for SetIter<'a> {
pub struct SetTimestamp;

impl SetTimestamp {
pub fn current() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as _
pub fn current() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos()
}
}

Expand Down
18 changes: 12 additions & 6 deletions foyer-storage/src/small/set_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,17 @@ impl SetManager {
&self.inner.set_picker
}

pub async fn watermark(&self) -> u64 {
pub async fn watermark(&self) -> u128 {
self.inner.metadata.read().await.watermark
}

pub async fn update_watermark(&self) -> Result<()> {
pub async fn destroy(&self) -> Result<()> {
self.update_watermark().await?;
self.inner.cache.lock().await.clear();
Ok(())
}

async fn update_watermark(&self) -> Result<()> {
let mut metadata = self.inner.metadata.write().await;

let watermark = SetTimestamp::current();
Expand Down Expand Up @@ -309,7 +315,7 @@ impl SetPicker {
#[derive(Debug)]
struct Metadata {
/// watermark timestamp
watermark: u64,
watermark: u128,
}

impl Default for Metadata {
Expand All @@ -322,16 +328,16 @@ impl Default for Metadata {

impl Metadata {
const MAGIC: u64 = 0x20230512deadbeef;
const SIZE: usize = 8 + 8;
const SIZE: usize = 8 + 16;

fn write(&self, mut buf: impl BufMut) {
buf.put_u64(Self::MAGIC);
buf.put_u64(self.watermark);
buf.put_u128(self.watermark);
}

fn read(mut buf: impl Buf) -> Self {
let magic = buf.get_u64();
let watermark = buf.get_u64();
let watermark = buf.get_u128();

if magic != Self::MAGIC || watermark > SetTimestamp::current() {
return Self::default();
Expand Down

0 comments on commit bae53d7

Please sign in to comment.