diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index aa3d55d050..158f67d22c 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -39,7 +39,7 @@ serde = "1.0.201" sha2 = "0.10.8" shellexpand = "3.1.0" tempfile = "3.10.1" -tokio = { version = "1.37.0" } +tokio = { version = "1.37.0", features = ["full", "rt-multi-thread"] } tokio-stream = { version = "0.1.15", features = ["fs"] } tokio-util = { version = "0.7.11" } tonic = { version = "0.11.0", features = ["gzip", "tls"] } diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ac7fe3bd30..b69815595b 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -39,12 +39,24 @@ use crate::cas_utils::is_zero_digest; #[derive(Clone)] pub struct BytesWrapper(Bytes); +impl BytesWrapper { + pub fn from_string(s: &str) -> Self { + BytesWrapper(Bytes::copy_from_slice(s.as_bytes())) + } +} + impl Debug for BytesWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("BytesWrapper { -- Binary data -- }") } } +impl Drop for BytesWrapper { + fn drop(&mut self) { + print!("BytesWrapper dropped {}\n", std::str::from_utf8(&self.0).unwrap()); + } +} + impl LenEntry for BytesWrapper { #[inline] fn len(&self) -> usize { @@ -154,6 +166,10 @@ impl MemoryStore { self.evicting_map.remove(&key.into_owned()).await } + pub async fn insert(&self, key: StoreKey<'static>, data: BytesWrapper) -> Option { + return self.evicting_map.insert(key, data).await; + } + /// Tells the store that a subscription has been dropped and gives an opportunity to clean up. fn remove_dropped_subscription(&self, key: StoreKey<'static>) { let mut subscriptions = self.subscriptions.write(); diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index 9cc6e50d6f..a400db0a2a 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -18,9 +18,10 @@ use std::pin::Pin; use bytes::{BufMut, Bytes, BytesMut}; use futures::poll; use memory_stats::memory_stats; +use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Code, Error, ResultExt}; use nativelink_macro::nativelink_test; -use nativelink_store::memory_store::MemoryStore; +use nativelink_store::memory_store::{ MemoryStore, BytesWrapper }; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::DigestInfo; use nativelink_util::spawn; @@ -36,6 +37,24 @@ const TOO_LONG_HASH: &str = "0123456789abcdef00000000000000000001000000000000012 const TOO_SHORT_HASH: &str = "100000000000000000000000000000000000000000000000000000000000001"; const INVALID_HASH: &str = "g111111111111111111111111111111111111111111111111111111111111111"; +#[nativelink_test] +async fn insert_drop_test() -> Result<(), Error> { + const VALUE1: &str = "13"; + const VALUE2: &str = "2345"; + let eviction_policy = EvictionPolicy { + max_bytes: 5, + evict_bytes: 0, + max_seconds: 0, + max_count: 2 + }; + let store = MemoryStore::new(&nativelink_config::stores::MemoryStore { + eviction_policy: Some(eviction_policy) + }); + store.insert(StoreKey::new_str(VALUE1), BytesWrapper::from_string(VALUE1)).await; + store.insert(StoreKey::new_str(VALUE2), BytesWrapper::from_string(VALUE2)).await; + Ok(()) +} + #[nativelink_test] async fn insert_one_item_then_update() -> Result<(), Error> { const VALUE1: &str = "13"; diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index c43883a124..69e8a1881d 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -67,7 +67,25 @@ struct EvictionItem { } #[async_trait] -pub trait LenEntry: 'static { +impl LenEntry for EvictionItem { + #[inline] + fn len(&self) -> usize { + return self.data.len(); + } + + #[inline] + fn is_empty(&self) -> bool { + return self.data.is_empty(); + } + + #[inline] + async fn unref(&self) { + std::mem::drop(&self.data); + } +} + +#[async_trait] +pub trait LenEntry: 'static + Send + Sync{ /// Length of referenced data. fn len(&self) -> usize; @@ -303,6 +321,7 @@ where .lru .pop_lru() .expect("Tried to peek() then pop() but failed"); + print!("Evicting item -- "); event!(Level::INFO, ?key, "Evicting",); state.remove(&key, &eviction_item, false).await;