Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

feat: Introduce Importable/Exportable Storage traits. #633

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
},
"rust-analyzer.cargo.features": [
"test-kubo",
"helpers",
"performance"
"helpers"
]
}
}
2 changes: 1 addition & 1 deletion rust/noosphere-common/src/unshared.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::ConditionalSend;
use futures_util::Stream;

/// NOTE: This type was adapted from https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs
/// NOTE: This type was adapted from <https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs>
/// Original implementation licensed MIT/Apache 2
///
/// Wraps a type and only allows unique borrowing, the main usecase is to wrap a `!Sync` type and
Expand Down
20 changes: 9 additions & 11 deletions rust/noosphere-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,28 @@ readme = "README.md"
anyhow = { workspace = true }
async-trait = "~0.1"
async-stream = { workspace = true }
tokio-stream = { workspace = true }
base64 = "=0.21.2"
cid = { workspace = true }
noosphere-common = { version = "0.1.0", path = "../noosphere-common" }
tracing = "~0.1"
ucan = { workspace = true }
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
libipld-core = { workspace = true }
libipld-cbor = { workspace = true }
noosphere-common = { version = "0.1.0", path = "../noosphere-common" }
rand = { workspace = true }
serde = { workspace = true }
base64 = "=0.21.2"
tokio-stream = { workspace = true }
tracing = "~0.1"
ucan = { workspace = true }
url = { version = "^2" }
witty-phrase-generator = "~0.2"

[dev-dependencies]
witty-phrase-generator = "~0.2"
wasm-bindgen-test = { workspace = true }
rand = { workspace = true }
noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" }
noosphere-common = { path = "../noosphere-common", features = ["helpers"] }
instant = { version = "0.1.12", features = ["wasm-bindgen"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tempfile = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "~0.34"
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }
rocksdb = { version = "0.21.0", optional = true }

Expand Down
6 changes: 2 additions & 4 deletions rust/noosphere-storage/examples/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,15 @@ impl BenchmarkStorage {
))]
let (storage, storage_name) = {
(
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
storage_path.into(),
))?,
noosphere_storage::SledStorage::new(&storage_path)?,
"SledDbStorage",
)
};

#[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))]
let (storage, storage_name) = {
(
noosphere_storage::RocksDbStorage::new(storage_path.into())?,
noosphere_storage::RocksDbStorage::new(&storage_path)?,
"RocksDbStorage",
)
};
Expand Down
157 changes: 157 additions & 0 deletions rust/noosphere-storage/src/backup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::storage::Storage;
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSend;
use std::path::{Path, PathBuf};

#[cfg(not(target_arch = "wasm32"))]
use crate::FsBackedStorage;

#[cfg(not(target_arch = "wasm32"))]
fn create_backup_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
use instant::SystemTime;
use rand::Rng;

let mut path = path.as_ref().to_owned();
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.map_err(|_| anyhow::anyhow!("Could not generate timestamp."))?
.as_secs();
let nonce = rand::thread_rng().gen::<u32>();
path.set_extension(format!("backup.{}-{}", timestamp, nonce));
Ok(path)
}

/// [Storage] that can be backed up and restored.
/// [FsBackedStorage] types get a blanket implementation.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BackupStorage: Storage {
/// Backup [Storage] located at `path`, moving to a backup location.
async fn backup<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<PathBuf>;
/// Backup [Storage] at `restore_to`, moving [Storage] from `backup_path` to `restore_to`.
async fn restore<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
backup_path: P,
restore_to: Q,
) -> Result<PathBuf>;
/// List paths to backups for `path`.
async fn list_backups<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Vec<PathBuf>>;
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> BackupStorage for T
where
T: FsBackedStorage,
{
async fn backup<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<PathBuf> {
let backup_path = create_backup_path(path.as_ref())?;
T::rename(path, &backup_path).await?;
Ok(backup_path)
}

async fn restore<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
backup_path: P,
restore_to: Q,
) -> Result<PathBuf> {
let restoration_path = restore_to.as_ref().to_owned();
let original_backup = T::backup(&restoration_path).await?;
T::rename(backup_path, &restoration_path).await?;
Ok(original_backup)
}

async fn list_backups<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Vec<PathBuf>> {
let mut backups = vec![];
let matcher = format!(
"{}.backup.",
path.as_ref()
.file_name()
.ok_or_else(|| anyhow::anyhow!("Could not stringify path."))?
.to_str()
.ok_or_else(|| anyhow::anyhow!("Could not stringify path."))?
);
let parent_dir = path
.as_ref()
.parent()
.ok_or_else(|| anyhow::anyhow!("Could not find storage parent directory."))?;
let mut stream = tokio::fs::read_dir(parent_dir).await?;
while let Ok(Some(entry)) = stream.next_entry().await {
if let Ok(file_name) = entry.file_name().into_string() {
if file_name.starts_with(&matcher) {
backups.push(entry.path());
}
}
}
Ok(backups)
}
}

