Skip to content

Commit

Permalink
Remove uncommon methods. Sled asynchronous implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Dec 31, 2023
1 parent bdb62aa commit 201e05e
Show file tree
Hide file tree
Showing 5 changed files with 1,582 additions and 956 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ map_len = []

[dependencies]
sled = "0.34"
tokio = { version = "1", features = ["sync", "rt"] }
redis = { version = "0.24", features = [ "tokio-comp", "connection-manager" ] }

futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
async-trait = "0.1"
bincode = "1.3"
Expand All @@ -35,7 +37,7 @@ ahash = "0.8"
convert = { package = "box-convert", version = "0.1", features = ["bytesize"] }

[dev-dependencies]
tokio = { version = "1", features = ["sync", "time", "macros", "rt","rt-multi-thread"] }
tokio = { version = "1", features = ["sync", "time", "macros", "rt", "rt-multi-thread"] }



Expand Down
180 changes: 95 additions & 85 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ pub(crate) fn timestamp_millis() -> TimestampMillis {
mod tests {
use super::storage::*;
use super::*;
use convert::Bytesize;
use std::time::Duration;

fn get_cfg(name: &str) -> Config {
let cfg = Config {
typ: StorageType::Sled,
sled: SledConfig {
path: format!("./.catch/{}", name),
cleanup_f: |_db| {},
..Default::default()
},
redis: RedisConfig {
Expand All @@ -129,14 +129,14 @@ mod tests {
typ: StorageType::Sled,
sled: SledConfig {
path: format!("./.catch/{}", "sled_cleanup"),
cache_capacity: Bytesize::from(1024 * 1024 * 1024 * 3),
cleanup_f: |_db| {
cache_capacity: convert::Bytesize::from(1024 * 1024 * 1024 * 3),
cleanup_f: move |_db| {
#[cfg(feature = "ttl")]
{
let db = _db.clone();
std::thread::spawn(move || {
let limit = 1000;
loop {
for _ in 0..10 {
std::thread::sleep(std::time::Duration::from_secs(1));
let mut total_cleanups = 0;
let now = std::time::Instant::now();
Expand Down Expand Up @@ -312,7 +312,7 @@ mod tests {

let l_all = l_1.all::<i32>().await.unwrap();
println!("l_all: {:?}", l_all);
assert_eq!(l_all, vec![]);
assert_eq!(l_all, Vec::<i32>::new());

tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down Expand Up @@ -875,47 +875,57 @@ mod tests {
assert_eq!(kv001.is_empty().await.unwrap(), true);
}

#[tokio::main]
#[test]
async fn test_map_retain() {
let cfg = get_cfg("map_retain");
let db = init_db(&cfg).await.unwrap();

let map1 = db.map("map1");
map1.clear().await.unwrap();
for i in 0..100usize {
map1.insert(format!("mk_{}", i), &i).await.unwrap();
}
#[cfg(feature = "map_len")]
assert_eq!(map1.len().await.unwrap(), 100);
map1.retain::<_, _, usize>(|item| async {
match item {
Ok((_k, v)) => v != 10,
Err(e) => {
log::warn!("{:?}", e);
false
}
}
})
.await
.unwrap();
#[cfg(feature = "map_len")]
assert_eq!(map1.len().await.unwrap(), 99);

map1.retain_with_key(|item| async {
match item {
Ok(k) => k != b"mk_20",
Err(e) => {
log::warn!("{:?}", e);
false
}
}
})
.await
.unwrap();
#[cfg(feature = "map_len")]
assert_eq!(map1.len().await.unwrap(), 98);
}
// #[tokio::main]
// #[test]
// async fn test_map_retain() {
// let cfg = get_cfg("map_retain");
// let db = init_db(&cfg).await.unwrap();
//
// let map1 = db.map("map1");
// map1.clear().await.unwrap();
//
// for i in 0..100usize {
// map1.insert(format!("mk_{}", i), &i).await.unwrap();
// }
//
// #[cfg(feature = "map_len")]
// assert_eq!(map1.len().await.unwrap(), 100);
//
// map1.retain(|item| {
// let res = match item {
// Ok((_k, v)) => {
// let v = bincode::deserialize::<usize>(v.as_ref()).unwrap();
// v != 10
// }
// Err(e) => {
// log::warn!("{:?}", e);
// false
// }
// };
//
// Box::pin(async move { res })
// })
// .await
// .unwrap();
//
// #[cfg(feature = "map_len")]
// assert_eq!(map1.len().await.unwrap(), 99);
//
// map1.retain_with_key(|item| {
// let res = match item {
// Ok(k) => k != b"mk_20",
// Err(e) => {
// log::warn!("{:?}", e);
// false
// }
// };
// Box::pin(async move { res })
// })
// .await
// .unwrap();
// #[cfg(feature = "map_len")]
// assert_eq!(map1.len().await.unwrap(), 98);
// }

#[tokio::main]
#[test]
Expand Down Expand Up @@ -1358,45 +1368,45 @@ mod tests {
vec![2, 3, 4, 5, 6, 7, 8, 9]
);

let pop_v = l11
.pop_f::<_, i32>(|v| {
println!("left val: {:?}", v);
*v == 2
})
.await
.unwrap();
println!("pop val: {:?}", pop_v);
println!(
"all: {:?}, len: {:?}",
l11.all::<i32>().await.unwrap(),
l11.len().await
);
assert_eq!(l11.len().await.unwrap(), 7);
assert_eq!(l11.all::<i32>().await.unwrap(), vec![3, 4, 5, 6, 7, 8, 9]);

let pop_v = l11.pop_f::<_, i32>(|v| *v == 2).await.unwrap();
println!("pop val: {:?}", pop_v);
println!(
"all: {:?}, len: {:?}",
l11.all::<i32>().await.unwrap(),
l11.len().await
);
assert_eq!(l11.len().await.unwrap(), 7);
assert_eq!(l11.all::<i32>().await.unwrap(), vec![3, 4, 5, 6, 7, 8, 9]);

l11.clear().await.unwrap();
assert_eq!(l11.len().await.unwrap(), 0);
assert_eq!(l11.all::<i32>().await.unwrap(), vec![]);

let pop_v = l11.pop_f::<_, i32>(|_| true).await.unwrap();
println!("pop val: {:?}", pop_v);
println!(
"all: {:?}, len: {:?}",
l11.all::<i32>().await.unwrap(),
l11.len().await
);
assert_eq!(l11.len().await.unwrap(), 0);
assert_eq!(l11.all::<i32>().await.unwrap(), vec![]);
// let pop_v = l11
// .pop_f::<_, i32>(|v| {
// println!("left val: {:?}", v);
// *v == 2
// })
// .await
// .unwrap();
// println!("pop val: {:?}", pop_v);
// println!(
// "all: {:?}, len: {:?}",
// l11.all::<i32>().await.unwrap(),
// l11.len().await
// );
// assert_eq!(l11.len().await.unwrap(), 7);
// assert_eq!(l11.all::<i32>().await.unwrap(), vec![3, 4, 5, 6, 7, 8, 9]);
//
// let pop_v = l11.pop_f::<_, i32>(|v| *v == 2).await.unwrap();
// println!("pop val: {:?}", pop_v);
// println!(
// "all: {:?}, len: {:?}",
// l11.all::<i32>().await.unwrap(),
// l11.len().await
// );
// assert_eq!(l11.len().await.unwrap(), 7);
// assert_eq!(l11.all::<i32>().await.unwrap(), vec![3, 4, 5, 6, 7, 8, 9]);
//
// l11.clear().await.unwrap();
// assert_eq!(l11.len().await.unwrap(), 0);
// assert_eq!(l11.all::<i32>().await.unwrap(), vec![]);
//
// let pop_v = l11.pop_f::<_, i32>(|_| true).await.unwrap();
// println!("pop val: {:?}", pop_v);
// println!(
// "all: {:?}, len: {:?}",
// l11.all::<i32>().await.unwrap(),
// l11.len().await
// );
// assert_eq!(l11.len().await.unwrap(), 0);
// assert_eq!(l11.all::<i32>().await.unwrap(), vec![]);
}

#[tokio::main]
Expand Down
61 changes: 10 additions & 51 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use core::fmt;
use std::future::Future;

use async_trait::async_trait;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -116,6 +115,8 @@ pub trait StorageDB: Send + Sync {
async fn list_iter<'a>(
&'a mut self,
) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>>;

async fn info(&self) -> Result<serde_json::Value>;
}

#[async_trait]
Expand Down Expand Up @@ -178,17 +179,6 @@ pub trait Map: Sync + Send {
P: AsRef<[u8]> + Send,
V: DeserializeOwned + Sync + Send + 'a + 'static;

async fn retain<'a, F, Out, V>(&'a self, f: F) -> Result<()>
where
F: Fn(Result<(Key, V)>) -> Out + Send + Sync + 'static,
Out: Future<Output = bool> + Send + 'a,
V: DeserializeOwned + Sync + Send + 'a;

async fn retain_with_key<'a, F, Out>(&'a self, f: F) -> Result<()>
where
F: Fn(Result<Key>) -> Out + Send + Sync + 'static,
Out: Future<Output = bool> + Send + 'a;

#[cfg(feature = "ttl")]
async fn expire_at(&self, dur: TimestampMillis) -> Result<bool>;

Expand Down Expand Up @@ -225,11 +215,6 @@ pub trait List: Sync + Send {
where
V: DeserializeOwned + Sync + Send;

async fn pop_f<'a, F, V>(&'a self, f: F) -> Result<Option<V>>
where
F: Fn(&V) -> bool + Send + Sync + 'static,
V: DeserializeOwned + Sync + Send + 'a + 'static;

async fn all<V>(&self) -> Result<Vec<V>>
where
V: DeserializeOwned + Sync + Send;
Expand Down Expand Up @@ -482,6 +467,14 @@ impl DefaultStorageDB {
DefaultStorageDB::Redis(db) => db.list_iter().await,
}
}

#[inline]
pub async fn info(&self) -> Result<serde_json::Value> {
match self {
DefaultStorageDB::Sled(db) => db.info().await,
DefaultStorageDB::Redis(db) => db.info().await,
}
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -633,29 +626,6 @@ impl Map for StorageMap {
}
}

async fn retain<'a, F, Out, V>(&'a self, f: F) -> Result<()>
where
F: Fn(Result<(Key, V)>) -> Out + Send + Sync + 'static,
Out: Future<Output = bool> + Send + 'a,
V: DeserializeOwned + Sync + Send + 'a,
{
match self {
StorageMap::Sled(m) => m.retain(f).await,
StorageMap::Redis(m) => m.retain(f).await,
}
}

async fn retain_with_key<'a, F, Out>(&'a self, f: F) -> Result<()>
where
F: Fn(Result<Key>) -> Out + Send + Sync + 'static,
Out: Future<Output = bool> + Send + 'a,
{
match self {
StorageMap::Sled(m) => m.retain_with_key(f).await,
StorageMap::Redis(m) => m.retain_with_key(f).await,
}
}

#[cfg(feature = "ttl")]
async fn expire_at(&self, dur: TimestampMillis) -> Result<bool> {
match self {
Expand Down Expand Up @@ -754,17 +724,6 @@ impl List for StorageList {
}
}

async fn pop_f<'a, F, V>(&'a self, f: F) -> Result<Option<V>>
where
F: Fn(&V) -> bool + Send + Sync + 'static,
V: DeserializeOwned + Sync + Send + 'a + 'static,
{
match self {
StorageList::Sled(list) => list.pop_f(f).await,
StorageList::Redis(list) => list.pop_f(f).await,
}
}

async fn all<V>(&self) -> Result<Vec<V>>
where
V: DeserializeOwned + Sync + Send,
Expand Down
Loading

0 comments on commit 201e05e

Please sign in to comment.