diff --git a/Cargo.lock b/Cargo.lock index 8f9a1eeda..9ba11f34a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3392,6 +3392,7 @@ dependencies = [ "noosphere-sphere", "noosphere-storage", "pathdiff", + "rand 0.8.5", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 464a76374..c7a5a0087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ libipld = { version = "0.16" } libipld-core = { version = "0.16" } libipld-cbor = { version = "0.16" } pathdiff = { version = "0.2.1" } +rand = { version = "0.8" } sentry-tracing = { version = "0.31.5" } serde = { version = "^1" } serde_json = { version = "^1" } diff --git a/rust/noosphere-cli/Cargo.toml b/rust/noosphere-cli/Cargo.toml index aa4a88786..6a8d97a7d 100644 --- a/rust/noosphere-cli/Cargo.toml +++ b/rust/noosphere-cli/Cargo.toml @@ -58,6 +58,7 @@ cid = { workspace = true } symlink = { workspace = true } pathdiff = { workspace = true } subtext = "0.3.2" +rand = {workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/rust/noosphere-cli/tests/helpers/mod.rs b/rust/noosphere-cli/tests/helpers/mod.rs index 6887b1924..e8babb3a3 100644 --- a/rust/noosphere-cli/tests/helpers/mod.rs +++ b/rust/noosphere-cli/tests/helpers/mod.rs @@ -1,7 +1,9 @@ #![allow(dead_code)] mod cli; +mod random; +pub use crate::helpers::random::*; pub use cli::*; use anyhow::Result; diff --git a/rust/noosphere-cli/tests/helpers/random.rs b/rust/noosphere-cli/tests/helpers/random.rs new file mode 100644 index 000000000..54bf5cc77 --- /dev/null +++ b/rust/noosphere-cli/tests/helpers/random.rs @@ -0,0 +1,35 @@ +use rand::{rngs::StdRng, Rng, SeedableRng}; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// A helper to support consistent use of seeded randomness in tests. +/// Its primary purpose is to retain and report the seed in use for a +/// random number generator that can be shared across threads in tests. +/// This is probably not suitable for use outside of tests. +pub struct TestEntropy { + seed: [u8; 32], + rng: Arc>, +} + +impl Default for TestEntropy { + fn default() -> Self { + Self::from_seed(rand::thread_rng().gen::<[u8; 32]>()) + } +} + +impl TestEntropy { + pub fn from_seed(seed: [u8; 32]) -> Self { + tracing::info!(?seed, "Initializing test entropy..."); + + let rng = Arc::new(Mutex::new(SeedableRng::from_seed(seed.clone()))); + Self { seed, rng } + } + + pub fn to_rng(&self) -> Arc> { + self.rng.clone() + } + + pub fn seed(&self) -> &[u8] { + &self.seed + } +} diff --git a/rust/noosphere-cli/tests/peer_to_peer.rs b/rust/noosphere-cli/tests/peer_to_peer.rs index bb5c5adc8..991e6b1e1 100644 --- a/rust/noosphere-cli/tests/peer_to_peer.rs +++ b/rust/noosphere-cli/tests/peer_to_peer.rs @@ -14,12 +14,15 @@ use noosphere_sphere::{ SpherePetnameRead, SpherePetnameWrite, SphereReplicaRead, SphereSync, SphereWalker, SyncRecovery, }; +use rand::Rng; use std::collections::BTreeSet; use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio_stream::StreamExt; use url::Url; +use crate::helpers::TestEntropy; + #[tokio::test] async fn gateway_publishes_and_resolves_petnames_configured_by_the_client() -> Result<()> { initialize_tracing(None); @@ -669,3 +672,113 @@ async fn local_lineage_remains_sparse_as_graph_changes_accrue_over_time() -> Res ns_task.abort(); Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn clients_can_sync_when_there_is_a_lot_of_content() -> Result<()> { + initialize_tracing(None); + + let entropy = TestEntropy::default(); + let rng = entropy.to_rng(); + + let ipfs_url = Url::parse("http://127.0.0.1:5001").unwrap(); + let (ns_url, ns_task) = start_name_system_server(&ipfs_url).await.unwrap(); + + let mut pair_1 = SpherePair::new("ONE", &ipfs_url, &ns_url).await?; + let mut pair_2 = SpherePair::new("TWO", &ipfs_url, &ns_url).await?; + + pair_1.start_gateway().await?; + pair_2.start_gateway().await?; + + let peer_2_identity = pair_2.client.workspace.sphere_identity().await?; + let pair_2_rng = rng.clone(); + + pair_2 + .spawn(|mut ctx| async move { + let mut rng = pair_2_rng.lock().await; + + // Long history, small-ish files + for _ in 0..1000 { + let random_index = rng.gen_range(0..100); + let mut random_bytes = Vec::from(rng.gen::<[u8; 32]>()); + let slug = format!("slug{}", random_index); + + let next_bytes = if let Some(mut file) = ctx.read(&slug).await? { + let mut file_bytes = Vec::new(); + file.contents.read_to_end(&mut file_bytes).await?; + file_bytes.append(&mut random_bytes); + file_bytes + } else { + random_bytes + }; + + ctx.write(&slug, &ContentType::Bytes, next_bytes.as_ref(), None) + .await?; + ctx.save(None).await?; + } + + Ok(ctx.sync(SyncRecovery::Retry(3)).await?) + }) + .await?; + + let pair_1_rng = rng.clone(); + + pair_1 + .spawn(|mut ctx| async move { + let mut rng = pair_1_rng.lock().await; + + // Modest history, large-ish files + for _ in 0..100 { + let mut random_bytes = (0..1000).fold(Vec::new(), |mut bytes, _| { + bytes.append(&mut Vec::from(rng.gen::<[u8; 32]>())); + bytes + }); + let random_index = rng.gen_range(0..10); + let slug = format!("slug{}", random_index); + + let next_bytes = if let Some(mut file) = ctx.read(&slug).await? { + let mut file_bytes = Vec::new(); + file.contents.read_to_end(&mut file_bytes).await?; + file_bytes.append(&mut random_bytes); + file_bytes + } else { + random_bytes + }; + + ctx.write(&slug, &ContentType::Bytes, next_bytes.as_ref(), None) + .await?; + + ctx.save(None).await?; + } + + ctx.sync(SyncRecovery::Retry(3)).await?; + + ctx.set_petname("peer2", Some(peer_2_identity)).await?; + + ctx.save(None).await?; + + ctx.sync(SyncRecovery::Retry(3)).await?; + + // TODO(#606): Implement this part of the test when we "fix" latency asymmetry between + // name system and syndication workers. We should be able to test traversing to a peer + // after a huge update as been added to the name system. + /* + wait(1).await; + + ctx.sync(SyncRecovery::Retry(3)).await?; + + wait(1).await; + + let cursor = SphereCursor::latest(ctx); + let _peer2_ctx = cursor + .traverse_by_petnames(&["peer2".into()]) + .await? + .unwrap(); + */ + Ok(()) + }) + .await?; + + ns_task.abort(); + + Ok(()) +} diff --git a/rust/noosphere-gateway/src/gateway.rs b/rust/noosphere-gateway/src/gateway.rs index 114a4152a..f72914e77 100644 --- a/rust/noosphere-gateway/src/gateway.rs +++ b/rust/noosphere-gateway/src/gateway.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use axum::extract::DefaultBodyLimit; use axum::http::{HeaderValue, Method}; use axum::routing::{get, put}; use axum::{Extension, Router, Server}; @@ -23,6 +24,8 @@ use crate::{ use noosphere_core::tracing::initialize_tracing; +pub const DEFAULT_BODY_LENGTH_LIMIT: usize = 100 /* MB */ * 1000 * 1000; + #[derive(Clone, Debug)] pub struct GatewayScope { /// Identity of gateway sphere. @@ -99,6 +102,7 @@ where .layer(Extension(gateway_key_did)) .layer(Extension(syndication_tx)) .layer(Extension(name_system_tx)) + .layer(DefaultBodyLimit::max(DEFAULT_BODY_LENGTH_LIMIT)) .layer(cors) .layer(TraceLayer::new_for_http()); diff --git a/rust/noosphere-ipfs/Cargo.toml b/rust/noosphere-ipfs/Cargo.toml index 77695e8be..646e9ba44 100644 --- a/rust/noosphere-ipfs/Cargo.toml +++ b/rust/noosphere-ipfs/Cargo.toml @@ -50,7 +50,7 @@ ipfs-api-prelude = "0.6" [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = "~0.8" +rand = { workspace = true } iroh-car = { workspace = true } libipld-cbor = { workspace = true } noosphere-core = { version = "0.15.1", path = "../noosphere-core" } diff --git a/rust/noosphere-ns/Cargo.toml b/rust/noosphere-ns/Cargo.toml index 7a7763d49..d15807b84 100644 --- a/rust/noosphere-ns/Cargo.toml +++ b/rust/noosphere-ns/Cargo.toml @@ -57,7 +57,7 @@ url = { version = "^2", features = [ "serde" ], optional = true } [dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -rand = { version = "0.8.5" } +rand = { workspace = true } libipld-cbor = { workspace = true } tempfile = { workspace = true } diff --git a/rust/noosphere-sphere/src/sync/strategy.rs b/rust/noosphere-sphere/src/sync/strategy.rs index 1c76157a3..115816b3a 100644 --- a/rust/noosphere-sphere/src/sync/strategy.rs +++ b/rust/noosphere-sphere/src/sync/strategy.rs @@ -359,6 +359,13 @@ where .bundle_until_ancestor(local_sphere_base.as_ref()) .await?; + let mut byte_count = 0; + for bytes in bundle.map().values() { + byte_count += bytes.len(); + } + + trace!("Total bytes in bundle to be pushed: {}", byte_count); + let client = context.client().await?; let local_sphere_identity = context.identity(); diff --git a/rust/noosphere/src/ffi/mod.rs b/rust/noosphere/src/ffi/mod.rs index 061431359..2f9270bbc 100644 --- a/rust/noosphere/src/ffi/mod.rs +++ b/rust/noosphere/src/ffi/mod.rs @@ -1,3 +1,6 @@ +// TODO(getditto/safer_ffi#181): Re-enable this lint +#![allow(clippy::incorrect_clone_impl_on_copy_type)] + mod authority; mod context; mod error;