#[cfg(all(not(target_arch = "wasm32"), test))]
mod test {
use crate::{OpenStorage, PreferredPlatformStorage, Store};

use super::*;

#[tokio::test]
pub async fn it_can_backup_storages() -> Result<()> {
noosphere_core_dev::tracing::initialize_tracing(None);

let temp_dir = tempfile::TempDir::new()?;
let db_source = temp_dir.path().join("db");

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
store.write(b"1", b"1").await?;
}

let backup_1 = PreferredPlatformStorage::backup(&db_source).await?;

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
assert!(store.read(b"1").await?.is_none(), "Backup is a move");
store.write(b"2", b"2").await?;
}

let backup_2 = PreferredPlatformStorage::backup(&db_source).await?;

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
assert!(store.read(b"1").await?.is_none(), "Backup is a move");
assert!(store.read(b"2").await?.is_none(), "Backup is a move");
store.write(b"3", b"3").await?;
}

let backups = PreferredPlatformStorage::list_backups(&db_source).await?;
assert_eq!(backups.len(), 2);
assert!(backups.contains(&backup_1));
assert!(backups.contains(&backup_2));

let backup_3 = PreferredPlatformStorage::restore(&backup_1, &db_source).await?;
{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let store = storage.get_key_value_store("links").await?;
assert_eq!(store.read(b"1").await?.unwrap(), b"1");
assert!(store.read(b"2").await?.is_none(), "Backup is a move");
assert!(store.read(b"3").await?.is_none(), "Backup is a move");
}

let backups = PreferredPlatformStorage::list_backups(db_source).await?;
assert_eq!(backups.len(), 2);
assert!(
backups.contains(&backup_3),
"contains backup from restoration."
);
assert!(
!backups.contains(&backup_1),
"moves backup that was restored."
);
assert!(
backups.contains(&backup_2),
"contains backups that were untouched."
);
Ok(())
}
}
6 changes: 6 additions & 0 deletions rust/noosphere-storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ use crate::{BlockStore, KeyValueStore, MemoryStore, Storage};

use async_stream::try_stream;

/// Key for the block store in a [SphereDb]'s [Storage].
pub const BLOCK_STORE: &str = "blocks";
/// Key for the link store in a [SphereDb]'s [Storage].
pub const LINK_STORE: &str = "links";
/// Key for the version store in a [SphereDb]'s [Storage].
pub const VERSION_STORE: &str = "versions";
/// Key for the metadata store in a [SphereDb]'s [Storage].
pub const METADATA_STORE: &str = "metadata";

/// All store keys used by [SphereDb].
pub const SPHERE_DB_STORE_NAMES: &[&str] =
&[BLOCK_STORE, LINK_STORE, VERSION_STORE, METADATA_STORE];

Expand All @@ -46,6 +51,7 @@ impl<S> SphereDb<S>
where
S: Storage,
{
/// Creates a new [SphereDb] using underlying `storage`.
pub async fn new(storage: &S) -> Result<SphereDb<S>> {
Ok(SphereDb {
block_store: storage.get_block_store(BLOCK_STORE).await?,
Expand Down
68 changes: 68 additions & 0 deletions rust/noosphere-storage/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::storage::Storage;
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSend;
use std::path::Path;

/// [Storage] that is based on a file system. Implementing [FsBackedStorage]
/// provides blanket implementations for other trait-based [Storage] operations.
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
pub trait FsBackedStorage: Storage + Sized {
/// Deletes the storage located at `path` directory. Returns `Ok(())` if
/// the directory is successfully removed, or if it already does not exist.
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
match std::fs::metadata(path.as_ref()) {
Ok(_) => std::fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()),
Err(_) => Ok(()),
}
}

/// Moves the storage located at `from` to the `to` location.
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
std::fs::rename(from, to).map_err(|e| e.into())
}
}

/// [Storage] that is based on a file system.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait FsBackedStorage: Storage + Sized {
/// Deletes the storage located at `path` directory. Returns `Ok(())` if
/// the directory is successfully removed, or if it already does not exist.
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()>;

/// Moves the storage located at `from` to the `to` location.
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()>;
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> crate::ops::DeleteStorage for T
where
T: FsBackedStorage,
{
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
<T as FsBackedStorage>::delete(path).await
}
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> crate::ops::RenameStorage for T
where
T: FsBackedStorage,
{
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
<T as FsBackedStorage>::rename(from, to).await
}
}
34 changes: 0 additions & 34 deletions rust/noosphere-storage/src/helpers.rs

This file was deleted.

Loading