From eeab932763cd642702bc6ac85a6bbc10968a107d Mon Sep 17 00:00:00 2001 From: Christopher Joel <240083+cdata@users.noreply.github.com> Date: Thu, 5 Oct 2023 22:05:27 -0500 Subject: [PATCH] feat: Improved IPFS Kubo syndication (#666) --- Cargo.lock | 88 +----- Cargo.toml | 3 + .../src/native/commands/sphere/auth.rs | 8 +- .../src/native/commands/sphere/history.rs | 2 +- rust/noosphere-cli/src/native/render/job.rs | 8 +- rust/noosphere-collections/Cargo.toml | 2 +- rust/noosphere-common/src/channel.rs | 5 +- rust/noosphere-core/Cargo.toml | 2 +- rust/noosphere-core/src/api/client.rs | 2 +- .../src/context/content/read.rs | 2 +- rust/noosphere-core/src/context/cursor.rs | 12 +- .../src/context/sync/strategy.rs | 13 +- rust/noosphere-core/src/data/bundle.rs | 2 +- rust/noosphere-core/src/data/link.rs | 2 + rust/noosphere-core/src/data/memo.rs | 2 +- rust/noosphere-core/src/helpers/context.rs | 9 +- rust/noosphere-core/src/stream/memo.rs | 50 ++- rust/noosphere-core/src/stream/mod.rs | 291 +++++++++++------- rust/noosphere-core/src/view/sphere.rs | 22 +- rust/noosphere-core/src/view/timeline.rs | 18 +- rust/noosphere-gateway/Cargo.toml | 6 +- .../src/handlers/v0alpha1/push.rs | 6 +- .../src/handlers/v0alpha1/replicate.rs | 2 +- .../src/handlers/v0alpha2/push.rs | 8 +- rust/noosphere-gateway/src/worker/cleanup.rs | 2 +- .../src/worker/syndication.rs | 287 +++++++++++------ .../src/transform/sphere/html.rs | 4 +- rust/noosphere-ipfs/Cargo.toml | 2 + rust/noosphere-ipfs/examples/car.rs | 103 +++++++ rust/noosphere-ipfs/src/client/kubo.rs | 24 +- rust/noosphere-storage/examples/bench/main.rs | 2 +- .../examples/bench/performance.rs | 2 +- 32 files changed, 621 insertions(+), 370 deletions(-) create mode 100644 rust/noosphere-ipfs/examples/car.rs diff --git a/Cargo.lock b/Cargo.lock index f0a0a69fe..306cd1a4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,19 +583,6 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" -[[package]] -name = "bitvec" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" -dependencies = [ - "funty", - "radium", - "serde", - "tap", - "wyz", -] - [[package]] name = "blake2" version = "0.10.6" @@ -1320,21 +1307,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "deterministic-bloom" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12a3873e91e360aee2403cbafd2beb42f02ace06da9b053574518f003aa2490d" -dependencies = [ - "bitvec", - "miette", - "rand_core 0.6.4", - "serde", - "thiserror", - "tracing", - "xxhash-rust", -] - [[package]] name = "digest" version = "0.9.0" @@ -1692,12 +1664,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "funty" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" - [[package]] name = "futf" version = "0.1.5" @@ -3182,29 +3148,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "miette" -version = "5.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e" -dependencies = [ - "miette-derive", - "once_cell", - "thiserror", - "unicode-width", -] - -[[package]] -name = "miette-derive" -version = "5.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "mime" version = "0.3.17" @@ -3630,7 +3573,6 @@ dependencies = [ "axum", "bytes", "cid", - "deterministic-bloom", "iroh-car", "libipld-cbor", "libipld-core", @@ -3648,6 +3590,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tower", 
"tower-http", "tracing", @@ -3702,6 +3645,8 @@ dependencies = [ "iroh-car", "libipld-cbor", "libipld-core", + "libipld-json", + "multihash 0.18.1", "noosphere-common", "noosphere-core", "noosphere-storage", @@ -4379,12 +4324,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "radium" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" - [[package]] name = "rand" version = "0.8.5" @@ -5443,12 +5382,6 @@ dependencies = [ "libc", ] -[[package]] -name = "tap" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" - [[package]] name = "tempfile" version = "3.8.0" @@ -6718,15 +6651,6 @@ dependencies = [ "rand", ] -[[package]] -name = "wyz" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" -dependencies = [ - "tap", -] - [[package]] name = "x25519-dalek" version = "1.1.1" @@ -6787,12 +6711,6 @@ dependencies = [ "time", ] -[[package]] -name = "xxhash-rust" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" - [[package]] name = "yamux" version = "0.10.2" diff --git a/Cargo.toml b/Cargo.toml index ba7185d16..77b0a4cb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ resolver = "2" [workspace.dependencies] anyhow = { version = "1" } +async-recursion = { version = "1" } async-stream = { version = "0.3" } axum = { version = "^0.6.18" } bytes = { version = "^1" } @@ -34,6 +35,8 @@ js-sys = { version = "^0.3" } libipld = { version = "0.16" } libipld-core = { version = "0.16" } libipld-cbor = { version = "0.16" } +libipld-json = { version = "0.16" } +multihash = { version = "0.18" } pathdiff = { version = "0.2.1" } rand = 
{ version = "0.8" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } diff --git a/rust/noosphere-cli/src/native/commands/sphere/auth.rs b/rust/noosphere-cli/src/native/commands/sphere/auth.rs index 977398f52..9343fdc82 100644 --- a/rust/noosphere-cli/src/native/commands/sphere/auth.rs +++ b/rust/noosphere-cli/src/native/commands/sphere/auth.rs @@ -139,7 +139,7 @@ pub async fn auth_list(tree: bool, as_json: bool, workspace: &Workspace) -> Resu while let Some((name, identity, link)) = authorization_stream.try_next().await? { let jwt = Jwt(db.require_token(&link).await?); max_name_length = max_name_length.max(name.len()); - authorization_meta.insert(link.clone(), (name, identity, jwt)); + authorization_meta.insert(link, (name, identity, jwt)); } if tree { @@ -162,13 +162,13 @@ pub async fn auth_list(tree: bool, as_json: bool, workspace: &Workspace) -> Resu if *ucan.issuer() == sphere_identity { // TODO(#554): Such an authorization ought not have any topical proofs, // but perhaps we should verify that - authorization_roots.push(link.clone()) + authorization_roots.push(*link) } else { for proof in proofs { let items = match authorization_hierarchy.get_mut(&proof) { Some(items) => items, None => { - authorization_hierarchy.insert(proof.clone(), Vec::new()); + authorization_hierarchy.insert(proof, Vec::new()); authorization_hierarchy.get_mut(&proof).ok_or_else(|| { anyhow!( "Could not access list of child authorizations for {}", @@ -177,7 +177,7 @@ pub async fn auth_list(tree: bool, as_json: bool, workspace: &Workspace) -> Resu })? 
} }; - items.push(link.clone()); + items.push(*link); } } } diff --git a/rust/noosphere-cli/src/native/commands/sphere/history.rs b/rust/noosphere-cli/src/native/commands/sphere/history.rs index a8237f3fd..c49be54de 100644 --- a/rust/noosphere-cli/src/native/commands/sphere/history.rs +++ b/rust/noosphere-cli/src/native/commands/sphere/history.rs @@ -10,7 +10,7 @@ use crate::workspace::Workspace; pub async fn history(workspace: &Workspace) -> Result<()> { let sphere_context = workspace.sphere_context().await?; let sphere = sphere_context.to_sphere().await?; - let latest_version = sphere.cid().clone(); + let latest_version = *sphere.cid(); let db = sphere.store().clone(); let history_stream = sphere.into_history_stream(None); diff --git a/rust/noosphere-cli/src/native/render/job.rs b/rust/noosphere-cli/src/native/render/job.rs index b6cce6d59..2eafc748f 100644 --- a/rust/noosphere-cli/src/native/render/job.rs +++ b/rust/noosphere-cli/src/native/render/job.rs @@ -212,7 +212,7 @@ where // Create a symlink to each peer (they will be rendered later, if // they haven't been already) - petname_change_buffer.add(name.clone(), (did.clone(), cid.clone().into()))?; + petname_change_buffer.add(name.clone(), (did.clone(), cid.into()))?; if petname_change_buffer.is_full() { petname_change_buffer.flush_to_writer(&self.writer).await?; @@ -278,10 +278,8 @@ where Some(identity) => match cursor.get_petname_record(&petname).await? 
{ Some(link_record) => { if let Some(version) = link_record.get_link() { - petname_change_buffer.add( - petname.clone(), - (identity.clone(), Cid::from(version.clone())), - )?; + petname_change_buffer + .add(petname.clone(), (identity.clone(), Cid::from(version)))?; let mut petname_path = self.petname_path.clone(); petname_path.push(petname); diff --git a/rust/noosphere-collections/Cargo.toml b/rust/noosphere-collections/Cargo.toml index 51286cf9a..d835aa364 100644 --- a/rust/noosphere-collections/Cargo.toml +++ b/rust/noosphere-collections/Cargo.toml @@ -24,7 +24,7 @@ forest_hash_utils = "0.1.0" serde = { workspace = true } serde_bytes = "0.11" byteorder = "^1.4" -async-recursion = "^1" +async-recursion = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } noosphere-storage = { version = "0.9.0", path = "../noosphere-storage" } diff --git a/rust/noosphere-common/src/channel.rs b/rust/noosphere-common/src/channel.rs index 3b4295922..7920ceaa7 100644 --- a/rust/noosphere-common/src/channel.rs +++ b/rust/noosphere-common/src/channel.rs @@ -198,10 +198,7 @@ mod tests { }); let res = client.send(Request::Ping()).await?; - assert!(match res { - Ok(Response::Pong()) => true, - _ => false, - }); + assert!(matches!(res, Ok(Response::Pong()))); for n in 0..10 { client.send_oneshot(Request::SetFlag(n))?; diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index 53adb83a5..4ddb20827 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -28,7 +28,7 @@ tracing = { workspace = true } cid = { workspace = true } url = { workspace = true, features = ["serde"] } async-trait = "~0.1" -async-recursion = "^1" +async-recursion = { workspace = true } async-stream = { workspace = true } # NOTE: async-once-cell 0.4.0 shipped unstable feature usage diff --git a/rust/noosphere-core/src/api/client.rs b/rust/noosphere-core/src/api/client.rs index 3dc6c0e00..fdf208d0c 100644 --- 
a/rust/noosphere-core/src/api/client.rs +++ b/rust/noosphere-core/src/api/client.rs @@ -325,7 +325,7 @@ where store: S, push_body: v0alpha2::PushBody, ) -> impl Stream> + ConditionalSync + 'static { - let root = push_body.local_tip.clone().into(); + let root = push_body.local_tip.into(); trace!("Creating push stream..."); let block_stream = try_stream! { diff --git a/rust/noosphere-core/src/context/content/read.rs b/rust/noosphere-core/src/context/content/read.rs index 3a4b72026..97041ceac 100644 --- a/rust/noosphere-core/src/context/content/read.rs +++ b/rust/noosphere-core/src/context/content/read.rs @@ -42,7 +42,7 @@ where let hamt = links.get_hamt().await?; Ok(match hamt.get(&slug.to_string()).await? { - Some(memo) => Some(self.get_file(&revision, memo.clone()).await?), + Some(memo) => Some(self.get_file(&revision, *memo).await?), None => None, }) } diff --git a/rust/noosphere-core/src/context/cursor.rs b/rust/noosphere-core/src/context/cursor.rs index 9e6612e3c..50ac447bd 100644 --- a/rust/noosphere-core/src/context/cursor.rs +++ b/rust/noosphere-core/src/context/cursor.rs @@ -46,7 +46,7 @@ where SphereCursor { has_sphere_context, storage: PhantomData, - sphere_version: Some(sphere_version.clone()), + sphere_version: Some(*sphere_version), } } @@ -66,7 +66,7 @@ where /// version it is mounted to even when the latest version of the sphere /// changes. pub async fn mount_at(&mut self, sphere_version: &Link) -> Result<&Self> { - self.sphere_version = Some(sphere_version.clone()); + self.sphere_version = Some(*sphere_version); Ok(self) } @@ -112,7 +112,7 @@ where match sphere.get_parent().await? 
{ Some(parent) => { - self.sphere_version = Some(parent.cid().clone()); + self.sphere_version = Some(*parent.cid()); Ok(self.sphere_version.as_ref()) } None => Ok(None), @@ -140,7 +140,7 @@ where let new_version = self.has_sphere_context.save(additional_headers).await?; if self.sphere_version.is_some() { - self.sphere_version = Some(new_version.clone()); + self.sphere_version = Some(new_version); } Ok(new_version) @@ -162,7 +162,7 @@ where async fn version(&self) -> Result> { match &self.sphere_version { - Some(sphere_version) => Ok(sphere_version.clone()), + Some(sphere_version) => Ok(*sphere_version), None => self.has_sphere_context.version().await, } } @@ -191,7 +191,7 @@ where async move { let replicate_parameters = since.as_ref().map(|since| ReplicateParameters { - since: Some(since.clone()), + since: Some(*since), }); let (db, client) = { let sphere_context = cursor.sphere_context().await?; diff --git a/rust/noosphere-core/src/context/sync/strategy.rs b/rust/noosphere-core/src/context/sync/strategy.rs index e50502b52..48452db91 100644 --- a/rust/noosphere-core/src/context/sync/strategy.rs +++ b/rust/noosphere-core/src/context/sync/strategy.rs @@ -191,9 +191,8 @@ where .into(); return Ok(( local_sphere_tip, - counterpart_sphere_base - .ok_or_else(|| anyhow!("Counterpart sphere history is missing!"))? - .clone(), + *counterpart_sphere_base + .ok_or_else(|| anyhow!("Counterpart sphere history is missing!"))?, updated_names, )); } @@ -272,12 +271,12 @@ where ) .await?; - new_base.clone() + new_base } // No new history at all (Some(current_tip), _, _) => { info!("Nothing to sync!"); - current_tip.clone() + *current_tip } // We should have local history but we don't! 
_ => { @@ -409,8 +408,8 @@ where .push(&PushBody { sphere: local_sphere_identity.clone(), local_base: local_sphere_base, - local_tip: local_sphere_tip.clone(), - counterpart_tip: Some(counterpart_sphere_tip.clone()), + local_tip: *local_sphere_tip, + counterpart_tip: Some(*counterpart_sphere_tip), name_record: Some(name_record), }) .await?; diff --git a/rust/noosphere-core/src/data/bundle.rs b/rust/noosphere-core/src/data/bundle.rs index 7d35963d4..0313012f9 100644 --- a/rust/noosphere-core/src/data/bundle.rs +++ b/rust/noosphere-core/src/data/bundle.rs @@ -699,7 +699,7 @@ mod tests { let (sphere, ucan, _) = Sphere::generate(&owner_did, &mut store).await.unwrap(); - let original_cid = sphere.cid().clone(); + let original_cid = *sphere.cid(); let foo_key = String::from("foo"); let foo_memo = MemoIpld::for_body(&mut store, b"foo").await.unwrap(); diff --git a/rust/noosphere-core/src/data/link.rs b/rust/noosphere-core/src/data/link.rs index 94c2c13c3..a94dec4de 100644 --- a/rust/noosphere-core/src/data/link.rs +++ b/rust/noosphere-core/src/data/link.rs @@ -39,6 +39,8 @@ where linked_type: PhantomData, } +impl Copy for Link where T: Clone {} + impl Debug for Link where T: Clone, diff --git a/rust/noosphere-core/src/data/memo.rs b/rust/noosphere-core/src/data/memo.rs index 975021799..1ce361e9b 100644 --- a/rust/noosphere-core/src/data/memo.rs +++ b/rust/noosphere-core/src/data/memo.rs @@ -120,7 +120,7 @@ impl MemoIpld { pub async fn branch_from(cid: &Link, store: &S) -> Result { match store.load::(cid).await { Ok(mut memo) => { - memo.parent = Some(cid.clone()); + memo.parent = Some(*cid); memo.remove_header(&Header::Signature); memo.remove_header(&Header::Proof); diff --git a/rust/noosphere-core/src/helpers/context.rs b/rust/noosphere-core/src/helpers/context.rs index 6d2bd8734..398b420b4 100644 --- a/rust/noosphere-core/src/helpers/context.rs +++ b/rust/noosphere-core/src/helpers/context.rs @@ -21,6 +21,10 @@ use crate::{ stream::{walk_versioned_map_elements, 
walk_versioned_map_elements_and}, }; +/// An alias for the [HasMutableSphereContext] type returned by [simulated_sphere_context] +pub type SimulatedHasMutableSphereContext = + Arc>>>; + /// Create a temporary, non-persisted [SphereContext] that tracks usage /// internally. This is intended for use in docs and tests, and should otherwise /// be ignored. When creating the simulated [SphereContext], you can pass an @@ -29,10 +33,7 @@ use crate::{ pub async fn simulated_sphere_context( profile: Access, db: Option>>, -) -> Result<( - Arc>>>, - Mnemonic, -)> { +) -> Result<(SimulatedHasMutableSphereContext, Mnemonic)> { let db = match db { Some(db) => db, None => { diff --git a/rust/noosphere-core/src/stream/memo.rs b/rust/noosphere-core/src/stream/memo.rs index 3da8482f5..3471c9666 100644 --- a/rust/noosphere-core/src/stream/memo.rs +++ b/rust/noosphere-core/src/stream/memo.rs @@ -6,6 +6,7 @@ use crate::{ view::Sphere, }; use anyhow::{anyhow, Result}; +use async_recursion::async_recursion; use async_stream::try_stream; use cid::Cid; use libipld_cbor::DagCborCodec; @@ -36,7 +37,7 @@ where { debug!("Streaming history via memo..."); - let latest = latest.clone(); + let latest = *latest; let since = since.cloned(); try_stream! 
{ @@ -122,11 +123,10 @@ where if replicate_content { debug!("Replicating content..."); let content = sphere.get_content().await?; - let include_content = include_content; tasks.spawn(walk_versioned_map_changes_and(content, store.clone(), move |_, link, store| async move { if include_content { - walk_memo_body(store, &link).await?; + walk_memo_body(store, &link, include_content).await?; } else { link.load_from(&store).await?; }; @@ -190,7 +190,8 @@ where #[instrument(level = "trace", skip(store))] pub fn memo_body_stream( store: S, - memo_version: &Cid, + memo_version: &Link, + include_content: bool, ) -> impl Stream)>> + ConditionalSend where S: BlockStore + 'static, @@ -204,7 +205,7 @@ where let mut receiver_is_open = true; let mut walk_memo_finished = false; - let mut walk_memo_finishes = Box::pin(walk_memo_body(store, &memo_version)); + let mut walk_memo_finishes = Box::pin(walk_memo_body(store, &memo_version, include_content)); while receiver_is_open { select! { @@ -228,7 +229,13 @@ where #[allow(clippy::let_with_type_underscore)] #[instrument(level = "trace", skip(store))] -async fn walk_memo_body(store: S, memo_version: &Cid) -> Result<()> +#[cfg_attr(target_arch="wasm32", async_recursion(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_recursion)] +async fn walk_memo_body( + store: S, + memo_version: &Link, + include_content: bool, +) -> Result<()> where S: BlockStore + 'static, { @@ -256,15 +263,42 @@ where Ok(()) }, )); + tasks.spawn(walk_versioned_map_elements_and( content, store.clone(), move |_, link, store| async move { - link.load_from(&store).await?; + if include_content { + walk_memo_body(store, &link, true).await?; + } else { + link.load_from(&store).await?; + } + Ok(()) }, )); - tasks.spawn(walk_versioned_map_elements(delegations)); + + tasks.spawn(async move { + walk_versioned_map_elements_and( + delegations, + store, + |_, delegation, store| async move { + let ucan_store = UcanStore(store); + + collect_ucan_proofs( + 
&Ucan::from_str(&ucan_store.require_token(&delegation.jwt).await?)?, + &ucan_store, + ) + .await?; + + Ok(()) + }, + ) + .await?; + + Ok(()) as Result<_, anyhow::Error> + }); + tasks.spawn(walk_versioned_map_elements(revocations)); tasks.join().await?; diff --git a/rust/noosphere-core/src/stream/mod.rs b/rust/noosphere-core/src/stream/mod.rs index e8f16676f..d5cd5e0c4 100644 --- a/rust/noosphere-core/src/stream/mod.rs +++ b/rust/noosphere-core/src/stream/mod.rs @@ -14,16 +14,22 @@ pub use walk::*; #[cfg(test)] mod tests { use anyhow::Result; + use cid::Cid; + use libipld_core::{codec::Codec, ipld::Ipld, raw::RawCodec}; use std::collections::BTreeSet; - use ucan::store::UcanJwtStore; + use ucan::{crypto::KeyMaterial, store::UcanJwtStore}; use crate::{ - authority::Access, + authority::{generate_ed25519_key, Access}, context::{ - HasMutableSphereContext, HasSphereContext, SphereContentWrite, SpherePetnameWrite, + HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead, + SphereContentWrite, SpherePetnameWrite, + }, + data::{BodyChunkIpld, ContentType, Link, LinkRecord, MemoIpld}, + helpers::{ + make_valid_link_record, simulated_sphere_context, touch_all_sphere_blocks, + SimulatedHasMutableSphereContext, }, - data::{BodyChunkIpld, ContentType, LinkRecord, MemoIpld}, - helpers::{make_valid_link_record, simulated_sphere_context, touch_all_sphere_blocks}, stream::{from_car_stream, memo_body_stream, memo_history_stream, to_car_stream}, tracing::initialize_tracing, view::{BodyChunkDecoder, Sphere}, @@ -38,6 +44,58 @@ mod tests { #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + pub const SCAFFOLD_CHANGES: &[(&[&str], &[&str])] = &[ + (&["dogs", "birds"], &["alice", "bob"]), + (&["cats", "dogs"], &["gordon"]), + (&["birds"], &["cdata"]), + (&["cows", "beetles"], &["jordan", "ben"]), + ]; + + pub async fn scaffold_sphere_context_with_history( + ) -> Result<(SimulatedHasMutableSphereContext, Vec>)> { + 
let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?; + let mut versions = Vec::new(); + let store = sphere_context.sphere_context().await?.db().clone(); + + for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() { + for slug in *content_change { + sphere_context + .write( + slug, + &ContentType::Subtext, + format!("{} are cool", slug).as_bytes(), + None, + ) + .await?; + } + + for petname in *petname_change { + let (id, record, _) = make_valid_link_record(&mut UcanStore(store.clone())).await?; + sphere_context.set_petname(petname, Some(id)).await?; + versions.push(sphere_context.save(None).await?); + sphere_context.set_petname_record(petname, &record).await?; + } + + versions.push(sphere_context.save(None).await?); + } + + let additional_device_credential = generate_ed25519_key(); + let additional_device_did = additional_device_credential.get_did().await?.into(); + let additional_device_authorization = sphere_context + .authorize("otherdevice", &additional_device_did) + .await?; + + sphere_context.save(None).await?; + + sphere_context + .revoke_authorization(&additional_device_authorization) + .await?; + + sphere_context.save(None).await?; + + Ok((sphere_context, versions)) + } + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] async fn it_includes_all_link_records_and_proofs_from_the_address_book() -> Result<()> { @@ -61,6 +119,7 @@ mod tests { let stream = memo_body_stream( sphere_context.sphere_context().await?.db().clone(), &final_version, + false, ); tokio::pin!(stream); @@ -87,35 +146,7 @@ mod tests { async fn it_can_stream_all_blocks_in_a_sphere_version() -> Result<()> { initialize_tracing(None); - let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?; - - let changes = vec![ - (vec!["dogs", "birds"], vec!["alice", "bob"]), - (vec!["cats", "dogs"], vec!["gordon"]), - (vec!["birds"], vec!["cdata"]), - (vec!["cows", 
"beetles"], vec!["jordan", "ben"]), - ]; - - for (content_change, petname_change) in changes.iter() { - for slug in content_change { - sphere_context - .write( - slug, - &ContentType::Subtext, - format!("{} are cool", slug).as_bytes(), - None, - ) - .await?; - } - - for petname in petname_change { - sphere_context - .set_petname(petname, Some(format!("did:key:{}", petname).into())) - .await?; - } - - sphere_context.save(None).await?; - } + let (sphere_context, _) = scaffold_sphere_context_with_history().await?; let final_version = sphere_context.version().await?; @@ -126,6 +157,7 @@ mod tests { let stream = memo_body_stream( sphere_context.sphere_context().await?.db().clone(), &final_version, + false, ); tokio::pin!(stream); @@ -145,12 +177,12 @@ mod tests { let content = sphere.get_content().await?; let identities = sphere.get_address_book().await?.get_identities().await?; - for (content_change, petname_change) in changes.iter() { - for slug in content_change { + for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() { + for slug in *content_change { let _ = content.get(&slug.to_string()).await?.cloned().unwrap(); } - for petname in petname_change { + for petname in *petname_change { let _ = identities.get(&petname.to_string()).await?; } } @@ -165,45 +197,14 @@ mod tests { async fn it_can_stream_all_delta_blocks_for_a_range_of_history() -> Result<()> { initialize_tracing(None); - let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?; - - let changes = vec![ - (vec!["dogs", "birds"], vec!["alice", "bob"]), - (vec!["cats", "dogs"], vec!["gordon"]), - (vec!["birds"], vec!["cdata"]), - (vec!["cows", "beetles"], vec!["jordan", "ben"]), - ]; + let (sphere_context, versions) = scaffold_sphere_context_with_history().await?; let original_store = sphere_context.sphere_context().await?.db().clone(); - let mut versions = Vec::new(); - - for (content_change, petname_change) in changes.iter() { - for slug in content_change { - 
sphere_context - .write( - slug, - &ContentType::Subtext, - format!("{} are cool", slug).as_bytes(), - None, - ) - .await?; - } - - for petname in petname_change { - let (id, record, _) = - make_valid_link_record(&mut UcanStore(original_store.clone())).await?; - sphere_context.set_petname(petname, Some(id)).await?; - versions.push(sphere_context.save(None).await?); - sphere_context.set_petname_record(petname, &record).await?; - } - - versions.push(sphere_context.save(None).await?); - } let mut other_store = MemoryStore::default(); let first_version = versions.first().unwrap(); - let stream = memo_body_stream(original_store.clone(), first_version); + let stream = memo_body_stream(original_store.clone(), first_version, false); tokio::pin!(stream); @@ -275,6 +276,7 @@ mod tests { let stream = memo_body_stream( sphere_context.sphere_context().await?.db().clone(), &content_cid, + false, ); let mut store = MemoryStore::default(); @@ -306,35 +308,7 @@ mod tests { async fn it_can_stream_all_blocks_in_a_sphere_version_as_a_car() -> Result<()> { initialize_tracing(None); - let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?; - - let changes = vec![ - (vec!["dogs", "birds"], vec!["alice", "bob"]), - (vec!["cats", "dogs"], vec!["gordon"]), - (vec!["birds"], vec!["cdata"]), - (vec!["cows", "beetles"], vec!["jordan", "ben"]), - ]; - - for (content_change, petname_change) in changes.iter() { - for slug in content_change { - sphere_context - .write( - slug, - &ContentType::Subtext, - format!("{} are cool", slug).as_bytes(), - None, - ) - .await?; - } - - for petname in petname_change { - sphere_context - .set_petname(petname, Some(format!("did:key:{}", petname).into())) - .await?; - } - - sphere_context.save(None).await?; - } + let (mut sphere_context, _) = scaffold_sphere_context_with_history().await?; let mut db = sphere_context.sphere_context().await?.db().clone(); let (id, link_record, _) = make_valid_link_record(&mut db).await?; @@ -350,8 
+324,8 @@ mod tests { let mut other_store = MemoryStore::default(); let stream = to_car_stream( - vec![final_version.clone().into()], - memo_body_stream(db.clone(), &final_version), + vec![final_version.into()], + memo_body_stream(db.clone(), &final_version, false), ); let block_stream = from_car_stream(stream); @@ -374,12 +348,12 @@ mod tests { let content = sphere.get_content().await?; let identities = sphere.get_address_book().await?.get_identities().await?; - for (content_change, petname_change) in changes.iter() { - for slug in content_change { + for (content_change, petname_change) in SCAFFOLD_CHANGES.iter() { + for slug in *content_change { let _ = content.get(&slug.to_string()).await?.cloned().unwrap(); } - for petname in petname_change { + for petname in *petname_change { let _ = identities.get(&petname.to_string()).await?; } } @@ -396,4 +370,113 @@ mod tests { Ok(()) } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_only_omits_memo_parent_references_when_streaming_sphere_body_with_content( + ) -> Result<()> { + initialize_tracing(None); + + let (sphere_context, mut versions) = scaffold_sphere_context_with_history().await?; + + debug!( + "Versions: {:#?}", + versions + .iter() + .map(|cid| cid.to_string()) + .collect::>() + ); + + let store = sphere_context.lock().await.db().clone(); + let last_version = versions.pop().unwrap(); + let last_version_parent = versions.pop().unwrap(); + + let mut links_referenced = BTreeSet::new(); + let mut links_included = BTreeSet::new(); + + // The root is referenced implicitly + links_referenced.insert(*last_version); + + let stream = memo_body_stream(store.clone(), &last_version, true); + + tokio::pin!(stream); + + while let Some((cid, block)) = stream.try_next().await? { + if cid == *last_version { + // Verify that parent of root is what we expect... 
+ let memo = store.load::(&cid).await?; + assert_eq!(memo.parent, Some(last_version_parent)); + + let codec = DagCborCodec; + let mut root_references = BTreeSet::new(); + codec.references::>(&block, &mut root_references)?; + + assert!(root_references.contains(&last_version_parent)); + } + + links_included.insert(cid); + + match cid.codec() { + codec if codec == u64::from(DagCborCodec) => { + let codec = DagCborCodec; + codec.references::>(&block, &mut links_referenced)?; + } + codec if codec == u64::from(RawCodec) => { + let codec = DagCborCodec; + codec.references::>(&block, &mut links_referenced)?; + } + _ => { + unreachable!("No other codecs are used in our DAGs"); + } + } + } + + assert!( + !links_included.contains(&last_version_parent), + "Parent version should not be included" + ); + + let difference = links_referenced + .difference(&links_included) + .collect::>(); + + debug!( + "Difference: {:#?}", + difference + .iter() + .map(|cid| cid.to_string()) + .collect::>() + ); + + // These files have been each updated once after the first write, so their memos have + // parent pointers to old versions that won't be included in the CAR + let last_dogs_version = sphere_context + .read("dogs") + .await? + .unwrap() + .memo + .parent + .unwrap(); + let last_birds_version = sphere_context + .read("birds") + .await? 
+ .unwrap() + .memo + .parent + .unwrap(); + + let expected_difference: Vec<&Cid> = vec![ + &last_version_parent, + &last_birds_version, + &last_dogs_version, + ]; + + assert_eq!(difference.len(), expected_difference.len()); + + for cid in expected_difference { + assert!(difference.contains(&cid)); + } + + Ok(()) + } } diff --git a/rust/noosphere-core/src/view/sphere.rs b/rust/noosphere-core/src/view/sphere.rs index 4ca173a6d..dd47be1df 100644 --- a/rust/noosphere-core/src/view/sphere.rs +++ b/rust/noosphere-core/src/view/sphere.rs @@ -195,7 +195,7 @@ where if replication_required { debug!("Attempting to replicate from gateway..."); - replicate(link_record_version.clone(), local_version).await?; + replicate(link_record_version, local_version).await?; } Ok(Some(Sphere::at(&link_record_version, self.store()))) @@ -210,7 +210,7 @@ impl Sphere { pub fn at(cid: &Link, store: &S) -> Sphere { Sphere { store: store.clone(), - cid: cid.clone(), + cid: *cid, body: OnceCell::new(), memo: OnceCell::new(), } @@ -639,7 +639,7 @@ impl Sphere { let timeslice = timeline.slice(self.cid(), Some(old_base)).exclude_past(); let rebase_revisions = timeslice.to_chronological().await?; - let mut next_base = new_base.clone(); + let mut next_base = *new_base; for cid in rebase_revisions.iter() { let mut revision = Sphere::rebase_version(cid, &next_base, &mut store).await?; @@ -1082,7 +1082,7 @@ mod tests { let owner_did = owner_key.get_did().await.unwrap(); let (sphere, _, _) = Sphere::generate(&owner_did, &mut store).await.unwrap(); - (sphere.cid().clone(), sphere.get_identity().await.unwrap()) + (*sphere.cid(), sphere.get_identity().await.unwrap()) }; let restored_sphere = Sphere::at(&sphere_cid, &store); @@ -1162,7 +1162,7 @@ mod tests { let owner_did = owner_key.get_did().await.unwrap(); let (mut sphere, ucan, _) = Sphere::generate(&owner_did, &mut store).await.unwrap(); - let mut lineage = vec![sphere.cid().clone()]; + let mut lineage = vec![*sphere.cid()]; let foo_key = 
String::from("foo"); for i in 0..2u8 { @@ -1198,7 +1198,7 @@ mod tests { let owner_did = owner_key.get_did().await.unwrap(); let (mut sphere, ucan, _) = Sphere::generate(&owner_did, &mut store).await.unwrap(); - let mut lineage = vec![sphere.cid().clone()]; + let mut lineage = vec![*sphere.cid()]; for i in 0..5u8 { let mut mutation = SphereMutation::new(&owner_did); @@ -1215,7 +1215,7 @@ mod tests { lineage.push(next_cid); } - let since = lineage[2].clone(); + let since = lineage[2]; let stream = sphere.into_content_changelog_stream(Some(&since)); @@ -1666,8 +1666,8 @@ mod tests { let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut store).await?; - let base = sphere.cid().clone(); - let mut long_history_version = base.clone(); + let base = *sphere.cid(); + let mut long_history_version = base; for content in ["foo", "bar", "baz"] { let memo = MemoIpld::for_body(&mut store, content.as_bytes()).await?; @@ -1747,8 +1747,8 @@ mod tests { let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut store).await?; - let base = sphere.cid().clone(); - let mut long_history_version = base.clone(); + let base = *sphere.cid(); + let mut long_history_version = base; for i in 0..100 { let petname_id = (i / 6 - 1) * 6; diff --git a/rust/noosphere-core/src/view/timeline.rs b/rust/noosphere-core/src/view/timeline.rs index 2ba1d32d0..e5cc6d592 100644 --- a/rust/noosphere-core/src/view/timeline.rs +++ b/rust/noosphere-core/src/view/timeline.rs @@ -59,7 +59,7 @@ impl<'a, S: BlockStore> Timeline<'a, S> { exclude_past: bool, ) -> impl TryStream, MemoIpld)>> { stream::try_unfold( - (Some(future.clone()), past.cloned(), self.store.clone()), + (Some(*future), past.cloned(), self.store.clone()), move |(from, to, storage)| async move { match &from { Some(from) => { @@ -73,12 +73,12 @@ impl<'a, S: BlockStore> Timeline<'a, S> { if exclude_past && to.as_ref() == next_dag.parent.as_ref() { None } else { - next_dag.parent.clone() + next_dag.parent } } }; - 
Ok(Some(((cid.clone(), next_dag), (next_from, to, storage)))) + Ok(Some(((*cid, next_dag), (next_from, to, storage)))) } None => Ok(None), } @@ -177,7 +177,7 @@ mod tests { let owner_did = owner_key.get_did().await?; let (mut sphere, ucan, _) = Sphere::generate(&owner_did, &mut store).await?; - let mut lineage = vec![sphere.cid().clone()]; + let mut lineage = vec![*sphere.cid()]; for i in 0..5u8 { let mut mutation = SphereMutation::new(&owner_did); @@ -192,8 +192,8 @@ mod tests { lineage.push(next_cid); } - let past = lineage[4].clone(); - let future = lineage[4].clone(); + let past = lineage[4]; + let future = lineage[4]; let timeline = Timeline::new(&store); let timeslice = timeline.slice(&future, Some(&past)); @@ -216,7 +216,7 @@ mod tests { let owner_did = owner_key.get_did().await?; let (mut sphere, ucan, _) = Sphere::generate(&owner_did, &mut store).await?; - let mut lineage = vec![sphere.cid().clone()]; + let mut lineage = vec![*sphere.cid()]; for i in 0..5u8 { let mut mutation = SphereMutation::new(&owner_did); @@ -231,8 +231,8 @@ mod tests { lineage.push(next_cid); } - let past = lineage[1].clone(); - let future = lineage[3].clone(); + let past = lineage[1]; + let future = lineage[3]; let timeline = Timeline::new(&store); let timeslice = timeline.slice(&future, Some(&past)); diff --git a/rust/noosphere-gateway/Cargo.toml b/rust/noosphere-gateway/Cargo.toml index 919f82e7f..5dddc18f3 100644 --- a/rust/noosphere-gateway/Cargo.toml +++ b/rust/noosphere-gateway/Cargo.toml @@ -15,6 +15,9 @@ repository = "https://github.com/subconsciousnetwork/noosphere" homepage = "https://github.com/subconsciousnetwork/noosphere" readme = "README.md" +[features] +test-kubo = [] + [dependencies] tracing = { workspace = true } @@ -34,11 +37,11 @@ bytes = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } +tokio-util = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["cors", 
"trace"] } async-trait = "~0.1" async-stream = { workspace = true } -deterministic-bloom = { workspace = true } url = { workspace = true, features = ["serde"] } mime_guess = "^2" @@ -47,6 +50,7 @@ noosphere-ipfs = { version = "0.8.1", path = "../noosphere-ipfs" } noosphere-core = { version = "0.17.0", path = "../noosphere-core" } noosphere-ns = { version = "0.11.1", path = "../noosphere-ns" } noosphere-storage = { version = "0.9.0", path = "../noosphere-storage" } +noosphere-common = { version = "0.1.0", path = "../noosphere-common" } ucan = { workspace = true } ucan-key-support = { workspace = true } cid = { workspace = true } diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs index 26d11da01..3ebd6f93f 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs @@ -101,7 +101,7 @@ where // These steps are order-independent let _ = tokio::join!( self.notify_name_resolver(), - self.notify_ipfs_syndicator(next_version.clone()) + self.notify_ipfs_syndicator(next_version) ); Ok(PushResponse::Accepted { @@ -131,7 +131,7 @@ where let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); - let request_sphere_base_cid = self.request_body.local_base.clone(); + let request_sphere_base_cid = self.request_body.local_base; match (local_sphere_base_cid, request_sphere_base_cid) { (Some(mine), theirs) => { @@ -335,7 +335,7 @@ where if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveSince { context: self.sphere_context.clone(), - since: self.request_body.local_base.clone(), + since: self.request_body.local_base, }) { warn!("Failed to request name system resolutions: {}", error); }; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs index 5fb8cd533..8d97f3c28 100644 --- 
a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs @@ -105,7 +105,7 @@ where // Always fall back to a full replication Ok(StreamBody::new(Box::pin(to_car_stream( vec![memo_version], - memo_body_stream(store, &memo_version), + memo_body_stream(store, &memo_version.into(), false), )))) } diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs index 03d2d8030..06fdc442d 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs @@ -110,10 +110,10 @@ where // These steps are order-independent let _ = tokio::join!( self.notify_name_resolver(&push_body), - self.notify_ipfs_syndicator(next_version.clone()) + self.notify_ipfs_syndicator(next_version) ); - let roots = vec![next_version.clone().into()]; + let roots = vec![next_version.into()]; let block_stream = try_stream! { yield block_serialize::(PushResponse::Accepted { @@ -155,7 +155,7 @@ where let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); - let request_sphere_base_cid = push_body.local_base.clone(); + let request_sphere_base_cid = push_body.local_base; match (local_sphere_base_cid, request_sphere_base_cid) { (Some(mine), theirs) => { @@ -351,7 +351,7 @@ where if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveSince { context: self.sphere_context.clone(), - since: push_body.local_base.clone(), + since: push_body.local_base, }) { warn!("Failed to request name system resolutions: {}", error); }; diff --git a/rust/noosphere-gateway/src/worker/cleanup.rs b/rust/noosphere-gateway/src/worker/cleanup.rs index 21ab9bded..3396bba87 100644 --- a/rust/noosphere-gateway/src/worker/cleanup.rs +++ b/rust/noosphere-gateway/src/worker/cleanup.rs @@ -231,7 +231,7 @@ mod tests { wait(1).await; - let mut latest_version = base_version.clone(); 
+ let mut latest_version = base_version; for _ in 0..10 { let (_, link_record, _) = make_valid_link_record(&mut gateway_db.clone()).await?; diff --git a/rust/noosphere-gateway/src/worker/syndication.rs b/rust/noosphere-gateway/src/worker/syndication.rs index f44f11f0e..e84a878a3 100644 --- a/rust/noosphere-gateway/src/worker/syndication.rs +++ b/rust/noosphere-gateway/src/worker/syndication.rs @@ -1,29 +1,29 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{io::Cursor, sync::Arc}; use anyhow::Result; use libipld_cbor::DagCborCodec; +use noosphere_common::UnsharedStream; use noosphere_core::context::{ metadata::COUNTERPART, HasMutableSphereContext, SphereContentRead, SphereContentWrite, SphereCursor, }; +use noosphere_core::stream::{memo_body_stream, to_car_stream}; use noosphere_core::{ data::{ContentType, Did, Link, MemoIpld}, view::Timeline, }; use noosphere_ipfs::{IpfsClient, KuboClient}; -use noosphere_storage::{block_deserialize, block_serialize, BlockStore, KeyValueStore, Storage}; +use noosphere_storage::{block_deserialize, block_serialize, KeyValueStore, Storage}; use serde::{Deserialize, Serialize}; use tokio::{ io::AsyncReadExt, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; -use tokio_stream::StreamExt; +use tokio_util::io::StreamReader; use url::Url; -use deterministic_bloom::const_size::BloomFilter; -use iroh_car::{CarHeader, CarWriter}; - /// A [SyndicationJob] is a request to syndicate the blocks of a _counterpart_ /// sphere to the broader IPFS network. pub struct SyndicationJob { @@ -42,10 +42,35 @@ pub struct SyndicationJob { /// gives us a short-cut to determine if a block should be added. 
#[derive(Serialize, Deserialize)] pub struct SyndicationCheckpoint { - pub revision: Link, - pub syndicated_blocks: BloomFilter<256, 30>, + pub last_syndicated_version: Option>, + pub syndication_epoch: u64, +} + +impl SyndicationCheckpoint { + pub fn new() -> Result { + Ok(Self { + last_syndicated_version: None, + syndication_epoch: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(), + }) + } + + pub fn lifetime(&self) -> Result { + Ok(Duration::from_secs( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs() + - self.syndication_epoch, + )) + } + + pub fn is_expired(&self) -> Result { + Ok(self.lifetime()? > MAX_SYNDICATION_CHECKPOINT_LIFETIME) + } } +// Re-syndicate every 90 days +const MAX_SYNDICATION_CHECKPOINT_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 90); + /// Start a Tokio task that waits for [SyndicationJob] messages and then /// attempts to syndicate to the configured IPFS RPC. Currently only Kubo IPFS /// backends are supported. @@ -105,7 +130,7 @@ where // Take a lock on the `SphereContext` and look up the most recent // syndication checkpoint for this Kubo node - let (sphere_revision, ancestor_revision, mut syndicated_blocks, db) = { + let (sphere_revision, mut syndication_checkpoint, db) = { let db = { let context = context.sphere_context().await?; context.db().clone() @@ -115,35 +140,37 @@ where let sphere = context.to_sphere().await?; let content = sphere.get_content().await?; - let counterpart_revision = content.require(&counterpart_identity).await?.clone(); - - let (last_syndicated_revision, syndicated_blocks) = - match context.read(&checkpoint_key).await? 
{ - Some(mut file) => match file.memo.content_type() { - Some(ContentType::Cbor) => { - let mut bytes = Vec::new(); - file.contents.read_to_end(&mut bytes).await?; - let SyndicationCheckpoint { - revision, - syndicated_blocks, - } = block_deserialize::(&bytes)?; - (Some(revision), syndicated_blocks) + let counterpart_revision = *content.require(&counterpart_identity).await?; + + let syndication_checkpoint = match context.read(&checkpoint_key).await? { + Some(mut file) => match file.memo.content_type() { + Some(ContentType::Cbor) => { + let mut bytes = Vec::new(); + file.contents.read_to_end(&mut bytes).await?; + let current_checkpoint = match block_deserialize::(&bytes) { + Ok(checkpoint) => checkpoint, + _ => SyndicationCheckpoint::new()?, + }; + + if current_checkpoint.is_expired()? { + SyndicationCheckpoint::new()? + } else { + current_checkpoint } - _ => (None, BloomFilter::default()), - }, - None => (None, BloomFilter::default()), - }; - - ( - counterpart_revision, - last_syndicated_revision, - syndicated_blocks, - db, - ) + } + _ => SyndicationCheckpoint::new()?, + }, + None => SyndicationCheckpoint::new()?, + }; + + (counterpart_revision, syndication_checkpoint, db) }; let timeline = Timeline::new(&db) - .slice(&sphere_revision, ancestor_revision.as_ref()) + .slice( + &sphere_revision, + syndication_checkpoint.last_syndicated_version.as_ref(), + ) .to_chronological() .await?; @@ -151,68 +178,14 @@ where // of blocks that are unique to that revision to the backing IPFS // implementation for cid in timeline { - // TODO(#175): At each increment, if there are sub-graphs of a - // sphere that should *not* be syndicated (e.g., other spheres - // referenced by this sphere that are probably syndicated - // elsewhere), we should add them to the bloom filter at this spot. 
- - let stream = db.query_links(&cid, { - let filter = Arc::new(syndicated_blocks.clone()); - - move |cid| { - let filter = filter.clone(); - // let kubo_client = kubo_client.clone(); - let cid = *cid; - - async move { - // The Bloom filter probabilistically tells us if we - // have syndicated a block; it is probabilistic because - // `contains` may give us false positives. But, all - // negatives are guaranteed to not have been added. So, - // we can rely on it as a short cut to find unsyndicated - // blocks, and for positives we can verify the pin - // status with the IPFS node. - if !filter.contains(&cid.to_bytes()) { - return Ok(true); - } + let car_stream = to_car_stream(vec![cid.into()], memo_body_stream(db.clone(), &cid, true)); + let car_reader = StreamReader::new(UnsharedStream::new(Box::pin(car_stream))); - Ok(false) - } + match kubo_client.syndicate_blocks(car_reader).await { + Ok(_) => { + syndication_checkpoint.last_syndicated_version = Some(cid); + debug!("Syndicated sphere revision {} to IPFS", cid) } - }); - - // TODO(#2): It would be cool to make reading from storage and - // writing to an HTTP request body concurrent / streamed; this way - // we could send over CARs of arbitrary size (within the limits of - // whatever the IPFS receiving implementation can support). 
- let mut car = Vec::new(); - let car_header = CarHeader::new_v1(vec![cid.clone().into()]); - let mut car_writer = CarWriter::new(car_header, &mut car); - - tokio::pin!(stream); - - loop { - match stream.try_next().await { - Ok(Some(cid)) => { - trace!("Syndication will include block {}", cid); - // TODO(#176): We need to build-up a list of blocks that aren't - // able to be loaded so that we can be resilient to incomplete - // data when syndicating to IPFS - syndicated_blocks.insert(&cid.to_bytes()); - - let block = db.require_block(&cid).await?; - - car_writer.write(cid, block).await?; - } - Err(error) => { - warn!("Encountered error while streaming links: {:?}", error); - } - _ => break, - } - } - - match kubo_client.syndicate_blocks(Cursor::new(car)).await { - Ok(_) => debug!("Syndicated sphere revision {} to IPFS", cid), Err(error) => warn!("Failed to syndicate revision {} to IPFS: {:?}", cid, error), }; } @@ -221,10 +194,7 @@ where // update the syndication checkpoint for this particular IPFS server { let mut cursor = SphereCursor::latest(context.clone()); - let (_, bytes) = block_serialize::(&SyndicationCheckpoint { - revision, - syndicated_blocks, - })?; + let (_, bytes) = block_serialize::(&syndication_checkpoint)?; cursor .write( @@ -239,3 +209,126 @@ where } Ok(()) } + +#[cfg(all(test, feature = "test-kubo"))] +mod tests { + use std::time::Duration; + + use anyhow::Result; + use noosphere_common::helpers::wait; + use noosphere_core::{ + authority::Access, + context::{HasMutableSphereContext, HasSphereContext, SphereContentWrite, COUNTERPART}, + data::ContentType, + helpers::simulated_sphere_context, + tracing::initialize_tracing, + }; + use noosphere_ipfs::{IpfsClient, KuboClient}; + use noosphere_storage::KeyValueStore; + use tokio::select; + use url::Url; + + use crate::worker::{start_ipfs_syndication, SyndicationCheckpoint, SyndicationJob}; + + #[tokio::test(flavor = "multi_thread")] + async fn it_syndicates_a_sphere_revision_to_kubo() -> Result<()> { 
+ initialize_tracing(None); + + let (mut user_sphere_context, _) = + simulated_sphere_context(Access::ReadWrite, None).await?; + + let (mut gateway_sphere_context, _) = simulated_sphere_context( + Access::ReadWrite, + Some(user_sphere_context.lock().await.db().clone()), + ) + .await?; + + let user_sphere_identity = user_sphere_context.identity().await?; + + gateway_sphere_context + .lock() + .await + .db_mut() + .set_key(COUNTERPART, &user_sphere_identity) + .await?; + + let ipfs_url = Url::parse("http://127.0.0.1:5001")?; + let local_kubo_client = KuboClient::new(&ipfs_url.clone())?; + + let (syndication_tx, _syndication_join_handle) = start_ipfs_syndication::<_, _>(ipfs_url); + + user_sphere_context + .write("foo", &ContentType::Text, b"bar".as_ref(), None) + .await?; + + user_sphere_context.save(None).await?; + + user_sphere_context + .write("baz", &ContentType::Text, b"bar".as_ref(), None) + .await?; + + let version = user_sphere_context.save(None).await?; + + gateway_sphere_context + .link_raw(&user_sphere_identity, &version) + .await?; + gateway_sphere_context.save(None).await?; + + debug!("Sending syndication job..."); + syndication_tx.send(SyndicationJob { + revision: version.clone(), + context: gateway_sphere_context.clone(), + })?; + + debug!("Giving syndication a moment to complete..."); + + wait(1).await; + + debug!("Looking for blocks..."); + + for _ in 0..3 { + debug!("Sending request to Kubo..."); + + select! 
{ + maybe_block = local_kubo_client.get_block(&version) => { + if maybe_block?.is_some() { + debug!("Found block!"); + return Ok(()); + } + } + _ = tokio::time::sleep(Duration::from_secs(1)) => () + } + + debug!("No block, retrying in one second..."); + + wait(1).await; + } + + unreachable!("Syndicated block should be pinned") + } + + #[tokio::test] + async fn it_advances_syndication_checkpoint_lifetime_with_clock_time() -> Result<()> { + let checkpoint = SyndicationCheckpoint::new()?; + + let lifetime = checkpoint.lifetime()?; + + assert!(lifetime.as_secs() < 1); + + wait(1).await; + + let lifetime = checkpoint.lifetime()?; + + assert!(lifetime.as_secs() >= 1); + assert!(lifetime.as_secs() < 2); + + wait(2).await; + + let lifetime = checkpoint.lifetime()?; + + assert!(lifetime.as_secs() >= 3); + assert!(lifetime.as_secs() < 4); + + Ok(()) + } +} diff --git a/rust/noosphere-into/src/transform/sphere/html.rs b/rust/noosphere-into/src/transform/sphere/html.rs index dc35bd7fb..0eaf06d89 100644 --- a/rust/noosphere-into/src/transform/sphere/html.rs +++ b/rust/noosphere-into/src/transform/sphere/html.rs @@ -48,8 +48,8 @@ where let sphere_file = SphereFile { sphere_identity, - sphere_version: sphere.cid().clone(), - memo_version: sphere.cid().clone(), + sphere_version: *sphere.cid(), + memo_version: *sphere.cid(), memo, contents: TransformStream(sphere_to_subtext_stream(sphere)).into_reader(), }; diff --git a/rust/noosphere-ipfs/Cargo.toml b/rust/noosphere-ipfs/Cargo.toml index 1712125a1..19acc48b5 100644 --- a/rust/noosphere-ipfs/Cargo.toml +++ b/rust/noosphere-ipfs/Cargo.toml @@ -52,4 +52,6 @@ ipfs-api-prelude = "0.6" rand = { workspace = true } iroh-car = { workspace = true } libipld-cbor = { workspace = true } +libipld-json = { workspace = true } +multihash = { workspace = true } noosphere-core = { version = "0.17.0", path = "../noosphere-core" } diff --git a/rust/noosphere-ipfs/examples/car.rs b/rust/noosphere-ipfs/examples/car.rs new file mode 100644 index 
000000000..d862b63fa --- /dev/null +++ b/rust/noosphere-ipfs/examples/car.rs @@ -0,0 +1,103 @@ +//! Simple utility to verify the contents of a .car file using the same +//! CAR-reading facilities in use by Noosphere more generally + +#[cfg(not(target_arch = "wasm32"))] +use std::env; + +use anyhow::Result; +use cid::Cid; +use iroh_car::CarReader; +use libipld_cbor::DagCborCodec; +use libipld_core::raw::RawCodec; +use multihash::MultihashDigest; + +#[cfg(not(target_arch = "wasm32"))] +use tokio::fs::File; + +pub fn hash_for(cid: Cid) -> &'static str { + match multihash::Code::try_from(cid.hash().code()) { + Ok(multihash::Code::Blake3_256) => "BLAKE3", + Ok(multihash::Code::Sha2_256) => "SHA-256", + Ok(_) => "Other", + Err(error) => { + println!("ERROR: {}", error); + "Error reading codec" + } + } +} + +pub fn codec_for(cid: Cid) -> &'static str { + match cid.codec() { + codec if codec == u64::from(DagCborCodec) => "DAG-CBOR", + codec if codec == u64::from(RawCodec) => "Raw", + _ => "Other", + } +} + +#[cfg(target_arch = "wasm32")] +pub fn main() {} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +pub async fn main() -> Result<()> { + let file = if let Some(arg) = env::args().nth(1) { + println!("Opening {arg}...\n"); + File::open(arg).await? + } else { + println!("Please specify a path to a CARv1 file"); + std::process::exit(1); + }; + + let mut reader = CarReader::new(file).await?; + + let header = reader.header(); + + println!("=== Header (CARv{}) ===\n", header.version()); + + for root in header.roots() { + println!("{}", root); + } + + println!(); + + let mut index = 0usize; + + while let Some((cid, block)) = reader.next_block().await? 
{ + println!("=== Block {} ===\n", index); + + let verification_sign = if cid.codec() == u64::from(DagCborCodec) { + let hasher = cid::multihash::Code::try_from(cid.hash().code())?; + let multihash = hasher.digest(&block); + let new_cid = Cid::new_v1(cid.codec(), multihash); + + if cid == new_cid { + "✔️" + } else { + "🚫" + } + } else { + "🤷" + }; + + println!( + "{} {} ({:?}, {}, {})\n", + verification_sign, + cid, + cid.version(), + hash_for(cid), + codec_for(cid) + ); + println!( + "{}\n", + block + .iter() + .map(|byte| format!("{:02X?}", byte)) + .collect::>() + .join(" ") + ); + + index += 1; + } + + Ok(()) +} diff --git a/rust/noosphere-ipfs/src/client/kubo.rs b/rust/noosphere-ipfs/src/client/kubo.rs index 0979cbbed..7649afe15 100644 --- a/rust/noosphere-ipfs/src/client/kubo.rs +++ b/rust/noosphere-ipfs/src/client/kubo.rs @@ -1,5 +1,7 @@ #![cfg(not(target_arch = "wasm32"))] +use std::time::Duration; + use super::{IpfsClient, IpfsClientAsyncReadSendSync}; use async_trait::async_trait; @@ -15,6 +17,7 @@ use ipfs_api_prelude::response::PinLsResponse; use libipld_cbor::DagCborCodec; use libipld_core::raw::RawCodec; use serde_json::Value; +use tokio::select; use url::Url; /// Maps a codec defined in a [Cid] to a string @@ -28,6 +31,12 @@ fn get_codec(cid: &Cid) -> Result { } } +// If Kubo's DAG import API receives a block with a reference to another block +// that it cannot find locally or on the network, it hangs indefinitely with no +// feedback +// See: https://github.com/ipfs/kubo/issues/10159 +const KUBO_DAG_IMPORT_TIMEOUT: Duration = Duration::from_secs(10); + /// A high-level HTTP client for accessing IPFS /// [Kubo RPC APIs](https://docs.ipfs.tech/reference/kubo/rpc/) and normalizing /// their expected payloads to Noosphere-friendly formats @@ -115,11 +124,16 @@ impl IpfsClient for KuboClient { let request_builder = Request::builder().method("POST").uri(&api_url.to_string()); let request = form.set_body_convert::(request_builder)?; - let response = 
self.client.request(request).await?; - - match response.status() { - StatusCode::OK => Ok(()), - other_status => Err(anyhow!("Unexpected status code: {}", other_status)), + select! { + response = self.client.request(request) => { + match response?.status() { + StatusCode::OK => Ok(()), + other_status => Err(anyhow!("Unexpected status code: {}", other_status)), + } + }, + _ = tokio::time::sleep(KUBO_DAG_IMPORT_TIMEOUT) => { + Err(anyhow!("Timed out")) + } } } diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 7bcd796f9..28e687a8b 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -192,7 +192,7 @@ impl BenchmarkStorage { /// wipe any IndexedDb usage here. pub async fn dispose(self) -> Result<()> { #[cfg(target_arch = "wasm32")] - self.storage.to_inner().clear().await?; + self.storage.into_inner().clear().await?; Ok(()) } } diff --git a/rust/noosphere-storage/examples/bench/performance.rs b/rust/noosphere-storage/examples/bench/performance.rs index 09a5f0948..02a88f8ce 100644 --- a/rust/noosphere-storage/examples/bench/performance.rs +++ b/rust/noosphere-storage/examples/bench/performance.rs @@ -98,7 +98,7 @@ where } #[allow(unused)] - pub fn to_inner(self) -> S { + pub fn into_inner(self) -> S { self.storage } }