[ENH] Call purge_one when evicting from cache. (#2917)
## Description of changes

*Summarize the changes made by this PR.*
- Anything that uses the HNSW index provider now purges an index's files from disk
when that index is evicted from the in-memory cache (a sketch of the wiring follows below).
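
For orientation, a minimal sketch of the mechanism (not this PR's exact API): the cache is handed the sending half of an unbounded channel and reports each evicted key on it, while a background task drains the channel and deletes that key's directory. The `root` path and `main` scaffolding are illustrative; the sketch assumes the `tokio` and `uuid` crates.

```rust
use std::path::PathBuf;
use uuid::Uuid;

#[tokio::main]
async fn main() {
    // The cache is built with `tx` and sends each evicted key on it.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Uuid>();

    // Background purger: drain evictions and remove the index's files.
    let root = PathBuf::from("/tmp/hnsw"); // illustrative path
    let purger = tokio::spawn(async move {
        while let Some(id) = rx.recv().await {
            // Best-effort, mirroring the provider: errors are ignored.
            let _ = tokio::fs::remove_dir_all(root.join(id.to_string())).await;
        }
    });

    // Simulate one eviction event.
    tx.send(Uuid::new_v4()).unwrap();
    drop(tx); // closing the channel lets the purger exit its loop
    purger.await.unwrap();
}
```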

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
rescrv authored Oct 15, 2024
1 parent a6cf995 commit 14b2798
Showing 7 changed files with 123 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

5 changes: 3 additions & 2 deletions rust/cache/Cargo.toml
@@ -13,10 +13,11 @@ anyhow = "1.0"
# TODO(rescrv): Deprecated. Find a suitable replacement for such things.
serde_yaml = "0.9"

thiserror = { workspace = true }
serde = { workspace = true }
async-trait = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }

63 changes: 63 additions & 0 deletions rust/cache/src/foyer.rs
@@ -197,6 +197,19 @@ impl FoyerCacheConfig {
Ok(Box::new(FoyerPlainCache::memory(self).await?))
}

pub async fn build_memory_with_event_listener<K, V>(
&self,
tx: tokio::sync::mpsc::UnboundedSender<K>,
) -> Result<Box<dyn super::Cache<K, V>>, Box<dyn ChromaError>>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Weighted + 'static,
{
Ok(Box::new(
FoyerPlainCache::memory_with_event_listener(self, tx).await?,
))
}

/// Build an in-memory-only cache.
pub async fn build_memory_persistent<K, V>(
&self,
@@ -361,6 +374,56 @@ where
.build();
Ok(FoyerPlainCache { cache })
}

/// Build an in-memory cache that emits keys that get evicted to a channel.
pub async fn memory_with_event_listener(
config: &FoyerCacheConfig,
tx: tokio::sync::mpsc::UnboundedSender<K>,
) -> Result<FoyerPlainCache<K, V>, Box<dyn ChromaError>> {
struct TokioEventListener<K, V>(
tokio::sync::mpsc::UnboundedSender<K>,
std::marker::PhantomData<V>,
)
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Weighted + 'static;
impl<K, V> foyer::EventListener for TokioEventListener<K, V>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Weighted + 'static,
{
type Key = K;
type Value = V;

fn on_memory_release(&self, key: Self::Key, _: Self::Value)
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
{
// NOTE(rescrv): There's no mechanism by which we can error. We could log a
// metric, but this should really never happen.
let _ = self.0.send(key.clone());
}
}
let evl = TokioEventListener(tx, std::marker::PhantomData);

let tracing_config = TracingConfig::default();
tracing_config
.set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _));
tracing_config
.set_record_hybrid_get_threshold(Duration::from_micros(config.trace_get_us as _));
tracing_config
.set_record_hybrid_obtain_threshold(Duration::from_micros(config.trace_obtain_us as _));
tracing_config
.set_record_hybrid_remove_threshold(Duration::from_micros(config.trace_remove_us as _));
tracing_config
.set_record_hybrid_fetch_threshold(Duration::from_micros(config.trace_fetch_us as _));

let cache = CacheBuilder::new(config.capacity)
.with_shards(config.shards)
.with_event_listener(Arc::new(evl))
.build();
Ok(FoyerPlainCache { cache })
}
}

#[async_trait::async_trait]
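
A sketch of how a caller might use the new constructor, assuming `chroma_cache` re-exports `FoyerCacheConfig` and `Weighted` at the crate root and that `chroma_error::ChromaError` is the error trait (both paths are assumptions); `Doc` is a hypothetical value type:

```rust
use chroma_cache::{FoyerCacheConfig, Weighted};
use chroma_error::ChromaError;

#[derive(Clone)]
struct Doc(Vec<u8>);

impl Weighted for Doc {
    // The weight feeds capacity accounting; bytes held is a natural choice.
    fn weight(&self) -> usize {
        self.0.len()
    }
}

async fn build_cache(config: &FoyerCacheConfig) -> Result<(), Box<dyn ChromaError>> {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<u64>();
    let _cache = config.build_memory_with_event_listener::<u64, Doc>(tx).await?;

    // Keys evicted by the cache now arrive on `rx`.
    tokio::spawn(async move {
        while let Some(key) = rx.recv().await {
            tracing::info!("evicted key {key}");
        }
    });
    Ok(())
}
```
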
24 changes: 24 additions & 0 deletions rust/cache/src/lib.rs
@@ -82,6 +82,30 @@ pub trait Weighted {
fn weight(&self) -> usize;
}

/// Create a new cache from the provided config. This is solely for caches that cannot implement
/// the persistent cache trait. Attempts to construct a disk-based cache will return an error.
pub async fn from_config_with_event_listener<K, V>(
config: &CacheConfig,
tx: tokio::sync::mpsc::UnboundedSender<K>,
) -> Result<Box<dyn Cache<K, V>>, Box<dyn ChromaError>>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Weighted + 'static,
{
match config {
CacheConfig::Unbounded(_) => Err(Box::new(CacheError::InvalidCacheConfig(
"from_config_with_event_listener was called with unbounded".to_string(),
))),
CacheConfig::Memory(c) => Ok(c.build_memory_with_event_listener(tx).await? as _),
CacheConfig::Disk(_) => Err(Box::new(CacheError::InvalidCacheConfig(
"from_config_with_event_listener was called with disk".to_string(),
))),
CacheConfig::Nop => Err(Box::new(CacheError::InvalidCacheConfig(
"from_config_with_event_listener was called with nop".to_string(),
))),
}
}

/// Create a new cache from the provided config. This is solely for caches that cannot implement
/// the persistent cache trait. Attempts to construct a disk-based cache will return an error.
pub async fn from_config<K, V>(
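
Only the `Memory` variant can honor the listener; a usage sketch under the same assumptions as above, with `CacheConfig` assumed to be exported at the crate root (`Doc` is again a hypothetical `Weighted` value type):

```rust
use chroma_cache::{from_config_with_event_listener, CacheConfig, Weighted};

#[derive(Clone)]
struct Doc(Vec<u8>);
impl Weighted for Doc {
    fn weight(&self) -> usize {
        self.0.len()
    }
}

async fn open(config: &CacheConfig) {
    let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<u64>();
    match from_config_with_event_listener::<u64, Doc>(config, tx).await {
        Ok(_cache) => { /* in-memory cache wired to the eviction channel */ }
        // Unbounded, Disk, and Nop configs all return InvalidCacheConfig.
        Err(e) => eprintln!("cannot attach event listener: {e}"),
    }
}
```
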
36 changes: 28 additions & 8 deletions rust/index/src/hnsw_provider.rs
@@ -51,6 +51,8 @@ pub struct HnswIndexProvider {
pub temporary_storage_path: PathBuf,
storage: Storage,
write_mutex: Arc<tokio::sync::Mutex<()>>,
#[allow(dead_code)]
purger: Option<Arc<tokio::task::JoinHandle<()>>>,
}

#[derive(Clone)]
@@ -72,14 +74,17 @@ impl Configurable<(HnswProviderConfig, Storage)> for HnswIndexProvider {
config: &(HnswProviderConfig, Storage),
) -> Result<Self, Box<dyn ChromaError>> {
let (hnsw_config, storage) = config;
let cache = chroma_cache::from_config(&hnsw_config.hnsw_cache_config).await?;
let cache: Arc<dyn Cache<_, _>> = cache.into();
Ok(Self {
// TODO(rescrv): Long-term we should migrate this to the component API.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let cache =
chroma_cache::from_config_with_event_listener(&hnsw_config.hnsw_cache_config, tx)
.await?;
Ok(Self::new(
storage.clone(),
PathBuf::from(&hnsw_config.hnsw_temporary_path),
cache,
storage: storage.clone(),
temporary_storage_path: PathBuf::from(&hnsw_config.hnsw_temporary_path),
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
})
rx,
))
}
}

@@ -98,13 +103,21 @@ impl HnswIndexProvider {
storage: Storage,
storage_path: PathBuf,
cache: Box<dyn Cache<Uuid, HnswIndexRef>>,
mut evicted: tokio::sync::mpsc::UnboundedReceiver<Uuid>,
) -> Self {
let cache: Arc<dyn Cache<Uuid, HnswIndexRef>> = cache.into();
let temporary_storage_path = storage_path.to_path_buf();
let purger = Some(Arc::new(tokio::task::spawn(async move {
while let Some(id) = evicted.recv().await {
let _ = Self::purge_one_id(&temporary_storage_path, id).await;
}
})));
Self {
cache,
storage,
temporary_storage_path: storage_path,
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
purger,
}
}

@@ -431,6 +444,12 @@ impl HnswIndexProvider {
}
}

pub async fn purge_one_id(path: &Path, id: Uuid) -> tokio::io::Result<()> {
let index_storage_path = path.join(id.to_string());
tokio::fs::remove_dir_all(index_storage_path).await?;
Ok(())
}

async fn remove_temporary_files(&self, id: &Uuid) -> tokio::io::Result<()> {
let index_storage_path = self.temporary_storage_path.join(id.to_string());
tokio::fs::remove_dir_all(index_storage_path).await
@@ -583,7 +602,8 @@ mod tests {

let storage = Storage::Local(LocalStorage::new(storage_dir.to_str().unwrap()));
let cache = new_non_persistent_cache_for_test();
let provider = HnswIndexProvider::new(storage, hnsw_tmp_path, cache);
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
let provider = HnswIndexProvider::new(storage, hnsw_tmp_path, cache, rx);
let segment = Segment {
id: Uuid::new_v4(),
r#type: SegmentType::HnswDistributed,
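
The purger spawned in `HnswIndexProvider::new` is a plain drain loop over the eviction channel; here is the same pattern as a self-contained sketch (`spawn_purger` is a hypothetical helper, and the purge is best-effort exactly as in the provider):

```rust
use std::path::{Path, PathBuf};
use uuid::Uuid;

// Mirrors HnswIndexProvider::purge_one_id: an index's files live in a
// directory named after its UUID under the temporary storage path.
async fn purge_one_id(path: &Path, id: Uuid) -> tokio::io::Result<()> {
    tokio::fs::remove_dir_all(path.join(id.to_string())).await
}

fn spawn_purger(
    root: PathBuf,
    mut evicted: tokio::sync::mpsc::UnboundedReceiver<Uuid>,
) -> tokio::task::JoinHandle<()> {
    tokio::task::spawn(async move {
        while let Some(id) = evicted.recv().await {
            // Best-effort: failure to remove (e.g. already gone) is ignored.
            let _ = purge_one_id(&root, id).await;
        }
    })
}
```
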
2 changes: 2 additions & 0 deletions rust/worker/src/compactor/compaction_manager.rs
@@ -528,6 +528,7 @@ mod tests {
let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let hnsw_cache = new_non_persistent_cache_for_test();
let (_, rx) = tokio::sync::mpsc::unbounded_channel();
let mut manager = CompactionManager::new(
scheduler,
log,
@@ -543,6 +544,7 @@ mod tests {
storage,
PathBuf::from(tmpdir.path().to_str().unwrap()),
hnsw_cache,
rx,
),
compaction_manager_queue_size,
compaction_interval,
2 changes: 2 additions & 0 deletions rust/worker/src/server.rs
@@ -673,6 +673,7 @@ mod tests {
let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let hnsw_index_cache = new_non_persistent_cache_for_test();
let (_, rx) = tokio::sync::mpsc::unbounded_channel();
let port = random_port::PortPicker::new().pick().unwrap();
let mut server = WorkerServer {
dispatcher: None,
@@ -683,6 +684,7 @@ mod tests {
storage.clone(),
tmp_dir.path().to_path_buf(),
hnsw_index_cache,
rx,
),
blockfile_provider: BlockfileProvider::new_arrow(
storage,
