diff --git a/Cargo.lock b/Cargo.lock index 090e772d7..dc3e989de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,11 @@ name = "fastcdc" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a71061d097bfa9a5a4d2efdec57990d9a88745020b365191d37e48541a1628f2" +dependencies = [ + "async-stream", + "tokio", + "tokio-stream", +] [[package]] name = "fastrand" @@ -3592,6 +3597,7 @@ dependencies = [ "async-trait", "base64 0.21.2", "byteorder", + "bytes", "cid", "console_error_panic_hook", "ed25519-zebra", @@ -3610,6 +3616,7 @@ dependencies = [ "serde_json", "strum 0.25.0", "strum_macros", + "tempfile", "thiserror", "tiny-bip39", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 034b50867..30bc0b158 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ resolver = "2" [workspace.dependencies] anyhow = { version = "1" } axum = { version = "^0.6.18" } +bytes = { version = "^1" } cid = { version = "0.10" } directories = { version = "5" } fastcdc = { version = "3.1" } diff --git a/rust/noosphere-cli/src/native/content.rs b/rust/noosphere-cli/src/native/content.rs index ab5377546..ff5dc0706 100644 --- a/rust/noosphere-cli/src/native/content.rs +++ b/rust/noosphere-cli/src/native/content.rs @@ -144,7 +144,7 @@ impl Content { }; let file_bytes = fs::read(path).await?; - let body_cid = BodyChunkIpld::store_bytes(&file_bytes, store).await?; + let body_cid = BodyChunkIpld::encode(file_bytes.as_ref(), store).await?; content.matched.insert( slug, diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index a6dc7ecd8..339ffd0e3 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -23,6 +23,8 @@ sentry = ["dep:sentry-tracing"] helpers = [] [dependencies] +bytes = { workspace = true } +tempfile = { workspace = true } tracing = { workspace = true } cid = { workspace = true } url = { workspace = true } @@ -35,7 +37,7 @@ async-stream = "~0.3" async-once-cell = "~0.4" anyhow = "^1" thiserror = { workspace = true } -fastcdc = { workspace = true } +fastcdc = { workspace = true, features = ["tokio"] } futures = "~0.3" serde = { workspace = true } serde_json = { workspace = true } diff --git a/rust/noosphere-core/src/data/body_chunk.rs b/rust/noosphere-core/src/data/body_chunk.rs index 66d935164..2bbe622be 100644 --- a/rust/noosphere-core/src/data/body_chunk.rs +++ b/rust/noosphere-core/src/data/body_chunk.rs @@ -1,12 +1,18 @@ use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use bytes::Bytes; use cid::Cid; -use fastcdc::v2020::FastCDC; +use fastcdc::v2020::{AsyncStreamCDC, FastCDC}; use libipld_cbor::DagCborCodec; -use serde::{Deserialize, Serialize}; - use noosphere_storage::BlockStore; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncRead; +use tokio_stream::{Stream, StreamExt}; pub const BODY_CHUNK_MAX_SIZE: u32 = 1024 * 1024; // ~1mb/chunk worst case, ~.5mb/chunk average case +/// Encoding content larger than `CONTENT_STORAGE_MEMORY_LIMIT` will +/// use disk-storage rather than memory storage. +pub const CONTENT_STORAGE_MEMORY_LIMIT: u64 = 1024 * 1024 * 5; // 5mb /// A body chunk is a simplified flexible byte layout used for linking /// chunks of bytes. This is necessary to support cases when body contents @@ -21,7 +27,7 @@ pub struct BodyChunkIpld { } impl BodyChunkIpld { - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated] pub async fn store_bytes(bytes: &[u8], store: &mut S) -> Result { let chunks = FastCDC::new( bytes, @@ -56,7 +62,7 @@ impl BodyChunkIpld { next_chunk_cid.ok_or_else(|| anyhow!("No CID; did you try to store zero bytes?")) } - // TODO(#498): Re-write to address potentially unbounded memory overhead + #[deprecated] pub async fn load_all_bytes(&self, store: &S) -> Result> { let mut all_bytes = self.bytes.clone(); let mut next_cid = self.next; @@ -70,4 +76,343 @@ impl BodyChunkIpld { Ok(all_bytes) } + + /// Encode `content` as a [BodyChunkIpld] chain in streaming fashion. + pub async fn encode(content: R, store: &S) -> Result + where + R: AsyncRead + Unpin, + S: BlockStore, + { + let mut chunker = AsyncStreamCDC::new( + content, + fastcdc::v2020::MINIMUM_MIN, + BODY_CHUNK_MAX_SIZE / 2, + BODY_CHUNK_MAX_SIZE, + ); + let stream = chunker.as_stream(); + tokio::pin!(stream); + + let mut encoder = BodyChunkEncoder::new(store.to_owned()); + while let Some(chunk) = stream.try_next().await? { + encoder.push(chunk.data).await?; + } + encoder.encode().await + } + + /// Decode a [BodyChunkIpld] chain via [Cid] into a [Bytes] stream. + pub fn decode( + cid: &Cid, + store: &S, + ) -> impl Stream> + Unpin { + let mut next = Some(*cid); + let store = store.clone(); + Box::pin(try_stream! { + while let Some(cid) = next { + debug!("Unpacking block {}...", cid); + let chunk = store.load::(&cid).await.map_err(|error| { + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) + })?; + yield Bytes::from(chunk.bytes); + next = chunk.next; + } + }) + } +} + +/// `BodyChunkEncoder` is responsible for collecting chunks from `fastcdc`, +/// and encoding as a sequence of [BodyChunkIpld]'s linked together. +/// As the chunker runs from the start of the buffer, and an immutable linked +/// list requires the [Cid] of the next item, we need to construct the linked +/// list in reverse. +/// +/// The encoder takes two passes: one to collect the chunks, +/// and then constructs the linked list in reverse. To prevent unbounded memory usage, +/// chunks greater than some threshold will be written to disk. +struct BodyChunkEncoder { + store: S, + data: BlobVec, +} + +impl BodyChunkEncoder { + pub fn new(store: S) -> Self { + BodyChunkEncoder { + store, + data: BlobVec::new(CONTENT_STORAGE_MEMORY_LIMIT), + } + } + + /// Push a new chunk to be encoded. + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + self.data.push(chunk).await?; + Ok(()) + } + + /// Consumes this [BodyChunkEncoder], encoding into the store, returning + /// the root [Cid] upon success. + pub async fn encode(mut self) -> Result { + if self.data.len() == &0 { + return Err(anyhow!("Could not encode empty buffer.")); + } + + let mut next_chunk_cid = None; + let chunk_stream = self.data.into_stream_rev().await; + tokio::pin!(chunk_stream); + + while let Some(chunk) = chunk_stream.try_next().await? { + next_chunk_cid = Some( + self.store + .save::(&BodyChunkIpld { + bytes: chunk, + next: next_chunk_cid, + }) + .await?, + ); + } + + match next_chunk_cid { + Some(cid) => Ok(cid), + None => Err(anyhow!("Could not encode empty buffer.")), + } + } +} + +/// Container for short-term storage of sequenced blobs +/// for consumption as a forward or reverse stream. +/// [BlobVec] data is backed by memory, dynamically converting +/// to disk storage if memory threshold is reached (on non-`wasm32` platforms). +/// Blobs can then be drained via `BlobVec::into_stream`. +struct BlobVec { + chunks: BlobVecData, + memory_limit: u64, +} + +impl BlobVec { + /// Creates a new [BlobVec] that stores data in memory + /// until `memory_limit` is reached. + pub fn new(memory_limit: u64) -> Self { + BlobVec { + chunks: BlobVecData::new_with_memory(), + memory_limit, + } + } + + /// Returns the number of items pushed. + pub fn len(&self) -> &usize { + self.chunks.chunk_count() + } + + /// Push a new item. + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + self.chunks.push(chunk).await?; + + #[cfg(not(target_arch = "wasm32"))] + if matches!(self.chunks, BlobVecData::Memory { .. }) + && self.chunks.byte_length() > &self.memory_limit + { + self.upgrade_storage().await?; + } + + Ok(()) + } + + /// Drain all items into a stream. + #[allow(unused)] + pub async fn into_stream(self) -> impl Stream, std::io::Error>> + Unpin { + self.chunks.into_stream(false).await + } + + /// Drain all items into a stream in reverse. + pub async fn into_stream_rev( + self, + ) -> impl Stream, std::io::Error>> + Unpin { + self.chunks.into_stream(true).await + } + + #[cfg(not(target_arch = "wasm32"))] + async fn upgrade_storage(&mut self) -> Result<()> { + let mut storage = BlobVecData::new_with_disk()?; + std::mem::swap(&mut self.chunks, &mut storage); + let previous_stream = storage.into_stream(false).await; + tokio::pin!(previous_stream); + while let Some(chunk) = previous_stream.try_next().await? { + self.chunks.push(chunk).await?; + } + Ok(()) + } +} + +enum BlobVecData { + Memory { + chunks: Vec>, + chunk_count: usize, + byte_length: u64, + }, + #[cfg(not(target_arch = "wasm32"))] + Disk { + temp_dir: tempfile::TempDir, + chunk_count: usize, + byte_length: u64, + }, +} + +impl BlobVecData { + pub fn new_with_memory() -> Self { + BlobVecData::Memory { + chunks: vec![vec![]], + chunk_count: 0, + byte_length: 0, + } + } + + #[cfg(not(target_arch = "wasm32"))] + pub fn new_with_disk() -> Result { + let temp_dir = tempfile::Builder::new() + .prefix("ns-encoder-chunks") + .tempdir()?; + + Ok(BlobVecData::Disk { + temp_dir, + chunk_count: 0, + byte_length: 0, + }) + } + + #[allow(unused)] + pub fn chunk_count(&self) -> &usize { + match self { + BlobVecData::Memory { chunk_count, .. } => chunk_count, + #[cfg(not(target_arch = "wasm32"))] + BlobVecData::Disk { chunk_count, .. } => chunk_count, + } + } + + pub fn byte_length(&self) -> &u64 { + match self { + BlobVecData::Memory { byte_length, .. } => byte_length, + #[cfg(not(target_arch = "wasm32"))] + BlobVecData::Disk { byte_length, .. } => byte_length, + } + } + + pub async fn push(&mut self, chunk: Vec) -> Result<()> { + match self { + BlobVecData::Memory { + chunks, + ref mut chunk_count, + ref mut byte_length, + } => { + *byte_length += chunk.len() as u64; + chunks.push(chunk); + *chunk_count += 1; + } + #[cfg(not(target_arch = "wasm32"))] + BlobVecData::Disk { + temp_dir, + ref mut chunk_count, + ref mut byte_length, + } => { + *byte_length += chunk.len() as u64; + let path = temp_dir.path().join(format!("{}", chunk_count)); + tokio::fs::write(path, chunk).await?; + *chunk_count += 1; + } + } + Ok(()) + } + + pub async fn into_stream( + self, + is_reversed: bool, + ) -> impl Stream, std::io::Error>> + Unpin { + Box::pin(try_stream! { + match self { + BlobVecData::Memory { + chunks, + .. + } => { + if is_reversed { + let iterator = chunks.into_iter().rev(); + for chunk in iterator { + yield chunk; + } + } else { + let iterator = chunks.into_iter(); + for chunk in iterator { + yield chunk; + } + } + }, + #[cfg(not(target_arch = "wasm32"))] + BlobVecData::Disk { + temp_dir, + chunk_count, + .. + } => { + let temp_dir_path = temp_dir.path(); + for n in 0..chunk_count { + let index = if is_reversed { + chunk_count - n - 1 + } else { + n + }; + + let path = temp_dir_path.join(format!("{}", index)); + let chunk = tokio::fs::read(path).await?; + yield chunk; + } + } + } + }) + } +} + +#[cfg(test)] +mod tests { + use noosphere_storage::MemoryStore; + use tokio_stream::StreamExt; + + use super::*; + + #[tokio::test] + async fn it_reads_and_writes_chunks_nonstreaming() -> Result<()> { + let mut store = MemoryStore::default(); + let chunk1 = vec![1; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk2 = vec![2; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + let chunk3 = vec![3; >::try_into(BODY_CHUNK_MAX_SIZE).unwrap() / 2]; + let bytes = [chunk1.clone(), chunk2.clone(), chunk3.clone()].concat(); + + #[allow(deprecated)] + let cid = BodyChunkIpld::store_bytes(&bytes, &mut store).await?; + let ipld_chunk = store.load::(&cid).await?; + + #[allow(deprecated)] + let output = ipld_chunk.load_all_bytes(&store).await?; + assert_eq!(output, bytes); + Ok(()) + } + + #[tokio::test] + async fn it_reads_and_writes_chunks_streaming() -> Result<()> { + let store = MemoryStore::default(); + let mut chunks = vec![]; + for n in 1..10 { + let mut chunk: Vec = vec![0; BODY_CHUNK_MAX_SIZE.try_into().unwrap()]; + chunk.fill(n); + chunks.push(chunk); + } + let bytes = chunks.concat(); + assert!(bytes.len() as u64 > CONTENT_STORAGE_MEMORY_LIMIT); + + let cid = BodyChunkIpld::encode(bytes.as_ref(), &store).await?; + + let stream = BodyChunkIpld::decode(&cid, &store); + drop(store); + tokio::pin!(stream); + let mut output: Vec> = vec![]; + while let Some(chunk) = stream.try_next().await? { + output.push(chunk.into()); + } + assert_eq!(output.concat(), bytes); + Ok(()) + } } diff --git a/rust/noosphere-gateway/Cargo.toml b/rust/noosphere-gateway/Cargo.toml index bb6ad6b20..4222b5458 100644 --- a/rust/noosphere-gateway/Cargo.toml +++ b/rust/noosphere-gateway/Cargo.toml @@ -25,7 +25,7 @@ iroh-car = { workspace = true } thiserror = { workspace = true } strum = "0.25" strum_macros = "0.25" -bytes = "^1" +bytes = { workspace = true } tokio = { version = "^1", features = ["full"] } tokio-stream = "~0.1" diff --git a/rust/noosphere-into/Cargo.toml b/rust/noosphere-into/Cargo.toml index fb2f47aaa..eae5c93d2 100644 --- a/rust/noosphere-into/Cargo.toml +++ b/rust/noosphere-into/Cargo.toml @@ -31,7 +31,7 @@ horrorshow = "~0.8" cid = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } tokio-stream = "~0.1" tokio-util = "~0.7" tokio = { version = "^1", features = ["io-util", "macros", "test-util"] } diff --git a/rust/noosphere-sphere/Cargo.toml b/rust/noosphere-sphere/Cargo.toml index 184ee996b..e50bdd659 100644 --- a/rust/noosphere-sphere/Cargo.toml +++ b/rust/noosphere-sphere/Cargo.toml @@ -37,7 +37,7 @@ tokio-util = { version = "0.7.7", features = ["io"] } futures-util = "0.3.27" libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/rust/noosphere-sphere/src/content/decoder.rs b/rust/noosphere-sphere/src/content/decoder.rs deleted file mode 100644 index 643d8527e..000000000 --- a/rust/noosphere-sphere/src/content/decoder.rs +++ /dev/null @@ -1,29 +0,0 @@ -use async_stream::try_stream; -use bytes::Bytes; -use cid::Cid; -use libipld_cbor::DagCborCodec; -use noosphere_core::data::BodyChunkIpld; -use noosphere_storage::BlockStore; -use tokio_stream::Stream; - -/// Helper to easily decode a linked list of `BodyChunkIpld` as a byte stream -pub struct BodyChunkDecoder<'a, 'b, S: BlockStore>(pub &'a Cid, pub &'b S); - -impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> { - /// Consume the [BodyChunkDecoder] and return an async [Stream] of bytes - /// representing the raw body contents - pub fn stream(self) -> impl Stream> + Unpin { - let mut next = Some(*self.0); - let store = self.1.clone(); - Box::pin(try_stream! { - while let Some(cid) = next { - debug!("Unpacking block {}...", cid); - let chunk = store.load::(&cid).await.map_err(|error| { - std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string()) - })?; - yield Bytes::from(chunk.bytes); - next = chunk.next; - } - }) - } -} diff --git a/rust/noosphere-sphere/src/content/mod.rs b/rust/noosphere-sphere/src/content/mod.rs index ec6444013..18f9dbcad 100644 --- a/rust/noosphere-sphere/src/content/mod.rs +++ b/rust/noosphere-sphere/src/content/mod.rs @@ -2,12 +2,10 @@ //! with a public "slug", that is addressable by them or others who have replicated the sphere //! data. -mod decoder; mod file; mod read; mod write; -pub use decoder::*; pub use file::*; pub use read::*; pub use write::*; diff --git a/rust/noosphere-sphere/src/content/write.rs b/rust/noosphere-sphere/src/content/write.rs index 2904d21e9..df2052414 100644 --- a/rust/noosphere-sphere/src/content/write.rs +++ b/rust/noosphere-sphere/src/content/write.rs @@ -156,7 +156,8 @@ where // amount to byte streams, but in point of fact we can support anything // that may be referenced by CID including arbitrary IPLD structures let body_cid = - BodyChunkIpld::store_bytes(&bytes, self.sphere_context_mut().await?.db_mut()).await?; + BodyChunkIpld::encode(bytes.as_ref(), self.sphere_context_mut().await?.db_mut()) + .await?; self.link(slug, content_type, &body_cid, additional_headers) .await diff --git a/rust/noosphere-sphere/src/internal.rs b/rust/noosphere-sphere/src/internal.rs index d6913ed17..7f507eedf 100644 --- a/rust/noosphere-sphere/src/internal.rs +++ b/rust/noosphere-sphere/src/internal.rs @@ -1,4 +1,4 @@ -use super::{BodyChunkDecoder, SphereFile}; +use super::SphereFile; use crate::{AsyncFileBody, HasSphereContext}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -9,7 +9,7 @@ use tokio_util::io::StreamReader; use cid::Cid; use noosphere_core::{ authority::Access, - data::{ContentType, Header, Link, MemoIpld}, + data::{BodyChunkIpld, ContentType, Header, Link, MemoIpld}, }; /// A module-private trait for internal trait methods; this is a workaround for @@ -86,7 +86,7 @@ where let stream = match content_type { // TODO(#86): Content-type aware decoding of body bytes - Some(_) => BodyChunkDecoder(&memo.body, &db).stream(), + Some(_) => BodyChunkIpld::decode(&memo.body, &db), None => return Err(anyhow!("No content type specified")), }; diff --git a/rust/noosphere-sphere/src/replication/stream.rs b/rust/noosphere-sphere/src/replication/stream.rs index 6150afbb7..f4e8fba81 100644 --- a/rust/noosphere-sphere/src/replication/stream.rs +++ b/rust/noosphere-sphere/src/replication/stream.rs @@ -6,7 +6,7 @@ use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_core::{ authority::collect_ucan_proofs, - data::{ContentType, Link, MemoIpld, SphereIpld}, + data::{BodyChunkIpld, ContentType, Link, MemoIpld, SphereIpld}, view::Sphere, }; use noosphere_storage::{BlockStore, BlockStoreTap, UcanStore}; @@ -17,7 +17,6 @@ use ucan::{store::UcanJwtStore, Ucan}; use crate::{ walk_versioned_map_changes_and, walk_versioned_map_elements, walk_versioned_map_elements_and, - BodyChunkDecoder, }; /// Stream all the blocks required to reconstruct the history of a sphere since a @@ -231,7 +230,7 @@ where revocations_result??; } Some(_) => { - let stream = BodyChunkDecoder(&memo.body, &store).stream(); + let stream = BodyChunkIpld::decode(&memo.body, &store); drop(store); @@ -275,8 +274,8 @@ mod tests { use crate::{ car_stream, helpers::{simulated_sphere_context, touch_all_sphere_blocks, SimulationAccess}, - memo_body_stream, memo_history_stream, BodyChunkDecoder, HasMutableSphereContext, - HasSphereContext, SphereContentWrite, SpherePetnameWrite, + memo_body_stream, memo_history_stream, HasMutableSphereContext, HasSphereContext, + SphereContentWrite, SpherePetnameWrite, }; #[cfg(target_arch = "wasm32")] @@ -538,7 +537,7 @@ mod tests { let memo = store.load::(&content_cid).await?; let mut buffer = Vec::new(); - let body_stream = BodyChunkDecoder(&memo.body, &store).stream(); + let body_stream = BodyChunkIpld::decode(&memo.body, &store); tokio::pin!(body_stream); diff --git a/rust/noosphere/Cargo.toml b/rust/noosphere/Cargo.toml index 7345598b5..fba3f9072 100644 --- a/rust/noosphere/Cargo.toml +++ b/rust/noosphere/Cargo.toml @@ -36,7 +36,7 @@ tokio-stream = "~0.1" tokio-util = { version = "~0.7", features = ["io"] } libipld-core = { workspace = true } libipld-cbor = { workspace = true } -bytes = "^1" +bytes = { workspace = true } noosphere-core = { version = "0.15.1", path = "../noosphere-core" } noosphere-sphere = { version = "0.10.1", path = "../noosphere-sphere" }