Skip to content

refactor: Remove slot mutex and simplfy per blob state #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 20 commits into
base: slim-down-state
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod downloader;
pub mod proto;
pub mod remote;
pub mod tags;
use crate::api::proto::WaitIdleRequest;
pub use crate::{store::util::Tag, util::temp_tag::TempTag};

pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
Expand Down Expand Up @@ -314,6 +315,7 @@ impl Store {
Request::ClearProtected(msg) => local.send((msg, tx)),
Request::SyncDb(msg) => local.send((msg, tx)),
Request::Shutdown(msg) => local.send((msg, tx)),
Request::WaitIdle(msg) => local.send((msg, tx)),
}
})
});
Expand All @@ -332,6 +334,23 @@ impl Store {
Ok(())
}

/// Waits for the store to become completely idle.
///
/// This is mostly useful for tests, where you want to check that e.g. the
/// store has written all data to disk.
///
/// Note that a store is not guaranteed to become idle, if it is being
/// interacted with concurrently. So this might wait forever.
///
/// Also note that once you get the callback, the store is not guaranteed to
/// still be idle. All this tells you that there was a point in time where
/// the store was idle between the call and the response.
pub async fn wait_idle(&self) -> irpc::Result<()> {
let msg = WaitIdleRequest;
self.client.rpc(msg).await?;
Ok(())
}

pub(crate) fn from_sender(client: ApiClient) -> Self {
Self { client }
}
Expand Down
5 changes: 5 additions & 0 deletions src/api/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,16 @@ pub enum Request {
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
SyncDb(SyncDbRequest),
#[rpc(tx = oneshot::Sender<()>)]
WaitIdle(WaitIdleRequest),
#[rpc(tx = oneshot::Sender<()>)]
Shutdown(ShutdownRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
ClearProtected(ClearProtectedRequest),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WaitIdleRequest;

#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;

Expand Down
41 changes: 40 additions & 1 deletion src/api/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,8 +1064,15 @@ mod tests {
use testresult::TestResult;

use crate::{
api::blobs::Blobs,
protocol::{ChunkRangesSeq, GetRequest},
store::fs::{tests::INTERESTING_SIZES, FsStore},
store::{
fs::{
tests::{create_n0_bao, test_data, INTERESTING_SIZES},
FsStore,
},
mem::MemStore,
},
tests::{add_test_hash_seq, add_test_hash_seq_incomplete},
util::ChunkRangesExt,
};
Expand Down Expand Up @@ -1117,6 +1124,38 @@ mod tests {
Ok(())
}

async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> {
let sizes = INTERESTING_SIZES;
for size in sizes {
let data = test_data(size);
let ranges = ChunkRanges::chunk(0);
let (hash, bao) = create_n0_bao(&data, &ranges)?;
blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
let bitfield = blobs.observe(hash).await?;
if size > 1024 {
assert_eq!(bitfield.ranges, ranges);
} else {
assert_eq!(bitfield.ranges, ChunkRanges::all());
}
}
Ok(())
}

#[tokio::test]
async fn test_observe_partial_mem() -> TestResult<()> {
let store = MemStore::new();
test_observe_partial(store.blobs()).await?;
Ok(())
}

#[tokio::test]
async fn test_observe_partial_fs() -> TestResult<()> {
let td = tempfile::tempdir()?;
let store = FsStore::load(td.path()).await?;
test_observe_partial(store.blobs()).await?;
Ok(())
}

#[tokio::test]
async fn test_local_info_hash_seq() -> TestResult<()> {
let sizes = INTERESTING_SIZES;
Expand Down
Loading
Loading