diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index d67803bb..44b40b6d 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -18,7 +18,6 @@ allocator-api2 = "0.2" anyhow = "1.0" # TODO(MrCroxx): use `array_chunks` after `#![feature(array_chunks)]` is stable. array-util = "1" -async-channel = "2" auto_enums = { version = "0.8", features = ["futures03"] } bincode = "1" bytes = "1" diff --git a/foyer-storage/src/region.rs b/foyer-storage/src/region.rs index 20843716..44558a36 100644 --- a/foyer-storage/src/region.rs +++ b/foyer-storage/src/region.rs @@ -24,7 +24,7 @@ use std::{ task::{Context, Poll}, }; -use async_channel::{Receiver, Sender}; +use flume::{Receiver, Sender}; use foyer_common::{countdown::Countdown, metrics::Metrics}; use futures_core::future::BoxFuture; use futures_util::{future::Shared, FutureExt}; @@ -154,7 +154,7 @@ impl RegionManager { stats: Arc::new(RegionStats::default()), }) .collect_vec(); - let (clean_region_tx, clean_region_rx) = async_channel::unbounded(); + let (clean_region_tx, clean_region_rx) = flume::unbounded(); metrics.storage_region_total.absolute(device.regions() as _); metrics.storage_region_size_bytes.absolute(device.region_size() as _); @@ -248,7 +248,7 @@ impl RegionManager { pub async fn mark_clean(&self, region: RegionId) { self.inner .clean_region_tx - .send(self.region(region).clone()) + .send_async(self.region(region).clone()) .await .unwrap(); self.inner.metrics.storage_region_clean.increase(1); @@ -261,7 +261,7 @@ impl RegionManager { let metrics = self.inner.metrics.clone(); GetCleanRegionHandle::new( async move { - let region = clean_region_rx.recv().await.unwrap(); + let region = clean_region_rx.recv_async().await.unwrap(); // The only place to increase the permit. // // See comments in `ReclaimRunner::handle()` and `RecoverRunner::run()`.