diff --git a/Cargo.lock b/Cargo.lock index 090e772d7..69986f4dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,11 @@ name = "fastcdc" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a71061d097bfa9a5a4d2efdec57990d9a88745020b365191d37e48541a1628f2" +dependencies = [ + "async-stream", + "tokio", + "tokio-stream", +] [[package]] name = "fastrand" @@ -3592,6 +3597,7 @@ dependencies = [ "async-trait", "base64 0.21.2", "byteorder", + "bytes", "cid", "console_error_panic_hook", "ed25519-zebra", @@ -3610,6 +3616,7 @@ dependencies = [ "serde_json", "strum 0.25.0", "strum_macros", + "tempfile", "thiserror", "tiny-bip39", "tokio", @@ -3798,6 +3805,7 @@ dependencies = [ "js-sys", "libipld-cbor", "libipld-core", + "rand 0.8.5", "rexie", "serde", "sled", diff --git a/Cargo.toml b/Cargo.toml index 034b50867..c1f16cef2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ resolver = "2" [workspace.dependencies] anyhow = { version = "1" } axum = { version = "^0.6.18" } +bytes = { version = "^1" } cid = { version = "0.10" } directories = { version = "5" } fastcdc = { version = "3.1" } @@ -30,6 +31,7 @@ libipld = { version = "0.16" } libipld-core = { version = "0.16" } libipld-cbor = { version = "0.16" } pathdiff = { version = "0.2.1" } +rand = { version = "0.8.5" } sentry-tracing = { version = "0.31.5" } serde = { version = "^1" } serde_json = { version = "^1" } diff --git a/rust/noosphere-cli/src/native/content.rs b/rust/noosphere-cli/src/native/content.rs index ab5377546..ecca2b53c 100644 --- a/rust/noosphere-cli/src/native/content.rs +++ b/rust/noosphere-cli/src/native/content.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use cid::Cid; use globset::{Glob, GlobSet, GlobSetBuilder}; use noosphere_core::data::{BodyChunkIpld, ContentType}; -use noosphere_storage::{BlockStore, MemoryStore}; +use noosphere_storage::{BlockStore, MemoryStore, Scratch}; use pathdiff::diff_paths; use std::collections::{BTreeMap, BTreeSet}; use subtext::util::to_slug; @@ -84,7 +84,10 @@ impl Content { /// provided store. // TODO(#556): This is slow; we could probably do a concurrent traversal // similar to how we traverse when rendering files to disk - pub async fn read_all(paths: &SpherePaths, store: &mut S) -> Result { + pub async fn read_all( + paths: &SpherePaths, + store: &mut S, + ) -> Result { let root_path = paths.root(); let mut directories = vec![(None, tokio::fs::read_dir(root_path).await?)]; @@ -144,7 +147,7 @@ impl Content { }; let file_bytes = fs::read(path).await?; - let body_cid = BodyChunkIpld::store_bytes(&file_bytes, store).await?; + let body_cid = BodyChunkIpld::encode(file_bytes.as_ref(), store).await?; content.matched.insert( slug, @@ -172,7 +175,6 @@ impl Content { let mut new_blocks = MemoryStore::default(); let file_content = Content::read_all(workspace.require_sphere_paths()?, &mut new_blocks).await?; - let sphere_context = workspace.sphere_context().await?; let walker = SphereWalker::from(&sphere_context); diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index a6dc7ecd8..f4c4acd87 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -23,6 +23,8 @@ sentry = ["dep:sentry-tracing"] helpers = [] [dependencies] +bytes = { workspace = true } +tempfile = { workspace = true } tracing = { workspace = true } cid = { workspace = true } url = { workspace = true } @@ -35,14 +37,14 @@ async-stream = "~0.3" async-once-cell = "~0.4" anyhow = "^1" thiserror = { workspace = true } -fastcdc = { workspace = true } +fastcdc = { workspace = true, features = ["tokio"] } futures = "~0.3" serde = { workspace = true } serde_json = { workspace = true } byteorder = "^1.4" base64 = "0.21" ed25519-zebra = "^3" -rand = "~0.8" +rand = { workspace = true } once_cell = "^1" tiny-bip39 = "^1" tokio-stream = "~0.1" diff --git a/rust/noosphere-core/src/data/body_chunk.rs b/rust/noosphere-core/src/data/body_chunk.rs index 66d935164..618fef6f7 100644 --- a/rust/noosphere-core/src/data/body_chunk.rs +++ b/rust/noosphere-core/src/data/body_chunk.rs @@ -1,12 +1,18 @@ use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use bytes::Bytes; use cid::Cid; -use fastcdc::v2020::FastCDC; +use fastcdc::v2020::{AsyncStreamCDC, FastCDC}; use libipld_cbor::DagCborCodec; +use noosphere_storage::{BlockStore, KeyValueStore, Scratch}; use serde::{Deserialize, Serialize}; - -use noosphere_storage::BlockStore; +use tokio::io::AsyncRead; +use tokio_stream::{Stream, StreamExt}; pub const BODY_CHUNK_MAX_SIZE: u32 = 1024 * 1024; // ~1mb/chunk worst case, ~.5mb/chunk average case +/// Encoding content larger than `CONTENT_STORAGE_MEMORY_LIMIT` will +/// use disk-storage rather than memory storage. +pub const CONTENT_STORAGE_MEMORY_LIMIT: u64 = 1024 * 1024 * 5; // 5mb /// A body chunk is a simplified flexible byte layout used for linking /// chunks of bytes. This is necessary to support cases when body contents @@ -21,7 +27,7 @@ pub struct BodyChunkIpld { } impl BodyChunkIpld { - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated] pub async fn store_bytes(bytes: &[u8], store: &mut S) -> Result { let chunks = FastCDC::new( bytes, @@ -56,7 +62,7 @@ impl BodyChunkIpld { next_chunk_cid.ok_or_else(|| anyhow!("No CID; did you try to store zero bytes?")) } - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated] pub async fn load_all_bytes(&self, store: &S) -> Result> { let mut all_bytes = self.bytes.clone(); let mut next_cid = self.next; @@ -70,4 +76,339 @@ impl BodyChunkIpld { Ok(all_bytes) } + + /// Encode `content` as a [BodyChunkIpld] chain in streaming fashion. + pub async fn encode(content: R, store: &S) -> Result + where + R: AsyncRead + Unpin, + S: Scratch + BlockStore, + { + let mut chunker = AsyncStreamCDC::new( + content, + fastcdc::v2020::MINIMUM_MIN, + BODY_CHUNK_MAX_SIZE / 2, + BODY_CHUNK_MAX_SIZE, + ); + let stream = chunker.as_stream(); + tokio::pin!(stream); + + let mut encoder = BodyChunkEncoder::new(store); + while let Some(chunk) = stream.try_next().await? { + encoder.push(chunk.data).await?; + } + encoder.encode().await + } + + /// Decode a [BodyChunkIpld] chain via [Cid] into a [Bytes] stream. + pub fn decode( + cid: &Cid, + store: &S, + ) -> impl Stream> + Unpin + where + S: BlockStore, + { + let mut next = Some(*cid); + let store = store.clone(); + Box::pin(try_stream! { + while let Some(cid) = next { + debug!("Unpacking block {}...", cid); + let chunk = store.load::(&cid).await.map_err(|error| { + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) + })?; + yield Bytes::from(chunk.bytes); + next = chunk.next; + } + }) + } +} + +/// `BodyChunkEncoder` is responsible for collecting chunks from `fastcdc`, +/// and encoding as a sequence of [BodyChunkIpld]'s linked together. +/// As the chunker runs from the start of the buffer, and an immutable linked +/// list requires the [Cid] of the next item, we need to construct the linked +/// list in reverse. +/// +/// The encoder takes two passes: one to collect the chunks, +/// and then constructs the linked list in reverse. To prevent unbounded memory usage, +/// chunks greater than some threshold will be written to the [Scratch] provider. +struct BodyChunkEncoder<'a, S: Scratch + BlockStore> { + store: &'a S, + data: BlobVec<'a, S>, +} + +impl<'a, S: Scratch + BlockStore> BodyChunkEncoder<'a, S> { + pub fn new(store: &'a S) -> Self { + let data = BlobVec::new(store, CONTENT_STORAGE_MEMORY_LIMIT); + BodyChunkEncoder { store, data } + } + + /// Push a new chunk to be encoded. + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + self.data.push(chunk).await?; + Ok(()) + } + + /// Consumes this [BodyChunkEncoder], encoding into the store, returning + /// the root [Cid] upon success. + pub async fn encode(self) -> Result { + if self.data.len() == &0 { + return Err(anyhow!("Could not encode empty buffer.")); + } + + let mut store = self.store.to_owned(); + let mut next_chunk_cid = None; + let chunk_stream = self.data.into_stream_rev().await; + tokio::pin!(chunk_stream); + + while let Some(chunk) = chunk_stream.try_next().await? { + next_chunk_cid = Some( + store + .save::(&BodyChunkIpld { + bytes: chunk, + next: next_chunk_cid, + }) + .await?, + ); + } + + match next_chunk_cid { + Some(cid) => Ok(cid), + None => Err(anyhow!("Could not encode empty buffer.")), + } + } +} + +/// Container for short-term storage of sequenced blobs +/// for consumption as a forward or reverse stream. +/// [BlobVec] data is backed by memory, dynamically converting +/// to [Scratch] provider storage if memory threshold is reached. +/// Blobs can then be drained via `BlobVec::into_stream`. +struct BlobVec<'a, S: Scratch> { + chunks: BlobVecData, + memory_limit: u64, + provider: &'a S, +} + +impl<'a, S: Scratch> BlobVec<'a, S> { + /// Creates a new [BlobVec] that stores data in memory + /// until `memory_limit` is reached. + pub fn new(provider: &'a S, memory_limit: u64) -> Self { + BlobVec { + chunks: BlobVecData::new_with_memory(), + memory_limit, + provider, + } + } + + /// Returns the number of items pushed. + pub fn len(&self) -> &usize { + self.chunks.chunk_count() + } + + /// Push a new item. + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + self.chunks.push(chunk).await?; + + #[cfg(not(target_arch = "wasm32"))] + if matches!(self.chunks, BlobVecData::Memory { .. }) + && self.chunks.byte_length() > &self.memory_limit + { + self.upgrade_storage().await?; + } + + Ok(()) + } + + /// Drain all items into a stream. + #[allow(unused)] + pub async fn into_stream(self) -> impl Stream, anyhow::Error>> + Unpin { + self.chunks.into_stream(false).await + } + + /// Drain all items into a stream in reverse. + pub async fn into_stream_rev( + self, + ) -> impl Stream, anyhow::Error>> + Unpin { + self.chunks.into_stream(true).await + } + + #[cfg(not(target_arch = "wasm32"))] + async fn upgrade_storage(&mut self) -> Result<()> { + let store = self.provider.get_scratch_store().await?; + let mut storage = BlobVecData::new_with_db(store).await?; + std::mem::swap(&mut self.chunks, &mut storage); + let previous_stream = storage.into_stream(false).await; + tokio::pin!(previous_stream); + while let Some(chunk) = previous_stream.try_next().await? { + self.chunks.push(chunk).await?; + } + Ok(()) + } +} + +enum BlobVecData { + Memory { + chunks: Vec>, + chunk_count: usize, + byte_length: u64, + }, + #[cfg(not(target_arch = "wasm32"))] + Db { + store: S, + chunk_count: usize, + byte_length: u64, + }, +} + +impl BlobVecData { + pub fn new_with_memory() -> Self { + BlobVecData::Memory { + chunks: vec![vec![]], + chunk_count: 0, + byte_length: 0, + } + } + + pub async fn new_with_db(store: S) -> Result { + Ok(BlobVecData::Db { + store, + chunk_count: 0, + byte_length: 0, + }) + } + + #[allow(unused)] + pub fn chunk_count(&self) -> &usize { + match self { + BlobVecData::Memory { chunk_count, .. } => chunk_count, + BlobVecData::Db { chunk_count, .. } => chunk_count, + } + } + + pub fn byte_length(&self) -> &u64 { + match self { + BlobVecData::Memory { byte_length, .. } => byte_length, + BlobVecData::Db { byte_length, .. } => byte_length, + } + } + + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + match self { + BlobVecData::Memory { + chunks, + ref mut chunk_count, + ref mut byte_length, + } => { + *byte_length += chunk.len() as u64; + chunks.push(chunk); + *chunk_count += 1; + } + BlobVecData::Db { + store, + ref mut chunk_count, + ref mut byte_length, + .. + } => { + *byte_length += chunk.len() as u64; + let key = format!("{}", chunk_count); + store.set_key(&key, chunk).await?; + *chunk_count += 1; + } + } + Ok(()) + } + + pub async fn into_stream( + self, + is_reversed: bool, + ) -> impl Stream, anyhow::Error>> + Unpin { + Box::pin(try_stream! { + match self { + BlobVecData::Memory { + chunks, + .. + } => { + if is_reversed { + let iterator = chunks.into_iter().rev(); + for chunk in iterator { + yield chunk; + } + } else { + let iterator = chunks.into_iter(); + for chunk in iterator { + yield chunk; + } + } + }, + BlobVecData::Db{ + store, + chunk_count, + .. + } => { + for n in 0..chunk_count { + let index = if is_reversed { + chunk_count - n - 1 + } else { + n + }; + let key = format!("{}", index); + let chunk = store.get_key(&key).await?; + yield chunk.ok_or_else(|| anyhow!("Missing chunk."))?; + } + } + } + }) + } +} + +#[cfg(test)] +mod tests { + use noosphere_storage::{helpers::make_disposable_storage, MemoryStore, SphereDb}; + use tokio_stream::StreamExt; + + use super::*; + + #[tokio::test] + async fn it_reads_and_writes_chunks_nonstreaming() -> Result<()> { + let mut store = MemoryStore::default(); + let chunk1 = vec![1; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk2 = vec![2; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk3 = vec![3; >::try_into(BODY_CHUNK_MAX_SIZE).unwrap() / 2]; + let bytes = [chunk1.clone(), chunk2.clone(), chunk3.clone()].concat(); + + #[allow(deprecated)] + let cid = BodyChunkIpld::store_bytes(&bytes, &mut store).await?; + let ipld_chunk = store.load::(&cid).await?; + + #[allow(deprecated)] + let output = ipld_chunk.load_all_bytes(&store).await?; + assert_eq!(output, bytes); + Ok(()) + } + + #[tokio::test] + async fn it_reads_and_writes_chunks_streaming() -> Result<()> { + let provider = make_disposable_storage().await?; + let db = SphereDb::new(&provider).await?; + let mut chunks = vec![]; + for n in 1..10 { + let mut chunk: Vec = vec![0; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + chunk.fill(n); + chunks.push(chunk); + } + let bytes = chunks.concat(); + assert!(bytes.len() as u64 > CONTENT_STORAGE_MEMORY_LIMIT); + + let cid = BodyChunkIpld::encode(bytes.as_ref(), &db).await?; + + let stream = BodyChunkIpld::decode(&cid, &db); + drop(db); + tokio::pin!(stream); + let mut output: Vec> = vec![]; + while let Some(chunk) = stream.try_next().await? { + output.push(chunk.into()); + } + assert_eq!(output.concat(), bytes); + Ok(()) + } } diff --git a/rust/noosphere-gateway/Cargo.toml b/rust/noosphere-gateway/Cargo.toml index bb6ad6b20..4222b5458 100644 --- a/rust/noosphere-gateway/Cargo.toml +++ b/rust/noosphere-gateway/Cargo.toml @@ -25,7 +25,7 @@ iroh-car = { workspace = true } thiserror = { workspace = true } strum = "0.25" strum_macros = "0.25" -bytes = "^1" +bytes = { workspace = true } tokio = { version = "^1", features = ["full"] } tokio-stream = "~0.1" diff --git a/rust/noosphere-into/Cargo.toml b/rust/noosphere-into/Cargo.toml index fb2f47aaa..eae5c93d2 100644 --- a/rust/noosphere-into/Cargo.toml +++ b/rust/noosphere-into/Cargo.toml @@ -31,7 +31,7 @@ horrorshow = "~0.8" cid = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } tokio-stream = "~0.1" tokio-util = "~0.7" tokio = { version = "^1", features = ["io-util", "macros", "test-util"] } diff --git a/rust/noosphere-ipfs/Cargo.toml b/rust/noosphere-ipfs/Cargo.toml index c3e3fb982..fd14c4a2e 100644 --- a/rust/noosphere-ipfs/Cargo.toml +++ b/rust/noosphere-ipfs/Cargo.toml @@ -50,7 +50,7 @@ ipfs-api-prelude = "~0.5" [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = "~0.8" +rand = { workspace = true } iroh-car = { workspace = true } libipld-cbor = { workspace = true } noosphere-storage = { version = "0.8.1", path = "../noosphere-storage" } diff --git a/rust/noosphere-ipfs/src/storage.rs b/rust/noosphere-ipfs/src/storage.rs index 67b785c76..815ad51a8 100644 --- a/rust/noosphere-ipfs/src/storage.rs +++ b/rust/noosphere-ipfs/src/storage.rs @@ -2,7 +2,7 @@ use crate::IpfsClient; use anyhow::Result; use async_trait::async_trait; use cid::Cid; -use noosphere_storage::{BlockStore, Storage}; +use noosphere_storage::{BlockStore, Scratch, Storage}; use std::sync::Arc; use tokio::sync::RwLock; @@ -56,7 +56,6 @@ where C: IpfsClient + IpfsStorageConditionalSendSync, { type BlockStore = IpfsStore; - type KeyValueStore = S::KeyValueStore; async fn get_block_store(&self, name: &str) -> Result { @@ -69,6 +68,20 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for IpfsStorage +where + S: Storage + IpfsStorageConditionalSendSync, + C: IpfsClient + IpfsStorageConditionalSendSync, +{ + type ScratchStore = ::KeyValueStore; + + async fn get_scratch_store(&self) -> Result { + unimplemented!(); + } +} + /// An implementation of [BlockStore] that wraps some other implementation of /// same. It forwards most behavior to its wrapped implementation, except when /// reading blocks. In that case, if a block cannot be found locally, it will diff --git a/rust/noosphere-ns/Cargo.toml b/rust/noosphere-ns/Cargo.toml index 3a2375363..3b1b60d21 100644 --- a/rust/noosphere-ns/Cargo.toml +++ b/rust/noosphere-ns/Cargo.toml @@ -57,7 +57,7 @@ url = { version = "^2", features = [ "serde" ], optional = true } [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = { version = "0.8.5" } +rand = { workspace = true } libipld-cbor = { workspace = true } tempfile = { workspace = true } diff --git a/rust/noosphere-sphere/Cargo.toml b/rust/noosphere-sphere/Cargo.toml index 184ee996b..e50bdd659 100644 --- a/rust/noosphere-sphere/Cargo.toml +++ b/rust/noosphere-sphere/Cargo.toml @@ -37,7 +37,7 @@ tokio-util = { version = "0.7.7", features = ["io"] } futures-util = "0.3.27" libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/rust/noosphere-sphere/src/content/decoder.rs b/rust/noosphere-sphere/src/content/decoder.rs deleted file mode 100644 index 643d8527e..000000000 --- a/rust/noosphere-sphere/src/content/decoder.rs +++ /dev/null @@ -1,29 +0,0 @@ -use async_stream::try_stream; -use bytes::Bytes; -use cid::Cid; -use libipld_cbor::DagCborCodec; -use noosphere_core::data::BodyChunkIpld; -use noosphere_storage::BlockStore; -use tokio_stream::Stream; - -/// Helper to easily decode a linked list of `BodyChunkIpld` as a byte stream -pub struct BodyChunkDecoder<'a, 'b, S: BlockStore>(pub &'a Cid, pub &'b S); - -impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> { - /// Consume the [BodyChunkDecoder] and return an async [Stream] of bytes - /// representing the raw body contents - pub fn stream(self) -> impl Stream> + Unpin { - let mut next = Some(*self.0); - let store = self.1.clone(); - Box::pin(try_stream! { - while let Some(cid) = next { - debug!("Unpacking block {}...", cid); - let chunk = store.load::(&cid).await.map_err(|error| { - std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) - })?; - yield Bytes::from(chunk.bytes); - next = chunk.next; - } - }) - } -} diff --git a/rust/noosphere-sphere/src/content/mod.rs b/rust/noosphere-sphere/src/content/mod.rs index ec6444013..18f9dbcad 100644 --- a/rust/noosphere-sphere/src/content/mod.rs +++ b/rust/noosphere-sphere/src/content/mod.rs @@ -2,12 +2,10 @@ //! with a public "slug", that is addressable by them or others who have replicated the sphere //! data. -mod decoder; mod file; mod read; mod write; -pub use decoder::*; pub use file::*; pub use read::*; pub use write::*; diff --git a/rust/noosphere-sphere/src/content/write.rs b/rust/noosphere-sphere/src/content/write.rs index 2904d21e9..df2052414 100644 --- a/rust/noosphere-sphere/src/content/write.rs +++ b/rust/noosphere-sphere/src/content/write.rs @@ -156,7 +156,8 @@ where // amount to byte streams, but in point of fact we can support anything // that may be referenced by CID including arbitrary IPLD structures let body_cid = - BodyChunkIpld::store_bytes(&bytes, self.sphere_context_mut().await?.db_mut()).await?; + BodyChunkIpld::encode(bytes.as_ref(), self.sphere_context_mut().await?.db_mut()) + .await?; self.link(slug, content_type, &body_cid, additional_headers) .await diff --git a/rust/noosphere-sphere/src/internal.rs b/rust/noosphere-sphere/src/internal.rs index d6913ed17..7f507eedf 100644 --- a/rust/noosphere-sphere/src/internal.rs +++ b/rust/noosphere-sphere/src/internal.rs @@ -1,4 +1,4 @@ -use super::{BodyChunkDecoder, SphereFile}; +use super::SphereFile; use crate::{AsyncFileBody, HasSphereContext}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -9,7 +9,7 @@ use tokio_util::io::StreamReader; use cid::Cid; use noosphere_core::{ authority::Access, - data::{ContentType, Header, Link, MemoIpld}, + data::{BodyChunkIpld, ContentType, Header, Link, MemoIpld}, }; /// A module-private trait for internal trait methods; this is a workaround for @@ -86,7 +86,7 @@ where let stream = match content_type { // TODO(#86): Content-type aware decoding of body bytes - Some(_) => BodyChunkDecoder(&memo.body, &db).stream(), + Some(_) => BodyChunkIpld::decode(&memo.body, &db), None => return Err(anyhow!("No content type specified")), }; diff --git a/rust/noosphere-sphere/src/replication/stream.rs b/rust/noosphere-sphere/src/replication/stream.rs index 6150afbb7..f4e8fba81 100644 --- a/rust/noosphere-sphere/src/replication/stream.rs +++ b/rust/noosphere-sphere/src/replication/stream.rs @@ -6,7 +6,7 @@ use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_core::{ authority::collect_ucan_proofs, - data::{ContentType, Link, MemoIpld, SphereIpld}, + data::{BodyChunkIpld, ContentType, Link, MemoIpld, SphereIpld}, view::Sphere, }; use noosphere_storage::{BlockStore, BlockStoreTap, UcanStore}; @@ -17,7 +17,6 @@ use ucan::{store::UcanJwtStore, Ucan}; use crate::{ walk_versioned_map_changes_and, walk_versioned_map_elements, walk_versioned_map_elements_and, - BodyChunkDecoder, }; /// Stream all the blocks required to reconstruct the history of a sphere since a @@ -231,7 +230,7 @@ where revocations_result??; } Some(_) => { - let stream = BodyChunkDecoder(&memo.body, &store).stream(); + let stream = BodyChunkIpld::decode(&memo.body, &store); drop(store); @@ -275,8 +274,8 @@ mod tests { use crate::{ car_stream, helpers::{simulated_sphere_context, touch_all_sphere_blocks, SimulationAccess}, - memo_body_stream, memo_history_stream, BodyChunkDecoder, HasMutableSphereContext, - HasSphereContext, SphereContentWrite, SpherePetnameWrite, + memo_body_stream, memo_history_stream, HasMutableSphereContext, HasSphereContext, + SphereContentWrite, SpherePetnameWrite, }; #[cfg(target_arch = "wasm32")] @@ -538,7 +537,7 @@ mod tests { let memo = store.load::(&content_cid).await?; let mut buffer = Vec::new(); - let body_stream = BodyChunkDecoder(&memo.body, &store).stream(); + let body_stream = BodyChunkIpld::decode(&memo.body, &store); tokio::pin!(body_stream); diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index cb73836d1..cebd03d6d 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -30,12 +30,13 @@ tracing = "~0.1" ucan = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } +rand = { workspace = true } serde = { workspace = true } base64 = "=0.21.2" url = { version = "^2" } +witty-phrase-generator = { version = "~0.2", optional = true } [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -54,3 +55,7 @@ features = [ "Window", "DedicatedWorkerGlobalScope", ] + +[features] +default = ["helpers"] +helpers = ["dep:witty-phrase-generator"] \ No newline at end of file diff --git a/rust/noosphere-storage/src/db.rs b/rust/noosphere-storage/src/db.rs index 09660d07d..e3e755174 100644 --- a/rust/noosphere-storage/src/db.rs +++ b/rust/noosphere-storage/src/db.rs @@ -13,7 +13,7 @@ use std::{collections::BTreeSet, fmt::Debug}; use tokio_stream::{Stream, StreamExt}; use ucan::store::{UcanStore, UcanStoreConditionalSend}; -use crate::{BlockStore, BlockStoreSend, KeyValueStore, MemoryStore, Storage}; +use crate::{BlockStore, BlockStoreSend, KeyValueStore, MemoryStore, Scratch, Storage}; use async_stream::try_stream; @@ -45,8 +45,9 @@ pub const SPHERE_DB_STORE_NAMES: &[&str] = #[derive(Clone, Debug)] pub struct SphereDb where - S: Storage, + S: Storage + Scratch, { + storage: S, block_store: S::BlockStore, link_store: S::KeyValueStore, version_store: S::KeyValueStore, @@ -55,10 +56,11 @@ where impl SphereDb where - S: Storage, + S: Storage + Scratch, { pub async fn new(storage: &S) -> Result> { Ok(SphereDb { + storage: storage.to_owned(), block_store: storage.get_block_store(BLOCK_STORE).await?, link_store: storage.get_key_value_store(LINK_STORE).await?, version_store: storage.get_key_value_store(VERSION_STORE).await?, @@ -244,7 +246,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn put_links(&mut self, cid: &Cid, block: &[u8]) -> Result<()> where @@ -274,7 +276,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl KeyValueStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn set_key(&mut self, key: K, value: V) -> Result<()> where @@ -304,7 +306,7 @@ where #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl UcanStore for SphereDb where - S: Storage, + S: Storage + Scratch, { async fn read>(&self, cid: &Cid) -> Result> { self.get::(cid).await @@ -318,6 +320,19 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for SphereDb +where + S: Storage + Scratch, +{ + type ScratchStore = ::ScratchStore; + + async fn get_scratch_store(&self) -> Result { + self.storage.get_scratch_store().await + } +} + #[cfg(test)] mod tests { diff --git a/rust/noosphere-storage/src/helpers.rs b/rust/noosphere-storage/src/helpers.rs index 0e423a831..456363990 100644 --- a/rust/noosphere-storage/src/helpers.rs +++ b/rust/noosphere-storage/src/helpers.rs @@ -6,6 +6,12 @@ use crate::{NativeStorage, NativeStorageInit, NativeStore}; #[cfg(not(target_arch = "wasm32"))] pub async fn make_disposable_store() -> Result { + let provider = make_disposable_storage().await?; + provider.get_block_store("foo").await +} + +#[cfg(not(target_arch = "wasm32"))] +pub async fn make_disposable_storage() -> Result { let temp_dir = std::env::temp_dir(); let temp_name: String = witty_phrase_generator::WPGen::new() .with_words(3) @@ -14,8 +20,7 @@ pub async fn make_disposable_store() -> Result { .map(String::from) .collect(); let db = sled::open(temp_dir.join(temp_name)).unwrap(); - let provider = NativeStorage::new(NativeStorageInit::Db(db))?; - provider.get_block_store("foo").await + NativeStorage::new(NativeStorageInit::Db(db)) } #[cfg(target_arch = "wasm32")] @@ -23,6 +28,12 @@ use crate::{WebStorage, WebStore}; #[cfg(target_arch = "wasm32")] pub async fn make_disposable_store() -> Result { + let provider = make_disposable_storage().await?; + provider.get_block_store(crate::db::BLOCK_STORE).await +} + +#[cfg(target_arch = "wasm32")] +pub async fn make_disposable_storage() -> Result { let temp_name: String = witty_phrase_generator::WPGen::new() .with_words(3) .unwrap() @@ -30,6 +41,5 @@ pub async fn make_disposable_store() -> Result { .map(|word| String::from(word)) .collect(); - let provider = WebStorage::new(&temp_name).await?; - provider.get_block_store(crate::db::BLOCK_STORE).await + WebStorage::new(&temp_name).await } diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 0b1097d4e..4d6e1600a 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::storage::Storage; use crate::store::Store; +use crate::Scratch; #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -45,7 +46,6 @@ impl MemoryStorage { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Storage for MemoryStorage { type BlockStore = MemoryStore; - type KeyValueStore = MemoryStore; async fn get_block_store(&self, name: &str) -> Result { @@ -130,3 +130,24 @@ impl Store for MemoryStore { Ok(dags.remove(key)) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for MemoryStorage { + type ScratchStore = MemoryStore; + async fn get_scratch_store(&self) -> Result { + Ok(MemoryStore::default()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for MemoryStore { + type ScratchStore = MemoryStore; + // It's uncommon for a Store to implement a Scratch, but trivial + // for an in-memory implementation, and makes temporary stores + // easier to work with in the codebase. + async fn get_scratch_store(&self) -> Result { + Ok(MemoryStore::default()) + } +} diff --git a/rust/noosphere-storage/src/implementation/native.rs b/rust/noosphere-storage/src/implementation/native.rs index 62bf1658c..e5ffd1961 100644 --- a/rust/noosphere-storage/src/implementation/native.rs +++ b/rust/noosphere-storage/src/implementation/native.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; -use crate::storage::Storage; use crate::store::Store; +use crate::{storage::Storage, Scratch}; use anyhow::Result; use async_trait::async_trait; @@ -38,7 +38,6 @@ impl NativeStorage { #[async_trait] impl Storage for NativeStorage { type BlockStore = NativeStore; - type KeyValueStore = NativeStore; async fn get_block_store(&self, name: &str) -> Result { @@ -99,3 +98,55 @@ impl Drop for NativeStorage { let _ = self.db.flush(); } } + +#[async_trait] +impl Scratch for NativeStorage { + type ScratchStore = TempNativeStore; + + async fn get_scratch_store(&self) -> Result { + TempNativeStore::new(&self.db) + } +} + +#[derive(Clone)] +pub struct TempNativeStore { + db: Db, + name: String, + store: NativeStore, +} + +impl TempNativeStore { + pub fn new(db: &Db) -> Result { + let db = db.to_owned(); + let name = format!("temp-native-store-{}", rand::random::()); + let store = NativeStore::new(&db.open_tree(&name)?); + Ok(TempNativeStore { db, store, name }) + } +} + +#[async_trait] +impl Store for TempNativeStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +impl Drop for TempNativeStore { + fn drop(&mut self) { + if let Err(e) = self.db.drop_tree(&self.name) { + error!("Could not drop temporary tree: {}", e); + } + } +} diff --git a/rust/noosphere-storage/src/implementation/tracking.rs b/rust/noosphere-storage/src/implementation/tracking.rs index 3110d7678..99b7f9835 100644 --- a/rust/noosphere-storage/src/implementation/tracking.rs +++ b/rust/noosphere-storage/src/implementation/tracking.rs @@ -3,7 +3,7 @@ use async_std::sync::Mutex; use async_trait::async_trait; use std::sync::Arc; -use crate::{store::Store, MemoryStorage, MemoryStore, Storage}; +use crate::{store::Store, MemoryStorage, MemoryStore, Scratch, Storage}; #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct StoreStats { @@ -90,7 +90,6 @@ impl TrackingStorage { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Storage for TrackingStorage { type BlockStore = TrackingStore; - type KeyValueStore = TrackingStore; async fn get_block_store(&self, name: &str) -> Result { @@ -103,3 +102,16 @@ impl Storage for TrackingStorage { Ok(key_value_store) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Scratch for TrackingStorage +where + T: Storage + Scratch, + T::ScratchStore: Store, +{ + type ScratchStore = TrackingStore<::ScratchStore>; + async fn get_scratch_store(&self) -> Result { + Ok(TrackingStore::wrap(self.storage.get_scratch_store().await?)) + } +} diff --git a/rust/noosphere-storage/src/implementation/web.rs b/rust/noosphere-storage/src/implementation/web.rs index bbaea4ea9..b0b3f62d2 100644 --- a/rust/noosphere-storage/src/implementation/web.rs +++ b/rust/noosphere-storage/src/implementation/web.rs @@ -63,8 +63,8 @@ impl WebStorage { #[async_trait(?Send)] impl Storage for WebStorage { type BlockStore = WebStore; - type KeyValueStore = WebStore; + type TempStore = WebStore; async fn get_block_store(&self, name: &str) -> Result { self.get_store(name).await @@ -75,7 +75,16 @@ impl Storage for WebStorage { } } -#[derive(Clone)] +#[async_trait(?Send)] +impl Scratch for WebStorage { + type KeyValueStore = WebStore; + + async fn get_scratch_store(&self) -> Result { + self.get_store("foo").await + } +} + +#[derive(Clone, Clone)] pub struct WebStore { db: Rc, store_name: String, diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index 34ea6b44d..f32e1cc18 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -13,6 +13,7 @@ mod key_value; mod db; mod encoding; mod retry; +mod scratch; mod storage; mod store; mod tap; @@ -25,11 +26,12 @@ pub use encoding::*; pub use implementation::*; pub use key_value::*; pub use retry::*; +pub use scratch::*; pub use storage::*; pub use store::*; pub use tap::*; -#[cfg(test)] +#[cfg(feature = "helpers")] pub mod helpers; #[cfg(test)] diff --git a/rust/noosphere-storage/src/scratch.rs b/rust/noosphere-storage/src/scratch.rs new file mode 100644 index 000000000..4e9414605 --- /dev/null +++ b/rust/noosphere-storage/src/scratch.rs @@ -0,0 +1,25 @@ +use crate::key_value::KeyValueStore; +use anyhow::Result; +use async_trait::async_trait; + +#[cfg(not(target_arch = "wasm32"))] +pub trait ScratchSendSync: Send + Sync {} + +#[cfg(not(target_arch = "wasm32"))] +impl ScratchSendSync for T where T: Send + Sync {} + +#[cfg(target_arch = "wasm32")] +pub trait ScratchSendSync {} + +#[cfg(target_arch = "wasm32")] +impl ScratchSendSync for T {} + +/// [Scratch] is a general trait for a storage provider to provide +/// a temporary [KeyValueStore] that does not persist after dropping. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Scratch: ScratchSendSync { + type ScratchStore: KeyValueStore; + + async fn get_scratch_store(&self) -> Result; +} diff --git a/rust/noosphere-storage/src/storage.rs b/rust/noosphere-storage/src/storage.rs index 4c104e090..4b54c7a5c 100644 --- a/rust/noosphere-storage/src/storage.rs +++ b/rust/noosphere-storage/src/storage.rs @@ -1,5 +1,6 @@ use crate::block::BlockStore; use crate::key_value::KeyValueStore; +use crate::Scratch; use anyhow::Result; use async_trait::async_trait; use std::fmt::Debug; @@ -24,7 +25,7 @@ impl StorageSendSync for T {} /// other Noosphere constructs. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait Storage: Clone + StorageSendSync + Debug { +pub trait Storage: Scratch + Clone + StorageSendSync + Debug { type BlockStore: BlockStore; type KeyValueStore: KeyValueStore; @@ -34,5 +35,5 @@ pub trait Storage: Clone + StorageSendSync + Debug { /// Get a [KeyValueStore] where all values stored in it are scoped to the /// given name - async fn get_key_value_store(&self, name: &str) -> Result; + async fn get_key_value_store(&self, name: &str) -> Result<::KeyValueStore>; } diff --git a/rust/noosphere/Cargo.toml b/rust/noosphere/Cargo.toml index 7345598b5..fba3f9072 100644 --- a/rust/noosphere/Cargo.toml +++ b/rust/noosphere/Cargo.toml @@ -36,7 +36,7 @@ tokio-stream = "~0.1" tokio-util = { version = "~0.7", features = ["io"] } libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } noosphere-core = { version = "0.15.1", path = "../noosphere-core" } noosphere-sphere = { version = "0.10.1", path = "../noosphere-sphere" }