diff --git a/Cargo.lock b/Cargo.lock index 4751c746f..ac62977a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1562,6 +1562,11 @@ name = "fastcdc" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a71061d097bfa9a5a4d2efdec57990d9a88745020b365191d37e48541a1628f2" +dependencies = [ + "async-stream", + "tokio", + "tokio-stream", +] [[package]] name = "fastrand" @@ -3446,6 +3451,7 @@ dependencies = [ "async-trait", "base64 0.21.2", "byteorder", + "bytes", "cid", "console_error_panic_hook", "ed25519-zebra", @@ -3464,6 +3470,7 @@ dependencies = [ "serde_json", "strum 0.25.0", "strum_macros", + "tempfile", "thiserror", "tiny-bip39", "tokio", @@ -3651,6 +3658,7 @@ dependencies = [ "js-sys", "libipld-cbor", "libipld-core", + "rand 0.8.5", "rexie", "serde", "sled", diff --git a/Cargo.toml b/Cargo.toml index 464a76374..2061fc452 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ resolver = "2" anyhow = { version = "1" } async-stream = { version = "0.3" } axum = { version = "^0.6.18" } +bytes = { version = "1" } cid = { version = "0.10" } directories = { version = "5" } fastcdc = { version = "3.1" } @@ -31,6 +32,7 @@ libipld = { version = "0.16" } libipld-core = { version = "0.16" } libipld-cbor = { version = "0.16" } pathdiff = { version = "0.2.1" } +rand = { version = "0.8.5" } sentry-tracing = { version = "0.31.5" } serde = { version = "^1" } serde_json = { version = "^1" } diff --git a/rust/noosphere-cli/src/native/content.rs b/rust/noosphere-cli/src/native/content.rs index ab5377546..ecca2b53c 100644 --- a/rust/noosphere-cli/src/native/content.rs +++ b/rust/noosphere-cli/src/native/content.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use cid::Cid; use globset::{Glob, GlobSet, GlobSetBuilder}; use noosphere_core::data::{BodyChunkIpld, ContentType}; -use noosphere_storage::{BlockStore, MemoryStore}; +use noosphere_storage::{BlockStore, MemoryStore, Scratch}; use pathdiff::diff_paths; use std::collections::{BTreeMap, BTreeSet}; use subtext::util::to_slug; @@ -84,7 +84,10 @@ impl Content { /// provided store. // TODO(#556): This is slow; we could probably do a concurrent traversal // similar to how we traverse when rendering files to disk - pub async fn read_all(paths: &SpherePaths, store: &mut S) -> Result { + pub async fn read_all( + paths: &SpherePaths, + store: &mut S, + ) -> Result { let root_path = paths.root(); let mut directories = vec![(None, tokio::fs::read_dir(root_path).await?)]; @@ -144,7 +147,7 @@ impl Content { }; let file_bytes = fs::read(path).await?; - let body_cid = BodyChunkIpld::store_bytes(&file_bytes, store).await?; + let body_cid = BodyChunkIpld::encode(file_bytes.as_ref(), store).await?; content.matched.insert( slug, @@ -172,7 +175,6 @@ impl Content { let mut new_blocks = MemoryStore::default(); let file_content = Content::read_all(workspace.require_sphere_paths()?, &mut new_blocks).await?; - let sphere_context = workspace.sphere_context().await?; let walker = SphereWalker::from(&sphere_context); diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index b233d6748..03d5fde9a 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -23,6 +23,8 @@ sentry = ["dep:sentry-tracing"] helpers = [] [dependencies] +bytes = { workspace = true } +tempfile = { workspace = true } tracing = { workspace = true } cid = { workspace = true } url = { workspace = true } @@ -34,14 +36,14 @@ async-stream = { workspace = true } async-once-cell = "~0.4" anyhow = { workspace = true } thiserror = { workspace = true } -fastcdc = { workspace = true } +fastcdc = { workspace = true, features = ["tokio"] } futures = "~0.3" serde = { workspace = true } serde_json = { workspace = true } byteorder = "^1.4" base64 = "0.21" ed25519-zebra = "^3" -rand = "~0.8" +rand = { workspace = true } once_cell = "^1" tiny-bip39 = "^1" tokio-stream = { workspace = true } diff --git a/rust/noosphere-core/src/data/body_chunk.rs b/rust/noosphere-core/src/data/body_chunk.rs index 66d935164..b5ecf54fd 100644 --- a/rust/noosphere-core/src/data/body_chunk.rs +++ b/rust/noosphere-core/src/data/body_chunk.rs @@ -1,12 +1,19 @@ +use crate::stream::reverse_stream; use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use bytes::Bytes; use cid::Cid; -use fastcdc::v2020::FastCDC; +use fastcdc::v2020::{AsyncStreamCDC, FastCDC}; use libipld_cbor::DagCborCodec; +use noosphere_storage::{BlockStore, Scratch}; use serde::{Deserialize, Serialize}; - -use noosphere_storage::BlockStore; +use tokio::io::AsyncRead; +use tokio_stream::{Stream, StreamExt}; pub const BODY_CHUNK_MAX_SIZE: u32 = 1024 * 1024; // ~1mb/chunk worst case, ~.5mb/chunk average case +/// Encoding content larger than `CONTENT_STORAGE_MEMORY_LIMIT` will +/// use disk-storage rather than memory storage. +pub const CONTENT_STORAGE_MEMORY_LIMIT: u64 = 1024 * 1024 * 5; // 5mb /// A body chunk is a simplified flexible byte layout used for linking /// chunks of bytes. This is necessary to support cases when body contents @@ -21,7 +28,7 @@ pub struct BodyChunkIpld { } impl BodyChunkIpld { - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated(note = "Use `BodyChunkIpld::encode` instead for a streaming interface")] pub async fn store_bytes(bytes: &[u8], store: &mut S) -> Result { let chunks = FastCDC::new( bytes, @@ -56,7 +63,7 @@ impl BodyChunkIpld { next_chunk_cid.ok_or_else(|| anyhow!("No CID; did you try to store zero bytes?")) } - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated(note = "Use `BodyChunkIpld::decode` instead for a streaming interface")] pub async fn load_all_bytes(&self, store: &S) -> Result> { let mut all_bytes = self.bytes.clone(); let mut next_cid = self.next; @@ -70,4 +77,141 @@ impl BodyChunkIpld { Ok(all_bytes) } + + /// Encode `content` as a [BodyChunkIpld] chain in streaming fashion, + /// returning a [Stream] that yields a [Cid] and [BodyChunkIpld] tuple + /// in reverse order. + pub async fn encode_streaming<'a, R, S>( + content: R, + store: &'a S, + ) -> impl Stream> + Unpin + 'a + where + R: AsyncRead + Unpin + 'a, + S: Scratch + BlockStore, + { + Box::pin(try_stream! { + let mut chunker = AsyncStreamCDC::new( + content, + fastcdc::v2020::MINIMUM_MIN, + BODY_CHUNK_MAX_SIZE / 2, + BODY_CHUNK_MAX_SIZE, + ); + let stream = chunker.as_stream().map(|chunk_data| chunk_data.map(|chunk_data| chunk_data.data)); + let stream = reverse_stream(stream, store, CONTENT_STORAGE_MEMORY_LIMIT); + tokio::pin!(stream); + + let mut store = store.to_owned(); + let mut next_chunk_cid = None; + while let Some(chunk) = stream.try_next().await? { + let chunk = BodyChunkIpld { + bytes: chunk, + next: next_chunk_cid, + }; + + let cid = store.save::(&chunk).await?; + yield (cid, chunk); + next_chunk_cid = Some(cid); + } + }) + } + + /// Encode `content` as a [BodyChunkIpld] chain in streaming fashion, + /// returning the root [Cid] upon completion. + pub async fn encode(content: R, store: &S) -> Result + where + R: AsyncRead + Unpin, + S: Scratch + BlockStore, + { + let stream = BodyChunkIpld::encode_streaming(content, store).await; + tokio::pin!(stream); + + let mut head_cid = None; + while let Some((cid, _)) = stream.try_next().await? { + head_cid = Some(cid); + } + match head_cid { + Some(cid) => Ok(cid), + None => Err(anyhow!("Could not encode empty buffer.")), + } + } + + /// Decode a [BodyChunkIpld] chain via [Cid] into a [Bytes] stream. + pub fn decode( + cid: &Cid, + store: &S, + ) -> impl Stream> + Unpin + where + S: BlockStore, + { + let mut next = Some(*cid); + let store = store.clone(); + Box::pin(try_stream! { + while let Some(cid) = next { + debug!("Unpacking block {}...", cid); + let chunk = store.load::(&cid).await.map_err(|error| { + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) + })?; + yield Bytes::from(chunk.bytes); + next = chunk.next; + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use noosphere_storage::{helpers::make_disposable_storage, MemoryStore, SphereDb}; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_reads_and_writes_chunks_nonstreaming() -> Result<()> { + let mut store = MemoryStore::default(); + let chunk1 = vec![1; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk2 = vec![2; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk3 = vec![3; >::try_into(BODY_CHUNK_MAX_SIZE).unwrap() / 2]; + let bytes = [chunk1.clone(), chunk2.clone(), chunk3.clone()].concat(); + + #[allow(deprecated)] + let cid = BodyChunkIpld::store_bytes(&bytes, &mut store).await?; + let ipld_chunk = store.load::(&cid).await?; + + #[allow(deprecated)] + let output = ipld_chunk.load_all_bytes(&store).await?; + assert_eq!(output, bytes); + Ok(()) + } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_reads_and_writes_chunks_streaming() -> Result<()> { + let provider = make_disposable_storage().await?; + let db = SphereDb::new(&provider).await?; + let mut chunks = vec![]; + for n in 1..10 { + let mut chunk: Vec = vec![0; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + chunk.fill(n); + chunks.push(chunk); + } + let bytes = chunks.concat(); + assert!(bytes.len() as u64 > CONTENT_STORAGE_MEMORY_LIMIT); + + let cid = BodyChunkIpld::encode(bytes.as_ref(), &db).await?; + + let stream = BodyChunkIpld::decode(&cid, &db); + drop(db); + tokio::pin!(stream); + let mut output: Vec> = vec![]; + while let Some(chunk) = stream.try_next().await? { + output.push(chunk.into()); + } + assert_eq!(output.concat(), bytes); + Ok(()) + } } diff --git a/rust/noosphere-core/src/lib.rs b/rust/noosphere-core/src/lib.rs index 414745718..092871d25 100644 --- a/rust/noosphere-core/src/lib.rs +++ b/rust/noosphere-core/src/lib.rs @@ -6,6 +6,7 @@ pub mod data; pub mod view; pub mod error; +pub mod stream; pub mod tracing; #[cfg(any(test, feature = "helpers"))] diff --git a/rust/noosphere-core/src/stream.rs b/rust/noosphere-core/src/stream.rs new file mode 100644 index 000000000..b2064cfdb --- /dev/null +++ b/rust/noosphere-core/src/stream.rs @@ -0,0 +1,240 @@ +use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use async_trait::async_trait; +use noosphere_storage::{KeyValueStore, MemoryStore, Scratch}; +use serde::{de::DeserializeOwned, Serialize}; +use tokio_stream::{Stream, StreamExt}; + +/// Takes a [Stream] and returns a stream that yields the items +/// in reverse. A [Scratch] provider is needed when `memory_limit` is +/// reached in order to buffer a large stream. +pub fn reverse_stream<'a, St, S, T, E>( + stream: St, + provider: &'a S, + memory_limit: u64, +) -> impl Stream> + Unpin + 'a +where + T: Reversable + 'static, + St: Stream> + 'a, + S: Scratch, + E: Into, +{ + Box::pin(try_stream! { + tokio::pin!(stream); + let mut mem_store = Some(ReversableStore::new(MemoryStore::default())); + let mut db_store: Option> = None; + + while let Some(item) = stream.try_next().await? { + if let Some(store) = db_store.as_mut() { + store.push(item).await?; + } else if let Some(store) = mem_store.as_mut() { + store.push(item).await?; + + if store.byte_length() > memory_limit { + let previous_store = mem_store.take().unwrap(); + let scratch_store = provider.get_scratch_store().await?; + db_store = Some(ReversableStore::from_store(scratch_store, previous_store).await?); + } + } + } + + if let Some(store) = db_store.take() { + let output = store.into_reverse_stream().await; + tokio::pin!(output); + while let Some(out) = output.try_next().await? { + yield out; + } + } else if let Some(store) = mem_store.take() { + let output = store.into_reverse_stream().await; + tokio::pin!(output); + while let Some(out) = output.try_next().await? { + yield out; + } + } else { + panic!("Unrecoverable reversable stream state."); + }; + }) +} + +/// An accumulating store that can stream out its items +/// forward or in reverse. +struct ReversableStore> { + item_count: usize, + byte_length: u64, + store: S, + _marker: std::marker::PhantomData, +} + +impl ReversableStore +where + T: Reversable + 'static, + S: ReversableStorage, +{ + fn new(store: S) -> Self { + ReversableStore { + item_count: 0, + byte_length: 0, + store, + _marker: std::marker::PhantomData, + } + } + + /// Drains `other` store into a newly created store using `inner`. + async fn from_store(inner: S, other: ReversableStore) -> Result { + let mut store = ReversableStore::new(inner); + let mut stream = other.into_forward_stream().await; + while let Some(item) = stream.try_next().await? { + store.push(item).await?; + } + Ok(store) + } + + /// Push a new `item` to the store. + async fn push(&mut self, item: T) -> Result<()> { + self.byte_length += item.size_of(); + self.store.push(item, self.item_count).await?; + self.item_count += 1; + Ok(()) + } + + /// Get total byte length of all items in the store. + pub fn byte_length(&self) -> u64 { + self.byte_length + } + + /// Drain this store's items in a forward stream. + pub async fn into_forward_stream( + mut self, + ) -> impl Stream> + Unpin { + Box::pin(try_stream! { + for index in 0..self.item_count { + let item = self.store.get(index).await?; + yield item; + } + }) + } + + /// Drain this store's items in a reverse stream. + pub async fn into_reverse_stream( + mut self, + ) -> impl Stream> + Unpin { + Box::pin(try_stream! { + for n in 0..self.item_count { + let index = self.item_count - n - 1; + let item = self.store.get(index).await?; + yield item; + } + }) + } +} + +/// A trait for interacting with [KeyValueStore]s as an +/// immutable stack. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +trait ReversableStorage { + /// Push a new item into the store. + async fn push(&mut self, item: T, index: usize) -> Result<()>; + /// Retrieve an item from the store. + async fn get(&mut self, index: usize) -> Result; +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl ReversableStorage for S { + async fn push(&mut self, item: T, index: usize) -> Result<()> { + let key = format!("{}", index); + self.set_key(&key, item).await?; + Ok(()) + } + + async fn get(&mut self, index: usize) -> Result { + let key = format!("{}", index); + let item = self.get_key(&key).await?; + item.ok_or_else(|| anyhow!("Missing chunk.")) + } +} + +#[cfg(target_arch = "wasm32")] +pub trait Sendable {} +#[cfg(target_arch = "wasm32")] +impl Sendable for T {} +#[cfg(not(target_arch = "wasm32"))] +pub trait Sendable: Send {} +#[cfg(not(target_arch = "wasm32"))] +impl Sendable for T {} + +/// Helper trait for item types supported by `reverse_stream`. +/// Currently only implemented for `Vec`, other types may be +/// supported in the future. +pub trait Reversable: Sendable + Serialize + DeserializeOwned { + /// Get the byte size of this item. + fn size_of(&self) -> u64; +} + +impl Reversable for Vec { + fn size_of(&self) -> u64 { + self.len() as u64 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_stream::stream; + use noosphere_storage::helpers::make_disposable_storage; + use tokio; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_reverses_streams() -> Result<()> { + let provider = make_disposable_storage().await?; + let input_stream = Box::pin(stream! { + for n in 1..=5 { + yield Result::<_>::Ok(vec![n]); + } + }); + let mut reversed = reverse_stream(input_stream, &provider, 1024); + let mut output = vec![]; + while let Some(value) = reversed.try_next().await? { + output.push(value); + } + assert_eq!(output, vec![vec![5], vec![4], vec![3], vec![2], vec![1],]); + Ok(()) + } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_reverses_large_streams() -> Result<()> { + let chunk_size: u32 = 1024; + let memory_limit = chunk_size * 5; + let item_count = 10; + let provider = make_disposable_storage().await?; + let input_stream = Box::pin(stream! { + for n in 1..=item_count { + let chunk: Vec = vec![n; chunk_size.try_into().unwrap()]; + yield Result::<_>::Ok(chunk); + } + }); + + assert!( + memory_limit < (chunk_size * >::into(item_count)), + "memory limit will be surpassed" + ); + + let mut reversed = reverse_stream(input_stream, &provider, memory_limit.into()); + let mut counter = 10; + while let Some(value) = reversed.try_next().await? { + assert_eq!(value, vec![counter; chunk_size.try_into().unwrap()]); + counter -= 1; + } + assert_eq!(counter, 0); + Ok(()) + } +} diff --git a/rust/noosphere-gateway/Cargo.toml b/rust/noosphere-gateway/Cargo.toml index b74925d18..ac30aea1b 100644 --- a/rust/noosphere-gateway/Cargo.toml +++ b/rust/noosphere-gateway/Cargo.toml @@ -25,7 +25,7 @@ iroh-car = { workspace = true } thiserror = { workspace = true } strum = "0.25" strum_macros = "0.25" -bytes = "^1" +bytes = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } diff --git a/rust/noosphere-into/Cargo.toml b/rust/noosphere-into/Cargo.toml index 3ab9c54ba..55b47649c 100644 --- a/rust/noosphere-into/Cargo.toml +++ b/rust/noosphere-into/Cargo.toml @@ -30,9 +30,9 @@ horrorshow = "~0.8" cid = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" -tokio-util = "~0.7" +bytes = { workspace = true } tokio-stream = { workspace = true } +tokio-util = "~0.7" tokio = { workspace = true, features = ["io-util", "macros", "test-util"] } async-stream = { workspace = true } diff --git a/rust/noosphere-ipfs/Cargo.toml b/rust/noosphere-ipfs/Cargo.toml index 77695e8be..646e9ba44 100644 --- a/rust/noosphere-ipfs/Cargo.toml +++ b/rust/noosphere-ipfs/Cargo.toml @@ -50,7 +50,7 @@ ipfs-api-prelude = "0.6" [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = "~0.8" +rand = { workspace = true } iroh-car = { workspace = true } libipld-cbor = { workspace = true } noosphere-core = { version = "0.15.1", path = "../noosphere-core" } diff --git a/rust/noosphere-ipfs/src/storage.rs b/rust/noosphere-ipfs/src/storage.rs index 67b785c76..815ad51a8 100644 --- a/rust/noosphere-ipfs/src/storage.rs +++ b/rust/noosphere-ipfs/src/storage.rs @@ -2,7 +2,7 @@ use crate::IpfsClient; use anyhow::Result; use async_trait::async_trait; use cid::Cid; -use noosphere_storage::{BlockStore, Storage}; +use noosphere_storage::{BlockStore, Scratch, Storage}; use std::sync::Arc; use tokio::sync::RwLock; @@ -56,7 +56,6 @@ where C: IpfsClient + IpfsStorageConditionalSendSync, { type BlockStore = IpfsStore; - type KeyValueStore = S::KeyValueStore; async fn get_block_store(&self, name: &str) -> Result { @@ -69,6 +68,20 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for IpfsStorage +where + S: Storage + IpfsStorageConditionalSendSync, + C: IpfsClient + IpfsStorageConditionalSendSync, +{ + type ScratchStore = ::KeyValueStore; + + async fn get_scratch_store(&self) -> Result { + unimplemented!(); + } +} + /// An implementation of [BlockStore] that wraps some other implementation of /// same. It forwards most behavior to its wrapped implementation, except when /// reading blocks. In that case, if a block cannot be found locally, it will diff --git a/rust/noosphere-ns/Cargo.toml b/rust/noosphere-ns/Cargo.toml index 7a7763d49..d15807b84 100644 --- a/rust/noosphere-ns/Cargo.toml +++ b/rust/noosphere-ns/Cargo.toml @@ -57,7 +57,7 @@ url = { version = "^2", features = [ "serde" ], optional = true } [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = { version = "0.8.5" } +rand = { workspace = true } libipld-cbor = { workspace = true } tempfile = { workspace = true } diff --git a/rust/noosphere-sphere/Cargo.toml b/rust/noosphere-sphere/Cargo.toml index e942e50b5..46e946aeb 100644 --- a/rust/noosphere-sphere/Cargo.toml +++ b/rust/noosphere-sphere/Cargo.toml @@ -37,7 +37,7 @@ tokio-util = { version = "0.7.7", features = ["io"] } futures-util = "0.3.27" libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/rust/noosphere-sphere/src/content/decoder.rs b/rust/noosphere-sphere/src/content/decoder.rs deleted file mode 100644 index 643d8527e..000000000 --- a/rust/noosphere-sphere/src/content/decoder.rs +++ /dev/null @@ -1,29 +0,0 @@ -use async_stream::try_stream; -use bytes::Bytes; -use cid::Cid; -use libipld_cbor::DagCborCodec; -use noosphere_core::data::BodyChunkIpld; -use noosphere_storage::BlockStore; -use tokio_stream::Stream; - -/// Helper to easily decode a linked list of `BodyChunkIpld` as a byte stream -pub struct BodyChunkDecoder<'a, 'b, S: BlockStore>(pub &'a Cid, pub &'b S); - -impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> { - /// Consume the [BodyChunkDecoder] and return an async [Stream] of bytes - /// representing the raw body contents - pub fn stream(self) -> impl Stream> + Unpin { - let mut next = Some(*self.0); - let store = self.1.clone(); - Box::pin(try_stream! { - while let Some(cid) = next { - debug!("Unpacking block {}...", cid); - let chunk = store.load::(&cid).await.map_err(|error| { - std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) - })?; - yield Bytes::from(chunk.bytes); - next = chunk.next; - } - }) - } -} diff --git a/rust/noosphere-sphere/src/content/mod.rs b/rust/noosphere-sphere/src/content/mod.rs index ec6444013..18f9dbcad 100644 --- a/rust/noosphere-sphere/src/content/mod.rs +++ b/rust/noosphere-sphere/src/content/mod.rs @@ -2,12 +2,10 @@ //! with a public "slug", that is addressable by them or others who have replicated the sphere //! data. -mod decoder; mod file; mod read; mod write; -pub use decoder::*; pub use file::*; pub use read::*; pub use write::*; diff --git a/rust/noosphere-sphere/src/content/write.rs b/rust/noosphere-sphere/src/content/write.rs index 2904d21e9..df2052414 100644 --- a/rust/noosphere-sphere/src/content/write.rs +++ b/rust/noosphere-sphere/src/content/write.rs @@ -156,7 +156,8 @@ where // amount to byte streams, but in point of fact we can support anything // that may be referenced by CID including arbitrary IPLD structures let body_cid = - BodyChunkIpld::store_bytes(&bytes, self.sphere_context_mut().await?.db_mut()).await?; + BodyChunkIpld::encode(bytes.as_ref(), self.sphere_context_mut().await?.db_mut()) + .await?; self.link(slug, content_type, &body_cid, additional_headers) .await diff --git a/rust/noosphere-sphere/src/internal.rs b/rust/noosphere-sphere/src/internal.rs index d6913ed17..7f507eedf 100644 --- a/rust/noosphere-sphere/src/internal.rs +++ b/rust/noosphere-sphere/src/internal.rs @@ -1,4 +1,4 @@ -use super::{BodyChunkDecoder, SphereFile}; +use super::SphereFile; use crate::{AsyncFileBody, HasSphereContext}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -9,7 +9,7 @@ use tokio_util::io::StreamReader; use cid::Cid; use noosphere_core::{ authority::Access, - data::{ContentType, Header, Link, MemoIpld}, + data::{BodyChunkIpld, ContentType, Header, Link, MemoIpld}, }; /// A module-private trait for internal trait methods; this is a workaround for @@ -86,7 +86,7 @@ where let stream = match content_type { // TODO(#86): Content-type aware decoding of body bytes - Some(_) => BodyChunkDecoder(&memo.body, &db).stream(), + Some(_) => BodyChunkIpld::decode(&memo.body, &db), None => return Err(anyhow!("No content type specified")), }; diff --git a/rust/noosphere-sphere/src/replication/stream.rs b/rust/noosphere-sphere/src/replication/stream.rs index 6150afbb7..f4e8fba81 100644 --- a/rust/noosphere-sphere/src/replication/stream.rs +++ b/rust/noosphere-sphere/src/replication/stream.rs @@ -6,7 +6,7 @@ use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_core::{ authority::collect_ucan_proofs, - data::{ContentType, Link, MemoIpld, SphereIpld}, + data::{BodyChunkIpld, ContentType, Link, MemoIpld, SphereIpld}, view::Sphere, }; use noosphere_storage::{BlockStore, BlockStoreTap, UcanStore}; @@ -17,7 +17,6 @@ use ucan::{store::UcanJwtStore, Ucan}; use crate::{ walk_versioned_map_changes_and, walk_versioned_map_elements, walk_versioned_map_elements_and, - BodyChunkDecoder, }; /// Stream all the blocks required to reconstruct the history of a sphere since a @@ -231,7 +230,7 @@ where revocations_result??; } Some(_) => { - let stream = BodyChunkDecoder(&memo.body, &store).stream(); + let stream = BodyChunkIpld::decode(&memo.body, &store); drop(store); @@ -275,8 +274,8 @@ mod tests { use crate::{ car_stream, helpers::{simulated_sphere_context, touch_all_sphere_blocks, SimulationAccess}, - memo_body_stream, memo_history_stream, BodyChunkDecoder, HasMutableSphereContext, - HasSphereContext, SphereContentWrite, SpherePetnameWrite, + memo_body_stream, memo_history_stream, HasMutableSphereContext, HasSphereContext, + SphereContentWrite, SpherePetnameWrite, }; #[cfg(target_arch = "wasm32")] @@ -538,7 +537,7 @@ mod tests { let memo = store.load::(&content_cid).await?; let mut buffer = Vec::new(); - let body_stream = BodyChunkDecoder(&memo.body, &store).stream(); + let body_stream = BodyChunkIpld::decode(&memo.body, &store); tokio::pin!(body_stream); diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index b3ad41872..9524d9ab7 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -29,12 +29,13 @@ tracing = "~0.1" ucan = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } +rand = { workspace = true } serde = { workspace = true } base64 = "=0.21.2" url = { version = "^2" } +witty-phrase-generator = { version = "~0.2", optional = true } [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -53,3 +54,7 @@ features = [ "Window", "DedicatedWorkerGlobalScope", ] + +[features] +default = ["helpers"] +helpers = ["dep:witty-phrase-generator"] diff --git a/rust/noosphere-storage/src/db.rs b/rust/noosphere-storage/src/db.rs index 09660d07d..569460d7d 100644 --- a/rust/noosphere-storage/src/db.rs +++ b/rust/noosphere-storage/src/db.rs @@ -13,7 +13,7 @@ use std::{collections::BTreeSet, fmt::Debug}; use tokio_stream::{Stream, StreamExt}; use ucan::store::{UcanStore, UcanStoreConditionalSend}; -use crate::{BlockStore, BlockStoreSend, KeyValueStore, MemoryStore, Storage}; +use crate::{BlockStore, BlockStoreSend, KeyValueStore, MemoryStore, Scratch, Storage}; use async_stream::try_stream; @@ -33,9 +33,15 @@ pub const BLOCK_STORE: &str = "blocks"; pub const LINK_STORE: &str = "links"; pub const VERSION_STORE: &str = "versions"; pub const METADATA_STORE: &str = "metadata"; +pub const SCRATCH_STORE: &str = "scratch"; -pub const SPHERE_DB_STORE_NAMES: &[&str] = - &[BLOCK_STORE, LINK_STORE, VERSION_STORE, METADATA_STORE]; +pub const SPHERE_DB_STORE_NAMES: &[&str] = &[ + BLOCK_STORE, + LINK_STORE, + VERSION_STORE, + METADATA_STORE, + SCRATCH_STORE, +]; /// A [SphereDb] is a high-level storage primitive for Noosphere's APIs. It /// takes a [Storage] and implements [BlockStore] and [KeyValueStore], @@ -45,8 +51,9 @@ pub const SPHERE_DB_STORE_NAMES: &[&str] = #[derive(Clone, Debug)] pub struct SphereDb where - S: Storage, + S: Storage + Scratch, { + storage: S, block_store: S::BlockStore, link_store: S::KeyValueStore, version_store: S::KeyValueStore, @@ -55,10 +62,11 @@ where impl SphereDb where - S: Storage, + S: Storage + Scratch, { pub async fn new(storage: &S) -> Result> { Ok(SphereDb { + storage: storage.to_owned(), block_store: storage.get_block_store(BLOCK_STORE).await?, link_store: storage.get_key_value_store(LINK_STORE).await?, version_store: storage.get_key_value_store(VERSION_STORE).await?, @@ -244,7 +252,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn put_links(&mut self, cid: &Cid, block: &[u8]) -> Result<()> where @@ -274,7 +282,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl KeyValueStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn set_key(&mut self, key: K, value: V) -> Result<()> where @@ -304,7 +312,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl UcanStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn read>(&self, cid: &Cid) -> Result> { self.get::(cid).await @@ -318,19 +326,34 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for SphereDb +where + S: Storage + Scratch, +{ + type ScratchStore = ::ScratchStore; + + async fn get_scratch_store(&self) -> Result { + self.storage.get_scratch_store().await + } +} + #[cfg(test)] mod tests { - + use super::*; + use crate::{ + block_encode, derive_cid, helpers::make_disposable_storage, BlockStore, KeyValueStoreSend, + MemoryStorage, Scratch, SphereDb, + }; use libipld_cbor::DagCborCodec; use libipld_core::{ipld::Ipld, raw::RawCodec}; + use tokio_stream::StreamExt; use ucan::store::UcanJwtStore; + #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; - use crate::{block_encode, derive_cid, BlockStore, MemoryStorage, SphereDb}; - - use tokio_stream::StreamExt; - #[cfg(target_arch = "wasm32")] wasm_bindgen_test_configure!(run_in_browser); @@ -404,4 +427,50 @@ mod tests { assert_eq!(token, Some("foobar".into())); } + + async fn test_storage_scratch_provider(db: SphereDb) -> anyhow::Result<()> + where + S: Storage + Scratch, + S::ScratchStore: KeyValueStoreSend + 'static, + { + let mut stores = vec![]; + for n in 1..3 { + stores.push((db.get_scratch_store().await?, n)); + } + + for record in stores.iter_mut() { + record + .0 + .set_key(format!("foo-{}", record.1), format!("bar-{}", record.1)) + .await?; + } + + for record in stores { + for i in 1..3 { + let value = record.0.get_key(&format!("foo-{}", i)).await?; + if record.1 == i { + assert_eq!(value, Some(format!("bar-{}", i))); + } else { + assert_eq!(value, None); + } + } + } + Ok(()) + } + + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + pub async fn it_supports_scratch_space_platform_default() -> anyhow::Result<()> { + let storage_provider = make_disposable_storage().await?; + let db = SphereDb::new(&storage_provider).await.unwrap(); + test_storage_scratch_provider(db).await + } + + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + pub async fn it_supports_scratch_space_memory() -> anyhow::Result<()> { + let storage_provider = MemoryStorage::default(); + let db = SphereDb::new(&storage_provider).await.unwrap(); + test_storage_scratch_provider(db).await + } } diff --git a/rust/noosphere-storage/src/helpers.rs b/rust/noosphere-storage/src/helpers.rs index 0e423a831..456363990 100644 --- a/rust/noosphere-storage/src/helpers.rs +++ b/rust/noosphere-storage/src/helpers.rs @@ -6,6 +6,12 @@ use crate::{NativeStorage, NativeStorageInit, NativeStore}; #[cfg(not(target_arch = "wasm32"))] pub async fn make_disposable_store() -> Result { + let provider = make_disposable_storage().await?; + provider.get_block_store("foo").await +} + +#[cfg(not(target_arch = "wasm32"))] +pub async fn make_disposable_storage() -> Result { let temp_dir = std::env::temp_dir(); let temp_name: String = witty_phrase_generator::WPGen::new() .with_words(3) @@ -14,8 +20,7 @@ pub async fn make_disposable_store() -> Result { .map(String::from) .collect(); let db = sled::open(temp_dir.join(temp_name)).unwrap(); - let provider = NativeStorage::new(NativeStorageInit::Db(db))?; - provider.get_block_store("foo").await + NativeStorage::new(NativeStorageInit::Db(db)) } #[cfg(target_arch = "wasm32")] @@ -23,6 +28,12 @@ use crate::{WebStorage, WebStore}; #[cfg(target_arch = "wasm32")] pub async fn make_disposable_store() -> Result { + let provider = make_disposable_storage().await?; + provider.get_block_store(crate::db::BLOCK_STORE).await +} + +#[cfg(target_arch = "wasm32")] +pub async fn make_disposable_storage() -> Result { let temp_name: String = witty_phrase_generator::WPGen::new() .with_words(3) .unwrap() @@ -30,6 +41,5 @@ pub async fn make_disposable_store() -> Result { .map(|word| String::from(word)) .collect(); - let provider = WebStorage::new(&temp_name).await?; - provider.get_block_store(crate::db::BLOCK_STORE).await + WebStorage::new(&temp_name).await } diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 75e5335f6..be4328642 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -6,6 +6,7 @@ use tokio::sync::Mutex; use crate::storage::Storage; use crate::store::Store; +use crate::Scratch; #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -45,7 +46,6 @@ impl MemoryStorage { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Storage for MemoryStorage { type BlockStore = MemoryStore; - type KeyValueStore = MemoryStore; async fn get_block_store(&self, name: &str) -> Result { @@ -130,3 +130,24 @@ impl Store for MemoryStore { Ok(dags.remove(key)) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for MemoryStorage { + type ScratchStore = MemoryStore; + async fn get_scratch_store(&self) -> Result { + Ok(MemoryStore::default()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for MemoryStore { + type ScratchStore = MemoryStore; + // It's uncommon for a Store to implement a Scratch, but trivial + // for an in-memory implementation, and makes temporary stores + // easier to work with in the codebase. + async fn get_scratch_store(&self) -> Result { + Ok(MemoryStore::default()) + } +} diff --git a/rust/noosphere-storage/src/implementation/native.rs b/rust/noosphere-storage/src/implementation/native.rs index 62bf1658c..ae5091358 100644 --- a/rust/noosphere-storage/src/implementation/native.rs +++ b/rust/noosphere-storage/src/implementation/native.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; -use crate::storage::Storage; use crate::store::Store; +use crate::{storage::Storage, Scratch}; use anyhow::Result; use async_trait::async_trait; @@ -38,7 +38,6 @@ impl NativeStorage { #[async_trait] impl Storage for NativeStorage { type BlockStore = NativeStore; - type KeyValueStore = NativeStore; async fn get_block_store(&self, name: &str) -> Result { @@ -99,3 +98,57 @@ impl Drop for NativeStorage { let _ = self.db.flush(); } } + +#[async_trait] +impl Scratch for NativeStorage { + type ScratchStore = TempNativeStore; + + async fn get_scratch_store(&self) -> Result { + TempNativeStore::new(&self.db) + } +} + +#[derive(Clone)] +/// A [NativeStore] that does not persist data after dropping. +/// Can be created from [NativeStorage]'s [Scratch] implementation. +pub struct TempNativeStore { + db: Db, + name: String, + store: NativeStore, +} + +impl TempNativeStore { + pub(crate) fn new(db: &Db) -> Result { + let db = db.to_owned(); + let name = format!("temp-native-store-{}", rand::random::()); + let store = NativeStore::new(&db.open_tree(&name)?); + Ok(TempNativeStore { db, store, name }) + } +} + +#[async_trait] +impl Store for TempNativeStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +impl Drop for TempNativeStore { + fn drop(&mut self) { + if let Err(e) = self.db.drop_tree(&self.name) { + error!("Could not drop temporary tree: {}", e); + } + } +} diff --git a/rust/noosphere-storage/src/implementation/tracking.rs b/rust/noosphere-storage/src/implementation/tracking.rs index ba690d755..e0d236625 100644 --- a/rust/noosphere-storage/src/implementation/tracking.rs +++ b/rust/noosphere-storage/src/implementation/tracking.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::sync::Mutex; -use crate::{store::Store, MemoryStorage, MemoryStore, Storage}; +use crate::{store::Store, MemoryStorage, MemoryStore, Scratch, Storage}; #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct StoreStats { @@ -90,7 +90,6 @@ impl TrackingStorage { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Storage for TrackingStorage { type BlockStore = TrackingStore; - type KeyValueStore = TrackingStore; async fn get_block_store(&self, name: &str) -> Result { @@ -103,3 +102,16 @@ impl Storage for TrackingStorage { Ok(key_value_store) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for TrackingStorage +where + T: Storage + Scratch, + T::ScratchStore: Store, +{ + type ScratchStore = TrackingStore<::ScratchStore>; + async fn get_scratch_store(&self) -> Result { + Ok(TrackingStore::wrap(self.storage.get_scratch_store().await?)) + } +} diff --git a/rust/noosphere-storage/src/implementation/web.rs b/rust/noosphere-storage/src/implementation/web.rs index bbaea4ea9..3eccaef64 100644 --- a/rust/noosphere-storage/src/implementation/web.rs +++ b/rust/noosphere-storage/src/implementation/web.rs @@ -1,5 +1,8 @@ -use crate::store::Store; -use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage}; +use crate::{ + db::{SCRATCH_STORE, SPHERE_DB_STORE_NAMES}, + storage::Storage, +}; +use crate::{store::Store, Scratch}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use js_sys::Uint8Array; @@ -9,7 +12,7 @@ use rexie::{ use std::{fmt::Debug, rc::Rc}; use wasm_bindgen::{JsCast, JsValue}; -pub const INDEXEDDB_STORAGE_VERSION: u32 = 1; +pub const INDEXEDDB_STORAGE_VERSION: u32 = 2; #[derive(Clone)] pub struct WebStorage { @@ -24,7 +27,10 @@ impl Debug for WebStorage { impl WebStorage { pub async fn new(db_name: &str) -> Result { - Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await + let storage = + Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await?; + WebStorage::clear_scratch_storage(storage.db.clone()).await?; + Ok(storage) } async fn configure(version: u32, db_name: &str, store_names: &[&str]) -> Result { @@ -58,12 +64,30 @@ impl WebStorage { store_name: name.to_string(), }) } + + /// We cannot dynamically create tables with IndexedDb, so we create + /// a generic `SCRATCH_STORE` table that all scratch stores use, + /// each partitioning keys by a random key prefix. In lieu of + /// removing these values on [TempWebStore] drop (no async drop), we + /// clear out the scratch storage on [WebStorage] instantiation. + async fn clear_scratch_storage(db: Rc) -> Result<()> { + let scratch = WebStore { + db, + store_name: SCRATCH_STORE.to_owned(), + }; + let (store, tx) = scratch.start_transaction(TransactionMode::ReadWrite)?; + store + .clear() + .await + .map_err(|error| anyhow!("{:?}", error))?; + WebStore::finish_transaction(tx).await?; + Ok(()) + } } #[async_trait(?Send)] impl Storage for WebStorage { type BlockStore = WebStore; - type KeyValueStore = WebStore; async fn get_block_store(&self, name: &str) -> Result { @@ -75,6 +99,15 @@ impl Storage for WebStorage { } } +#[async_trait(?Send)] +impl Scratch for WebStorage { + type ScratchStore = TempWebStore; + + async fn get_scratch_store(&self) -> Result { + Ok(TempWebStore::new(self.db.clone())) + } +} + #[derive(Clone)] pub struct WebStore { db: Rc, @@ -82,7 +115,10 @@ pub struct WebStore { } impl WebStore { - fn start_transaction(&self, mode: TransactionMode) -> Result<(IdbStore, Transaction)> { + pub(crate) fn start_transaction( + &self, + mode: TransactionMode, + ) -> Result<(IdbStore, Transaction)> { let tx = self .db .transaction(&[&self.store_name], mode) @@ -94,7 +130,7 @@ impl WebStore { Ok((store, tx)) } - async fn finish_transaction(tx: Transaction) -> Result<()> { + pub(crate) async fn finish_transaction(tx: Transaction) -> Result<()> { tx.done().await.map_err(|error| anyhow!("{:?}", error))?; Ok(()) } @@ -179,3 +215,108 @@ impl Store for WebStore { Ok(old_value) } } + +#[derive(Clone)] +/// A [WebStore] that does not persist data after dropping. +/// Can be created from [WebStorage]'s [Scratch] implementation. +pub struct TempWebStore { + store: WebStore, + partition_name: Vec, +} + +impl TempWebStore { + pub(crate) fn new(db: Rc) -> Self { + let store = WebStore { + db, + store_name: SCRATCH_STORE.to_owned(), + }; + let partition_name = format!("temp-web-store-{}/", rand::random::()).into(); + TempWebStore { + store, + partition_name, + } + } + + fn partition_key(&self, key: &[u8]) -> Vec { + vec![self.partition_name.clone(), key.to_owned()].concat() + } +} + +#[async_trait(?Send)] +impl Store for TempWebStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(&self.partition_key(key)).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(&self.partition_key(key), bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(&self.partition_key(key)).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::key_value::KeyValueStore; + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + wasm_bindgen_test_configure!(run_in_browser); + + #[derive(Clone)] + pub struct WebStorageV1 { + db: Rc, + } + + impl WebStorageV1 { + pub async fn new(db_name: &str) -> Result { + const V1_STORES: [&str; 4] = ["blocks", "links", "versions", "metadata"]; + let mut builder = RexieBuilder::new(db_name).version(1); + + for name in V1_STORES { + builder = builder.add_object_store(ObjectStore::new(name).auto_increment(false)); + } + + let db = builder + .build() + .await + .map_err(|error| anyhow!("{:?}", error))?; + + Ok(WebStorageV1 { db: Rc::new(db) }) + } + + async fn get_store(&self, name: &str) -> Result { + if self + .db + .store_names() + .iter() + .find(|val| val.as_str() == name) + .is_none() + { + return Err(anyhow!("No such store named {}", name)); + } + + Ok(WebStore { + db: self.db.clone(), + store_name: name.to_string(), + }) + } + } + + #[wasm_bindgen_test] + async fn it_can_upgrade_from_v1() -> Result<()> { + let key = String::from("foo"); + let value = String::from("bar"); + { + let storage_v1 = WebStorageV1::new("v1_test").await?; + let mut store_v1 = storage_v1.get_store("links").await?; + store_v1.set_key(&key, &value).await?; + } + + let storage_v2 = WebStorage::new("v1_test").await?; + let store_v2 = storage_v2.get_store("links").await?; + assert_eq!(store_v2.get_key::<_, String>(&key).await?.unwrap(), value); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index 34ea6b44d..f32e1cc18 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -13,6 +13,7 @@ mod key_value; mod db; mod encoding; mod retry; +mod scratch; mod storage; mod store; mod tap; @@ -25,11 +26,12 @@ pub use encoding::*; pub use implementation::*; pub use key_value::*; pub use retry::*; +pub use scratch::*; pub use storage::*; pub use store::*; pub use tap::*; -#[cfg(test)] +#[cfg(feature = "helpers")] pub mod helpers; #[cfg(test)] diff --git a/rust/noosphere-storage/src/scratch.rs b/rust/noosphere-storage/src/scratch.rs new file mode 100644 index 000000000..9f0053a97 --- /dev/null +++ b/rust/noosphere-storage/src/scratch.rs @@ -0,0 +1,25 @@ +use crate::key_value::KeyValueStore; +use anyhow::Result; +use async_trait::async_trait; + +#[cfg(not(target_arch = "wasm32"))] +pub trait ScratchSendSync: Send + Sync {} + +#[cfg(not(target_arch = "wasm32"))] +impl ScratchSendSync for T where T: Send + Sync {} + +#[cfg(target_arch = "wasm32")] +pub trait ScratchSendSync {} + +#[cfg(target_arch = "wasm32")] +impl ScratchSendSync for T {} + +/// [Scratch] is a general trait for a storage provider to provide +/// a temporary, isolated [KeyValueStore] that does not persist after dropping. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Scratch: ScratchSendSync { + type ScratchStore: KeyValueStore; + + async fn get_scratch_store(&self) -> Result; +} diff --git a/rust/noosphere-storage/src/storage.rs b/rust/noosphere-storage/src/storage.rs index 4c104e090..4b54c7a5c 100644 --- a/rust/noosphere-storage/src/storage.rs +++ b/rust/noosphere-storage/src/storage.rs @@ -1,5 +1,6 @@ use crate::block::BlockStore; use crate::key_value::KeyValueStore; +use crate::Scratch; use anyhow::Result; use async_trait::async_trait; use std::fmt::Debug; @@ -24,7 +25,7 @@ impl StorageSendSync for T {} /// other Noosphere constructs. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait Storage: Clone + StorageSendSync + Debug { +pub trait Storage: Scratch + Clone + StorageSendSync + Debug { type BlockStore: BlockStore; type KeyValueStore: KeyValueStore; @@ -34,5 +35,5 @@ pub trait Storage: Clone + StorageSendSync + Debug { /// Get a [KeyValueStore] where all values stored in it are scoped to the /// given name - async fn get_key_value_store(&self, name: &str) -> Result; + async fn get_key_value_store(&self, name: &str) -> Result<::KeyValueStore>; } diff --git a/rust/noosphere/Cargo.toml b/rust/noosphere/Cargo.toml index 8adc5f965..971b2694f 100644 --- a/rust/noosphere/Cargo.toml +++ b/rust/noosphere/Cargo.toml @@ -36,7 +36,7 @@ tokio-stream = { workspace = true } tokio-util = { version = "~0.7", features = ["io"] } libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } noosphere-core = { version = "0.15.1", path = "../noosphere-core" } noosphere-sphere = { version = "0.10.1", path = "../noosphere-sphere" }