diff --git a/Cargo.lock b/Cargo.lock index ec3d9c974..77532fa02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -694,9 +694,9 @@ checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" [[package]] name = "byteorder" -version = "1.5.0" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" @@ -3521,9 +3521,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", - "axum", "base64 0.21.4", - "byteorder", "bytes", "cid", "console_error_panic_hook", @@ -3533,6 +3531,7 @@ dependencies = [ "futures-util", "getrandom 0.2.10", "gloo-net", + "headers", "instant", "iroh-car", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 0bcc129c7..9a4569e6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ anyhow = { version = "1" } async-recursion = { version = "1" } async-stream = { version = "0.3" } axum = { version = "^0.6.18" } +base64 = { version = "^0.21" } +byteorder = { version = "~1.4" } # keep in sync with pinned libipld-* crates bytes = { version = "^1" } cid = { version = "0.10" } deterministic-bloom = { version = "0.1.0" } @@ -29,6 +31,7 @@ futures = { version = "0.3" } futures-util = { version = "0.3" } gloo-net = { version = "0.4" } gloo-timers = { version = "0.3", features = ["futures"] } +headers = { version = "=0.3.8" } # Match version used by `axum`. ignore = { version = "0.4.20" } instant = { version = "0.1", features = ["wasm-bindgen"] } iroh-car = { version = "^0.3.0" } diff --git a/rust/noosphere-cli/src/native/commands/serve.rs b/rust/noosphere-cli/src/native/commands/serve.rs index 150aa380e..0d37bc9b9 100644 --- a/rust/noosphere-cli/src/native/commands/serve.rs +++ b/rust/noosphere-cli/src/native/commands/serve.rs @@ -3,7 +3,8 @@ use crate::native::workspace::Workspace; use anyhow::Result; -use noosphere_gateway::{start_gateway, GatewayScope}; +use noosphere_core::context::HasSphereContext; +use noosphere_gateway::{Gateway, SingleTenantGatewayManager}; use std::net::{IpAddr, TcpListener}; use url::Url; @@ -19,25 +20,30 @@ pub async fn serve( workspace.ensure_sphere_initialized()?; let listener = TcpListener::bind((interface, port))?; - let counterpart = workspace.counterpart_identity().await?; - - let identity = workspace.sphere_identity().await?; - - let gateway_scope = GatewayScope { - identity, - counterpart, - }; - let sphere_context = workspace.sphere_context().await?; - - start_gateway( - listener, - gateway_scope, - sphere_context, - ipfs_api, - name_resolver_api, - cors_origin, - ) - .await + let gateway_identity = sphere_context + .sphere_context() + .await? + .author() + .did() + .await?; + let manager = SingleTenantGatewayManager::new(sphere_context, counterpart.clone()).await?; + + let gateway = Gateway::new(manager, ipfs_api, name_resolver_api, cors_origin)?; + + info!( + r#"A geist is summoned to manage local sphere {} + + It has bound a gateway to {:?} + It awaits updates from sphere {}..."#, + gateway_identity, + listener + .local_addr() + .expect("Unexpected missing listener address"), + counterpart + ); + + gateway.start(listener).await?; + Ok(()) } diff --git a/rust/noosphere-cli/src/native/helpers/workspace.rs b/rust/noosphere-cli/src/native/helpers/workspace.rs index a4ac4538f..2e9826d0b 100644 --- a/rust/noosphere-cli/src/native/helpers/workspace.rs +++ b/rust/noosphere-cli/src/native/helpers/workspace.rs @@ -14,11 +14,8 @@ use noosphere::{ NoosphereContext, NoosphereContextConfiguration, NoosphereNetwork, NoosphereSecurity, NoosphereStorage, NoosphereStorageConfig, NoosphereStoragePath, }; -use noosphere_core::{ - context::HasSphereContext, - data::{Did, Mnemonic}, -}; -use noosphere_gateway::{start_gateway, GatewayScope}; +use noosphere_core::data::{Did, Mnemonic}; +use noosphere_gateway::{Gateway, SingleTenantGatewayManager}; use noosphere_ns::{helpers::NameSystemNetwork, server::start_name_system_api_server}; use tokio::{sync::Mutex, task::JoinHandle}; use url::Url; @@ -56,25 +53,15 @@ async fn start_gateway_for_workspace( ))?; let gateway_sphere_context = workspace.sphere_context().await?; - - let client_sphere_identity = client_sphere_identity.clone(); + let client_sphere_identity = client_sphere_identity.to_owned(); let ns_url = ns_url.clone(); let ipfs_url = ipfs_url.clone(); + let manager = + SingleTenantGatewayManager::new(gateway_sphere_context, client_sphere_identity).await?; let join_handle = tokio::spawn(async move { - start_gateway( - gateway_listener, - GatewayScope { - identity: gateway_sphere_context.identity().await.unwrap(), - counterpart: client_sphere_identity, - }, - gateway_sphere_context, - ipfs_url, - ns_url, - None, - ) - .await - .unwrap() + let gateway = Gateway::new(manager, ipfs_url, ns_url, None).unwrap(); + gateway.start(gateway_listener).await.unwrap() }); Ok((gateway_url, join_handle)) @@ -241,6 +228,7 @@ impl SpherePair { if self.gateway_task.is_some() { return Err(anyhow::anyhow!("Gateway already started.")); } + let (gateway_url, gateway_task) = start_gateway_for_workspace( &self.gateway.workspace, &self.client.identity, diff --git a/rust/noosphere-collections/Cargo.toml b/rust/noosphere-collections/Cargo.toml index 0d4c2ae1f..778ee001b 100644 --- a/rust/noosphere-collections/Cargo.toml +++ b/rust/noosphere-collections/Cargo.toml @@ -23,7 +23,7 @@ cid = { workspace = true } forest_hash_utils = "0.1.0" serde = { workspace = true } serde_bytes = "0.11" -byteorder = "^1.5" +byteorder = { workspace = true } async-recursion = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index bcd2cba57..d312f5626 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -44,8 +44,7 @@ futures-util = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_urlencoded = { workspace = true } -byteorder = "^1.5" -base64 = "0.21" +base64 = { workspace = true } ed25519-zebra = "^3" rand = { workspace = true } once_cell = "^1" @@ -57,7 +56,7 @@ reqwest = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } tokio-util = { workspace = true, features = ["io"] } - +headers = { workspace = true } noosphere-common = { version = "0.1.2", path = "../noosphere-common" } noosphere-storage = { version = "0.9.3", path = "../noosphere-storage" } noosphere-collections = { version = "0.6.7", path = "../noosphere-collections" } @@ -74,7 +73,6 @@ noosphere-common = { version = "0.1.2", path = "../noosphere-common", features = [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = ["full"] } tracing-subscriber = { workspace = true } -axum = { workspace = true, features = ["headers", "macros"] } [target.'cfg(target_arch = "wasm32")'.dependencies] # NOTE: This is needed so that rand can be included in WASM builds diff --git a/rust/noosphere-core/src/api/client.rs b/rust/noosphere-core/src/api/client.rs index 0e29addb7..9c618e000 100644 --- a/rust/noosphere-core/src/api/client.rs +++ b/rust/noosphere-core/src/api/client.rs @@ -5,7 +5,6 @@ use crate::{ error::NoosphereError, stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream}, }; - use anyhow::{anyhow, Result}; use async_stream::try_stream; use bytes::Bytes; @@ -88,10 +87,11 @@ where let client = reqwest::Client::new(); - let mut url = api_base.clone(); - url.set_path(&v0alpha1::Route::Did.to_string()); - - let did_response = client.get(url).send().await?; + let did_response = { + let mut url = api_base.clone(); + url.set_path(&v0alpha1::Route::Did.to_string()); + client.get(url).send().await? + }; match did_response.status() { StatusCode::OK => (), diff --git a/rust/noosphere-core/src/api/headers/mod.rs b/rust/noosphere-core/src/api/headers/mod.rs new file mode 100644 index 000000000..7cd3ea774 --- /dev/null +++ b/rust/noosphere-core/src/api/headers/mod.rs @@ -0,0 +1,9 @@ +//! A collection of typed [headers::Header] implementations +//! used in gateway APIs. + +#[cfg(doc)] +use headers; + +mod ucan; + +pub use self::ucan::*; diff --git a/rust/noosphere-core/src/api/headers/ucan.rs b/rust/noosphere-core/src/api/headers/ucan.rs new file mode 100644 index 000000000..730e62b2e --- /dev/null +++ b/rust/noosphere-core/src/api/headers/ucan.rs @@ -0,0 +1,96 @@ +use crate::{authority::SUPPORTED_KEYS, data::Jwt}; +use anyhow::anyhow; +use cid::Cid; +use headers::{self, Header, HeaderName, HeaderValue}; +use once_cell::sync::Lazy; +use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore}; + +static UCAN_NAME: Lazy = Lazy::new(|| HeaderName::from_static("ucan")); + +/// A typed header for the `ucan` header, a tuple of [Cid] and [Jwt], +/// adhering to the [UCAN as Bearer Token](https://github.com/ucan-wg/ucan-http-bearer-token) +/// specification. +/// +/// TODO(#708): Note that in the 0.3.0 spec, a single `ucans` header is used. +/// This implementation is based on an earlier version with multiple `ucan` +/// headers. +/// +/// The values are **not** validated during parsing beyond legitimate +/// looking [Cid]s and [Jwt]s here. Use [Ucan::as_proof_chain] to +/// validate and construct a [ProofChain]. +pub struct Ucan(Vec<(Cid, Jwt)>); + +impl Header for Ucan { + fn name() -> &'static HeaderName { + &UCAN_NAME + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let mut ucans = vec![]; + for header in values.by_ref() { + let value = header.to_str().map_err(|_| headers::Error::invalid())?; + let mut parts: Vec<&str> = value.split_ascii_whitespace().take(2).collect(); + + let jwt: Jwt = parts + .pop() + .ok_or_else(headers::Error::invalid)? + .to_string() + .into(); + let cid: Cid = parts + .pop() + .ok_or_else(headers::Error::invalid)? + .try_into() + .map_err(|_| headers::Error::invalid())?; + + ucans.push((cid, jwt)); + } + + Ok(Ucan(ucans)) + } + + fn encode(&self, values: &mut E) + where + E: Extend, + { + for (cid, jwt) in self.0.iter() { + let value = HeaderValue::from_str(&format!("{} {}", cid, jwt)).unwrap(); + values.extend(std::iter::once(value)); + } + } +} + +impl Ucan { + /// Construct a [ProofChain] from the collected `ucan` header + /// values, validating the cryptographic integrity and time bounds + /// of the UCANs. Capabilities can then be assessed using the [ProofChain]. + pub async fn as_proof_chain( + &self, + bearer: &headers::authorization::Bearer, + mut db: impl UcanJwtStore, + ) -> anyhow::Result { + for (cid, jwt) in self.0.iter() { + // TODO(#261): We need a worker process that purges garbage UCANs + let actual_cid = db.write_token(jwt.into()).await?; + if actual_cid != *cid { + return Err(anyhow!("Cid and Jwt do not match.")); + } + } + + let mut did_parser = DidParser::new(SUPPORTED_KEYS); + let proof_chain = + ProofChain::try_from_token_string(bearer.token(), None, &mut did_parser, &db).await?; + + proof_chain.ucan().validate(None, &mut did_parser).await?; + + Ok(proof_chain) + } +} + +impl From for Vec<(Cid, Jwt)> { + fn from(value: Ucan) -> Self { + value.0 + } +} diff --git a/rust/noosphere-core/src/api/mod.rs b/rust/noosphere-core/src/api/mod.rs index 2973c1d19..baeab3252 100644 --- a/rust/noosphere-core/src/api/mod.rs +++ b/rust/noosphere-core/src/api/mod.rs @@ -5,9 +5,9 @@ mod client; mod data; mod route; -pub use client::*; -pub use data::*; -pub use route::*; - +pub mod headers; pub mod v0alpha1; pub mod v0alpha2; + +pub use client::*; +pub use data::*; diff --git a/rust/noosphere-core/src/data/mod.rs b/rust/noosphere-core/src/data/mod.rs index 3631f7f4b..5ef8eccd4 100644 --- a/rust/noosphere-core/src/data/mod.rs +++ b/rust/noosphere-core/src/data/mod.rs @@ -14,12 +14,12 @@ mod sphere; mod strings; mod versioned_map; +pub use self::headers::*; pub use address::*; pub use authority::*; pub use body_chunk::*; pub use bundle::*; pub use changelog::*; -pub use headers::*; pub use link::*; pub use memo::*; pub use sphere::*; diff --git a/rust/noosphere-gateway/src/authority.rs b/rust/noosphere-gateway/src/authority.rs deleted file mode 100644 index 696eca6c9..000000000 --- a/rust/noosphere-gateway/src/authority.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use async_trait::async_trait; -use axum::{ - extract::FromRequestParts, - headers::{authorization::Bearer, Authorization}, - http::{request::Parts, StatusCode}, - TypedHeader, -}; -use libipld_core::cid::Cid; -use noosphere_core::authority::{SphereAbility, SphereReference, SPHERE_SEMANTICS}; -use noosphere_core::context::SphereContext; -use noosphere_storage::Storage; -use tokio::sync::Mutex; -use ucan::{capability::CapabilityView, chain::ProofChain, store::UcanJwtStore}; - -use super::GatewayScope; - -/// This is a construct that can be generated on a per-request basis and -/// embodies the authorization status of the request-maker as it is -/// represented by a UCAN. Any request handler can use a GatewayAuthority -/// to test if a required capability is satisfied by the authorization -/// presented by the maker of the request. -pub struct GatewayAuthority { - proof: ProofChain, - scope: GatewayScope, - marker: std::marker::PhantomData, -} - -impl GatewayAuthority -where - S: Storage + 'static, -{ - pub fn try_authorize( - &self, - capability: &CapabilityView, - ) -> Result<(), StatusCode> { - let capability_infos = self.proof.reduce_capabilities(&SPHERE_SEMANTICS); - - for capability_info in capability_infos { - trace!("Checking capability: {:?}", capability_info.capability); - if capability_info - .originators - .contains(self.scope.counterpart.as_str()) - && capability_info.capability.enables(capability) - { - debug!("Authorized!"); - return Ok(()); - } - } - - Err(StatusCode::UNAUTHORIZED) - } -} - -#[async_trait] -impl FromRequestParts for GatewayAuthority -where - State: Send + Sync, - S: Storage + 'static, -{ - type Rejection = StatusCode; - - async fn from_request_parts(parts: &mut Parts, state: &State) -> Result { - let sphere_context = parts - .extensions - .get::>>>() - .ok_or_else(|| { - error!("Could not find DidParser in extensions"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .clone(); - - // Get the scope of this gateway - let gateway_scope = parts - .extensions - .get::() - .ok_or_else(|| { - error!("Could not find GatewayScope in extensions"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .clone(); - - // Extract the bearer token - let TypedHeader(Authorization(bearer)) = - TypedHeader::>::from_request_parts(parts, state) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::BAD_REQUEST - })?; - - let mut db = { - let sphere_context = sphere_context.lock().await; - sphere_context.db().clone() - }; - - let ucan_headers = parts.headers.get_all("ucan").into_iter(); - - // TODO: We should write a typed header thing for this: - for header in ucan_headers { - let value = header.to_str().map_err(|_| StatusCode::BAD_REQUEST)?; - let mut parts: Vec<&str> = value.split_ascii_whitespace().take(2).collect(); - - let jwt = parts.pop().ok_or(StatusCode::BAD_REQUEST)?; - - let cid_string = parts.pop().ok_or(StatusCode::BAD_REQUEST)?; - let claimed_cid = Cid::try_from(cid_string).map_err(|_| StatusCode::BAD_REQUEST)?; - - // TODO(#261): We need a worker process that purges garbage UCANs - let actual_cid = db - .write_token(jwt) - .await - .map_err(|_| StatusCode::BAD_REQUEST)?; - - if claimed_cid != actual_cid { - return Err(StatusCode::BAD_REQUEST); - } - } - - let proof_chain = { - let mut sphere_context = sphere_context.lock().await; - let did_parser = sphere_context.did_parser_mut(); - let proof_chain = - ProofChain::try_from_token_string(bearer.token(), None, did_parser, &db) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::BAD_REQUEST - })?; - - proof_chain - .ucan() - .validate(None, did_parser) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::UNAUTHORIZED - })?; - - proof_chain - }; - - Ok(GatewayAuthority { - scope: gateway_scope.clone(), - proof: proof_chain, - marker: std::marker::PhantomData, - }) - } -} diff --git a/rust/noosphere-gateway/src/extractors/authority.rs b/rust/noosphere-gateway/src/extractors/authority.rs new file mode 100644 index 000000000..abb19fa94 --- /dev/null +++ b/rust/noosphere-gateway/src/extractors/authority.rs @@ -0,0 +1,94 @@ +use anyhow::Result; +use async_trait::async_trait; +use axum::{ + extract::FromRequestParts, + headers::{authorization::Bearer, Authorization}, + http::{request::Parts, StatusCode}, + TypedHeader, +}; +use noosphere_core::{ + api::headers::{self as noosphere_headers}, + authority::{SphereAbility, SphereReference, SPHERE_SEMANTICS}, + context::HasMutableSphereContext, + data::Did, +}; +use noosphere_storage::Storage; +use ucan::capability::CapabilityView; + +use crate::extractors::map_bad_request; + +/// Represents the scope of a gateway request's authorization and sphere +/// access. +/// +/// Embodies the authorization status of the request-maker as it is +/// represented by its `ucan` headers. Any request handler can use a [GatewayAuthority] +/// to test if a required capability is satisfied by the authorization +/// presented by the maker of the request. +pub struct GatewayAuthority { + bearer: Bearer, + ucans: noosphere_headers::Ucan, +} + +impl GatewayAuthority { + pub async fn try_authorize( + &self, + sphere_context: &mut C, + counterpart: &Did, + capability: &CapabilityView, + ) -> Result<(), StatusCode> + where + C: HasMutableSphereContext, + S: Storage + 'static, + { + let db = { + let sphere_context: C::SphereContext = sphere_context + .sphere_context() + .await + .map_err(map_bad_request)?; + sphere_context.db().clone() + }; + + let proof_chain = self + .ucans + .as_proof_chain(&self.bearer, db) + .await + .map_err(map_bad_request)?; + + let capability_infos = proof_chain.reduce_capabilities(&SPHERE_SEMANTICS); + + for capability_info in capability_infos { + trace!("Checking capability: {:?}", capability_info.capability); + if capability_info.originators.contains(counterpart.as_str()) + && capability_info.capability.enables(capability) + { + debug!("Authorized!"); + return Ok(()); + } + } + + Err(StatusCode::UNAUTHORIZED) + } +} + +#[async_trait] +impl FromRequestParts for GatewayAuthority +where + State: Send + Sync, +{ + type Rejection = StatusCode; + + async fn from_request_parts(parts: &mut Parts, state: &State) -> Result { + let TypedHeader(Authorization(bearer)) = + TypedHeader::>::from_request_parts(parts, state) + .await + .map_err(map_bad_request)?; + + let ucans: noosphere_headers::Ucan = + TypedHeader::::from_request_parts(parts, state) + .await + .map_err(map_bad_request)? + .0; + + Ok(GatewayAuthority { bearer, ucans }) + } +} diff --git a/rust/noosphere-gateway/src/extractor.rs b/rust/noosphere-gateway/src/extractors/cbor.rs similarity index 100% rename from rust/noosphere-gateway/src/extractor.rs rename to rust/noosphere-gateway/src/extractors/cbor.rs diff --git a/rust/noosphere-gateway/src/extractors/mod.rs b/rust/noosphere-gateway/src/extractors/mod.rs new file mode 100644 index 000000000..076e0e7b2 --- /dev/null +++ b/rust/noosphere-gateway/src/extractors/mod.rs @@ -0,0 +1,16 @@ +//! Axum extractors, used to create arguments in routes from a request. + +mod authority; +mod cbor; +mod scope; +mod sphere_extractor; + +pub use authority::*; +pub use cbor::*; +pub use scope::*; +pub use sphere_extractor::*; + +pub(crate) fn map_bad_request(error: E) -> axum::http::StatusCode { + tracing::error!("{:?}", error); + axum::http::StatusCode::BAD_REQUEST +} diff --git a/rust/noosphere-gateway/src/extractors/scope.rs b/rust/noosphere-gateway/src/extractors/scope.rs new file mode 100644 index 000000000..897ee3ec5 --- /dev/null +++ b/rust/noosphere-gateway/src/extractors/scope.rs @@ -0,0 +1,57 @@ +use std::{marker::PhantomData, sync::Arc}; + +use crate::GatewayManager; +use anyhow::Result; +use async_trait::async_trait; +use axum::{ + extract::FromRequestParts, + http::{request::Parts, StatusCode}, +}; +use noosphere_core::{context::HasMutableSphereContext, data::Did}; +use noosphere_storage::Storage; + +#[cfg(doc)] +use noosphere_core::context::SphereContext; + +use super::map_bad_request; + +/// Represents the scope of a gateway request as a counterpart [Did], +/// and the corresponding managed sphere's author/device key, +/// the gateway identity. +/// +/// Extracting a [GatewayScope] is efficient, and does not open +/// a [SphereContext]. +pub struct GatewayScope { + pub counterpart: Did, + pub gateway_identity: Did, + sphere_context_marker: PhantomData, + storage_marker: PhantomData, +} + +#[async_trait] +impl FromRequestParts> for GatewayScope +where + M: GatewayManager + 'static, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + type Rejection = StatusCode; + + async fn from_request_parts( + parts: &mut Parts, + state: &Arc, + ) -> Result { + let counterpart = state.extract_counterpart(parts).await?; + let gateway_identity = state + .get_gateway_identity(&counterpart) + .await + .map_err(map_bad_request)?; + + Ok(GatewayScope { + counterpart, + gateway_identity, + sphere_context_marker: PhantomData, + storage_marker: PhantomData, + }) + } +} diff --git a/rust/noosphere-gateway/src/extractors/sphere_extractor.rs b/rust/noosphere-gateway/src/extractors/sphere_extractor.rs new file mode 100644 index 000000000..096d12528 --- /dev/null +++ b/rust/noosphere-gateway/src/extractors/sphere_extractor.rs @@ -0,0 +1,63 @@ +use std::{marker::PhantomData, sync::Arc}; + +use crate::{extractors::map_bad_request, GatewayManager}; +use anyhow::Result; +use async_trait::async_trait; +use axum::{ + extract::FromRequestParts, + http::{request::Parts, StatusCode}, +}; +use noosphere_core::context::HasMutableSphereContext; +use noosphere_storage::Storage; + +#[cfg(doc)] +use noosphere_core::context::SphereContext; + +/// A wrapper type around a [SphereContext] scoped by the counterpart +/// parsed by [GatewayManager::extract_counterpart]. +#[derive(Clone)] +pub struct SphereExtractor +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + sphere: C, + storage_marker: PhantomData, +} + +impl SphereExtractor +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + /// Returns the inner sphere. + pub fn into_inner(self) -> C { + self.sphere + } +} + +#[async_trait] +impl FromRequestParts> for SphereExtractor +where + M: GatewayManager + 'static, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + type Rejection = StatusCode; + + async fn from_request_parts( + parts: &mut Parts, + state: &Arc, + ) -> Result { + let counterpart = state.extract_counterpart(parts).await?; + let sphere = state + .get_sphere_context(&counterpart) + .await + .map_err(map_bad_request)?; + + Ok(SphereExtractor { + sphere, + storage_marker: PhantomData, + }) + } +} diff --git a/rust/noosphere-gateway/src/gateway.rs b/rust/noosphere-gateway/src/gateway.rs index 57e7f8145..587f3585d 100644 --- a/rust/noosphere-gateway/src/gateway.rs +++ b/rust/noosphere-gateway/src/gateway.rs @@ -3,17 +3,18 @@ use axum::extract::DefaultBodyLimit; use axum::http::{HeaderValue, Method}; use axum::routing::{get, put}; use axum::{Extension, Router, Server}; +use noosphere_core::api::{v0alpha1, v0alpha2}; use noosphere_core::context::HasMutableSphereContext; -use noosphere_core::data::Did; use noosphere_ipfs::KuboClient; use noosphere_storage::Storage; use std::net::TcpListener; +use std::sync::Arc; +use tokio::task::JoinHandle; use tower_http::cors::{Any, CorsLayer}; use tower_http::trace::TraceLayer; use url::Url; -use noosphere_core::api::{v0alpha1, v0alpha2}; - +use crate::GatewayManager; use crate::{ handlers, worker::{ @@ -21,130 +22,118 @@ use crate::{ NameSystemConnectionType, }, }; - use noosphere_core::tracing::initialize_tracing; const DEFAULT_BODY_LENGTH_LIMIT: usize = 100 /* MB */ * 1000 * 1000; -/// A [GatewayScope] describes the pairing of a gateway and its designated user -/// via their spheres' respective [Did]s -#[derive(Clone, Debug)] -pub struct GatewayScope { - /// Identity of gateway sphere. - pub identity: Did, - /// Identity of a managed sphere that is being reflected by gateway sphere. - pub counterpart: Did, -} - -/// Start a Noosphere Gateway -pub async fn start_gateway( - listener: TcpListener, - gateway_scope: GatewayScope, - sphere_context: C, - ipfs_api: Url, - name_resolver_api: Url, - cors_origin: Option, -) -> Result<()> -where - C: HasMutableSphereContext + 'static, - S: Storage + 'static, -{ - initialize_tracing(None); +type WorkerHandles = Vec>>; - let gateway_key_did = { - let sphere_context = sphere_context.sphere_context().await?; - sphere_context.author().identity().await? - }; - let mut cors = CorsLayer::new(); +/// Represents a Noosphere gateway server. +pub struct Gateway { + app: Router, + worker_handles: WorkerHandles, +} - if let Some(cors_origin) = cors_origin { - cors = cors - .allow_origin( - cors_origin - .origin() - .unicode_serialization() - .as_str() - .parse::()?, +impl Gateway { + /// Create a new Noosphere `Gateway`, initializing worker threads + /// and router configurations. Use [Gateway::start] to start the server. + pub fn new( + manager: M, + ipfs_api: Url, + name_resolver_api: Url, + cors_origin: Option, + ) -> Result + where + M: GatewayManager + 'static, + C: HasMutableSphereContext + 'static, + S: Storage + 'static, + { + initialize_tracing(None); + + let mut cors = CorsLayer::new(); + + if let Some(cors_origin) = cors_origin { + cors = cors + .allow_origin( + cors_origin + .origin() + .unicode_serialization() + .as_str() + .parse::()?, + ) + .allow_headers(Any) + .allow_methods(vec![ + Method::GET, + Method::POST, + Method::PATCH, + Method::PUT, + Method::DELETE, + ]); + } + + let ipfs_client = KuboClient::new(&ipfs_api)?; + + let (syndication_tx, syndication_task) = + start_ipfs_syndication::(ipfs_api.clone(), manager.clone()); + let (name_system_tx, name_system_task) = start_name_system::( + NameSystemConfiguration { + connection_type: NameSystemConnectionType::Remote(name_resolver_api), + ipfs_api, + }, + manager.clone(), + ); + let (cleanup_tx, cleanup_task) = start_cleanup::(manager.clone()); + + let app = Router::new() + .route( + &v0alpha1::Route::Did.to_string(), + get(handlers::v0alpha1::did_route), + ) + .route( + &v0alpha1::Route::Replicate(None).to_string(), + get(handlers::v0alpha1::replicate_route::), + ) + .route( + &v0alpha1::Route::Identify.to_string(), + get(handlers::v0alpha1::identify_route::), + ) + .route( + &v0alpha1::Route::Push.to_string(), + #[allow(deprecated)] + put(handlers::v0alpha1::push_route::), ) - .allow_headers(Any) - .allow_methods(vec![ - Method::GET, - Method::POST, - Method::PATCH, - Method::PUT, - Method::DELETE, - ]); + .route( + &v0alpha2::Route::Push.to_string(), + put(handlers::v0alpha2::push_route::), + ) + .route( + &v0alpha1::Route::Fetch.to_string(), + get(handlers::v0alpha1::fetch_route::), + ) + .layer(Extension(ipfs_client)) + .layer(Extension(syndication_tx)) + .layer(Extension(name_system_tx)) + .layer(Extension(cleanup_tx)) + .layer(DefaultBodyLimit::max(DEFAULT_BODY_LENGTH_LIMIT)) + .layer(cors) + .layer(TraceLayer::new_for_http()) + .with_state(Arc::new(manager)); + + Ok(Self { + app, + worker_handles: vec![syndication_task, name_system_task, cleanup_task], + }) } - let ipfs_client = KuboClient::new(&ipfs_api)?; - - let (syndication_tx, syndication_task) = - start_ipfs_syndication::(ipfs_api.clone(), vec![sphere_context.clone()]); - let (name_system_tx, name_system_task) = start_name_system::( - NameSystemConfiguration { - connection_type: NameSystemConnectionType::Remote(name_resolver_api), - ipfs_api, - }, - vec![sphere_context.clone()], - ); - let (cleanup_tx, cleanup_task) = start_cleanup::(sphere_context.clone()); - - let app = Router::new() - .route( - &v0alpha1::Route::Did.to_string(), - get(handlers::v0alpha1::did_route), - ) - .route( - &v0alpha1::Route::Replicate(None).to_string(), - get(handlers::v0alpha1::replicate_route::), - ) - .route( - &v0alpha1::Route::Identify.to_string(), - get(handlers::v0alpha1::identify_route::), - ) - .route( - &v0alpha1::Route::Push.to_string(), - #[allow(deprecated)] - put(handlers::v0alpha1::push_route::), - ) - .route( - &v0alpha2::Route::Push.to_string(), - put(handlers::v0alpha2::push_route::), - ) - .route( - &v0alpha1::Route::Fetch.to_string(), - get(handlers::v0alpha1::fetch_route::), - ) - .layer(Extension(sphere_context.clone())) - .layer(Extension(gateway_scope.clone())) - .layer(Extension(ipfs_client)) - .layer(Extension(gateway_key_did)) - .layer(Extension(syndication_tx)) - .layer(Extension(name_system_tx)) - .layer(Extension(cleanup_tx)) - .layer(DefaultBodyLimit::max(DEFAULT_BODY_LENGTH_LIMIT)) - .layer(cors) - .layer(TraceLayer::new_for_http()); - - info!( - r#"A geist is summoned to manage local sphere {} - -It has bound a gateway to {:?} -It awaits updates from sphere {}..."#, - gateway_scope.identity, - listener - .local_addr() - .expect("Unexpected missing listener address"), - gateway_scope.counterpart - ); - - Server::from_tcp(listener)? - .serve(app.into_make_service()) - .await?; - - syndication_task.abort(); - name_system_task.abort(); - cleanup_task.abort(); - - Ok(()) + /// Start the gateway server with `listener`, consuming the [Gateway] + /// object until the process terminates or has an unrecoverable error. + pub async fn start(self, listener: TcpListener) -> Result<()> { + Server::from_tcp(listener)? + .serve(self.app.into_make_service()) + .await?; + for handle in self.worker_handles { + handle.abort(); + } + Ok(()) + } } diff --git a/rust/noosphere-gateway/src/gateway_manager.rs b/rust/noosphere-gateway/src/gateway_manager.rs new file mode 100644 index 000000000..4912aa28b --- /dev/null +++ b/rust/noosphere-gateway/src/gateway_manager.rs @@ -0,0 +1,41 @@ +use anyhow::Result; +use async_trait::async_trait; +use axum::http::{request::Parts, StatusCode}; +use noosphere_core::{context::HasMutableSphereContext, data::Did}; +use noosphere_storage::Storage; +use std::pin::Pin; +use tokio_stream::Stream; + +#[cfg(doc)] +use noosphere_core::context::SphereContext; + +/// [Stream] of [SphereContext] from a [GatewayManager]. +pub type GatewayManagerSphereStream<'a, C> = dyn Stream> + Send + 'a; + +/// [GatewayManager] implementations are used to provide access to managed +/// hosted sphere data and customizations in a Noosphere gateway. +#[async_trait] +pub trait GatewayManager: Clone + Send + Sync +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + /// Retrieve a sphere context whose counterpart matches `counterpart`. + async fn get_sphere_context(&self, counterpart: &Did) -> Result; + + /// Retrieve the identity of the managed sphere's device key from provided `counterpart`, + /// as a lightweight alternative to fetching the entire sphere context + /// in [GatewayManager::get_sphere_context]. + async fn get_gateway_identity(&self, counterpart: &Did) -> Result; + + /// /!\ Iterate over all managed spheres. + /// /!\ This method is only for the embedded worker implementations in the gateway, + /// /!\ and not to be used in routes, and will be superceded via #720. + /// TODO(#720) + fn experimental_worker_only_iter(&self) -> Pin>>; + + /// Extract the specified counterpart identity from an [axum] request. + /// This function should be deterministic in order to take advantage + /// of caching. + async fn extract_counterpart(&self, parts: &mut Parts) -> Result; +} diff --git a/rust/noosphere-gateway/src/handlers/mod.rs b/rust/noosphere-gateway/src/handlers/mod.rs index 9f424705d..ce7b1b295 100644 --- a/rust/noosphere-gateway/src/handlers/mod.rs +++ b/rust/noosphere-gateway/src/handlers/mod.rs @@ -1,2 +1,7 @@ +//! Stateless [axum] handlers for processing gateway requests. + +#[cfg(doc)] +use axum; + pub mod v0alpha1; pub mod v0alpha2; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs index 97f02cbfa..5b534c6a2 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs @@ -1,6 +1,12 @@ -use axum::{http::StatusCode, Extension}; -use noosphere_core::data::Did; +use crate::extractors::GatewayScope; +use axum::http::StatusCode; +use noosphere_core::context::HasMutableSphereContext; +use noosphere_storage::Storage; -pub async fn did_route(Extension(gateway_identity): Extension) -> Result { - Ok(gateway_identity.into()) +pub async fn did_route(scope: GatewayScope) -> Result +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + Ok(scope.gateway_identity.into()) } diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs index 37acb6572..5352e321a 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs @@ -7,8 +7,8 @@ use bytes::Bytes; use noosphere_core::{ api::v0alpha1::FetchParameters, authority::{generate_capability, SphereAbility}, - context::HasSphereContext, - data::{Link, MemoIpld}, + context::HasMutableSphereContext, + data::{Did, Link, MemoIpld}, stream::{memo_history_stream, to_car_stream}, view::Sphere, }; @@ -16,31 +16,40 @@ use noosphere_ipfs::{IpfsStore, KuboClient}; use noosphere_storage::{BlockStoreRetry, SphereDb, Storage}; use tokio_stream::{Stream, StreamExt}; -use crate::{authority::GatewayAuthority, GatewayScope}; +use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor}; -#[instrument(level = "debug", skip(authority, scope, sphere_context, ipfs_client))] +#[instrument( + level = "debug", + skip(authority, sphere_extractor, gateway_scope, ipfs_client) +)] pub async fn fetch_route( - authority: GatewayAuthority, + authority: GatewayAuthority, + sphere_extractor: SphereExtractor, + gateway_scope: GatewayScope, Query(FetchParameters { since }): Query, - Extension(scope): Extension, Extension(ipfs_client): Extension, - Extension(sphere_context): Extension, ) -> Result>>, StatusCode> where - C: HasSphereContext, + C: HasMutableSphereContext, S: Storage + 'static, { - authority.try_authorize(&generate_capability( - &scope.counterpart, - SphereAbility::Fetch, - ))?; - let sphere_context = sphere_context.sphere_context().await.map_err(|err| { + let mut gateway_sphere = sphere_extractor.into_inner(); + let counterpart = &gateway_scope.counterpart; + authority + .try_authorize( + &mut gateway_sphere, + counterpart, + &generate_capability(counterpart.as_str(), SphereAbility::Fetch), + ) + .await?; + + let sphere_context = gateway_sphere.sphere_context().await.map_err(|err| { error!("{err}"); StatusCode::INTERNAL_SERVER_ERROR })?; let db = sphere_context.db(); - - let stream = generate_fetch_stream(&scope, since.as_ref(), db, ipfs_client) + let identity = sphere_context.identity(); + let stream = generate_fetch_stream(counterpart, identity, since.as_ref(), db, ipfs_client) .await .map_err(|err| { error!("{err}"); @@ -53,7 +62,8 @@ where /// Generates a CAR stream that can be used as a the streaming body of a /// gateway fetch route response pub async fn generate_fetch_stream( - scope: &GatewayScope, + counterpart: &Did, + identity: &Did, since: Option<&Link>, db: &SphereDb, ipfs_client: KuboClient, @@ -61,7 +71,7 @@ pub async fn generate_fetch_stream( where S: Storage + 'static, { - let latest_local_sphere_cid = db.require_version(&scope.identity).await?.into(); + let latest_local_sphere_cid = db.require_version(identity).await?.into(); debug!("The latest gateway sphere version is {latest_local_sphere_cid}..."); @@ -93,7 +103,7 @@ where match latest_local_sphere .get_content() .await? - .get(&scope.counterpart) + .get(counterpart) .await? { Some(latest_counterpart_sphere_cid) => { @@ -103,7 +113,7 @@ where Some(since_local_sphere_cid) => { let since_local_sphere = Sphere::at(since_local_sphere_cid, db); let links = since_local_sphere.get_content().await?; - links.get(&scope.counterpart).await?.cloned() + links.get(counterpart).await?.cloned() } None => None, }; @@ -128,7 +138,7 @@ where ))); } None => { - warn!("No revisions found for counterpart {}!", scope.counterpart); + warn!("No revisions found for counterpart {}!", counterpart); Ok(Box::pin(to_car_stream( vec![latest_local_sphere_cid.into()], stream, diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs index 818f3a15b..c824ffaac 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs @@ -1,31 +1,36 @@ -use crate::{authority::GatewayAuthority, GatewayScope}; -use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; +use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor}; +use axum::{http::StatusCode, response::IntoResponse, Json}; use noosphere_core::api::v0alpha1::IdentifyResponse; use noosphere_core::authority::{generate_capability, SphereAbility}; -use noosphere_core::context::HasSphereContext; +use noosphere_core::context::HasMutableSphereContext; use noosphere_storage::Storage; pub async fn identify_route( - Extension(scope): Extension, - Extension(sphere_context): Extension, - authority: GatewayAuthority, + authority: GatewayAuthority, + sphere_extractor: SphereExtractor, + gateway_scope: GatewayScope, ) -> Result where - C: HasSphereContext, + C: HasMutableSphereContext, S: Storage + 'static, { debug!("Invoking identify route..."); + let mut gateway_sphere = sphere_extractor.into_inner(); + let counterpart = &gateway_scope.counterpart; + authority + .try_authorize( + &mut gateway_sphere, + counterpart, + &generate_capability(counterpart.as_str(), SphereAbility::Fetch), + ) + .await?; - authority.try_authorize(&generate_capability( - &scope.counterpart, - SphereAbility::Fetch, - ))?; - - let sphere_context = sphere_context + let sphere_context = gateway_sphere .sphere_context() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let db = sphere_context.db(); + let identity = sphere_context.identity(); let gateway_key = &sphere_context.author().key; let gateway_authorization = sphere_context @@ -42,7 +47,7 @@ where })?; Ok(Json( - IdentifyResponse::sign(&scope.identity, gateway_key, &ucan) + IdentifyResponse::sign(identity, gateway_key, &ucan) .await .map_err(|error| { error!("{:?}", error); diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs index 13b47ccc2..15ae48999 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs @@ -1,3 +1,8 @@ +//! v0alpha1 stateless [axum] handlers. + +#[cfg(doc)] +use axum; + mod did; mod fetch; mod identify; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs index 3ebd6f93f..f612a330f 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs @@ -1,9 +1,9 @@ -use std::{collections::BTreeSet, marker::PhantomData}; - +use crate::{ + extractors::{Cbor, GatewayAuthority, GatewayScope, SphereExtractor}, + worker::{NameSystemJob, SyndicationJob}, +}; use anyhow::Result; - use axum::{http::StatusCode, Extension}; - use noosphere_core::api::v0alpha1::{PushBody, PushError, PushResponse}; use noosphere_core::context::{HasMutableSphereContext, SphereContentWrite, SphereCursor}; use noosphere_core::{ @@ -12,33 +12,27 @@ use noosphere_core::{ view::Sphere, }; use noosphere_storage::Storage; +use std::collections::BTreeSet; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::StreamExt; -use crate::{ - authority::GatewayAuthority, - extractor::Cbor, - worker::{NameSystemJob, SyndicationJob}, - GatewayScope, -}; - // #[debug_handler] #[deprecated(since = "0.8.1", note = "Please migrate to v0alpha2")] #[instrument( level = "debug", skip( authority, + sphere_extractor, gateway_scope, - sphere_context, syndication_tx, name_system_tx, request_body ) )] pub async fn push_route( - authority: GatewayAuthority, - Extension(sphere_context): Extension, - Extension(gateway_scope): Extension, + authority: GatewayAuthority, + sphere_extractor: SphereExtractor, + gateway_scope: GatewayScope, Extension(syndication_tx): Extension>>, Extension(name_system_tx): Extension>>, Cbor(request_body): Cbor, @@ -50,23 +44,26 @@ where debug!("Invoking push route..."); let sphere_identity = &request_body.sphere; - - if sphere_identity != &gateway_scope.counterpart { + let counterpart = &gateway_scope.counterpart; + if sphere_identity != counterpart { return Err(StatusCode::FORBIDDEN); } - authority.try_authorize(&generate_capability( - &gateway_scope.counterpart, - SphereAbility::Push, - ))?; + let mut gateway_sphere = sphere_extractor.into_inner(); + authority + .try_authorize( + &mut gateway_sphere, + counterpart, + &generate_capability(counterpart.as_str(), SphereAbility::Push), + ) + .await?; let gateway_push_routine = GatewayPushRoutine { - sphere_context, + gateway_sphere, gateway_scope, syndication_tx, name_system_tx, request_body, - storage_type: PhantomData, }; Ok(Cbor(gateway_push_routine.invoke().await?)) @@ -77,12 +74,11 @@ where C: HasMutableSphereContext, S: Storage + 'static, { - sphere_context: C, - gateway_scope: GatewayScope, + gateway_sphere: C, + gateway_scope: GatewayScope, syndication_tx: UnboundedSender>, name_system_tx: UnboundedSender>, request_body: PushBody, - storage_type: PhantomData, } impl GatewayPushRoutine @@ -117,7 +113,8 @@ where async fn verify_history(&self) -> Result<(), PushError> { debug!("Verifying pushed sphere history..."); - let gateway_sphere_tip = self.sphere_context.version().await?; + let gateway_sphere_context = self.gateway_sphere.sphere_context().await?; + let gateway_sphere_tip = gateway_sphere_context.version().await?; if Some(&gateway_sphere_tip) != self.request_body.counterpart_tip.as_ref() { warn!( "Gateway sphere conflict; we have {gateway_sphere_tip}, they have {:?}", @@ -127,7 +124,6 @@ where } let sphere_identity = &self.request_body.sphere; - let gateway_sphere_context = self.sphere_context.sphere_context().await?; let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); @@ -172,9 +168,10 @@ where /// revision in the history as we go. Then, update our local pointer to the /// tip of the pushed history. async fn incorporate_history(&mut self) -> Result<(), PushError> { + let counterpart = self.gateway_scope.counterpart.to_owned(); { debug!("Merging pushed sphere history..."); - let mut sphere_context = self.sphere_context.sphere_context_mut().await?; + let mut sphere_context = self.gateway_sphere.sphere_context_mut().await?; self.request_body .blocks @@ -199,22 +196,16 @@ where sphere.hydrate().await?; } - debug!( - "Setting {} tip to {}...", - self.gateway_scope.counterpart, tip - ); + debug!("Setting {} tip to {}...", counterpart, tip); sphere_context .db_mut() - .set_version(&self.gateway_scope.counterpart, tip) + .set_version(&counterpart, tip) .await?; } - self.sphere_context - .link_raw( - &self.gateway_scope.counterpart, - &self.request_body.local_tip, - ) + self.gateway_sphere + .link_raw(&counterpart, &self.request_body.local_tip) .await?; Ok(()) @@ -223,7 +214,7 @@ where async fn synchronize_names(&mut self) -> Result<(), PushError> { debug!("Synchronizing name changes to local sphere..."); - let my_sphere = self.sphere_context.to_sphere().await?; + let my_sphere = self.gateway_sphere.to_sphere().await?; let my_names = my_sphere.get_address_book().await?.get_identities().await?; let sphere = Sphere::at(&self.request_body.local_tip, my_sphere.store()); @@ -262,7 +253,7 @@ where // on the client due to a previous sync if my_value != Some(&value) { debug!("Adding name '{}' ({})...", key, value.did); - self.sphere_context + self.gateway_sphere .sphere_context_mut() .await? .mutation_mut() @@ -281,7 +272,7 @@ where } debug!("Removing name '{}'...", key); - self.sphere_context + self.gateway_sphere .sphere_context_mut() .await? .mutation_mut() @@ -306,13 +297,13 @@ where // NOTE CDATA: "Previous version" doesn't cover all cases; this needs to be a version given // in the push body, or else we don't know how far back we actually have to go (e.g., the name // system may have created a new version in the mean time. - let previous_version = self.sphere_context.version().await?; - let next_version = SphereCursor::latest(self.sphere_context.clone()) + let previous_version = self.gateway_sphere.version().await?; + let next_version = SphereCursor::latest(self.gateway_sphere.clone()) .save(None) .await?; let blocks = self - .sphere_context + .gateway_sphere .to_sphere() .await? .bundle_until_ancestor(Some(&previous_version)) @@ -325,7 +316,7 @@ where async fn notify_name_resolver(&self) -> Result<()> { if let Some(name_record) = &self.request_body.name_record { if let Err(error) = self.name_system_tx.send(NameSystemJob::Publish { - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), record: LinkRecord::try_from(name_record)?, republish: false, }) { @@ -334,7 +325,7 @@ where } if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveSince { - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), since: self.request_body.local_base, }) { warn!("Failed to request name system resolutions: {}", error); @@ -350,7 +341,7 @@ where // have added it to the gateway. if let Err(error) = self.syndication_tx.send(SyndicationJob { revision: next_version, - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), }) { warn!("Failed to queue IPFS syndication job: {}", error); }; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs index 4d1202894..1b3ba55e7 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs @@ -21,7 +21,7 @@ use noosphere_ipfs::{IpfsStore, KuboClient}; use noosphere_storage::{BlockStore, BlockStoreRetry, Storage}; use tokio_stream::Stream; -use crate::{authority::GatewayAuthority, GatewayScope}; +use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor}; pub type ReplicationCarStreamBody = StreamBody> + Send>>>; @@ -38,32 +38,37 @@ pub type ReplicationCarStreamBody = /// version. /// /// If `include_content` is `true`, the `since` parameter will be ignored. -#[instrument(level = "debug", skip(authority, scope, sphere_context,))] +#[instrument(level = "debug", skip(authority, sphere_extractor, gateway_scope))] pub async fn replicate_route( - authority: GatewayAuthority, + authority: GatewayAuthority, + sphere_extractor: SphereExtractor, + gateway_scope: GatewayScope, // NOTE: Cannot go from string to CID via serde Path(link_or_did): Path, Query(ReplicateParameters { since, include_content, }): Query, - Extension(scope): Extension, Extension(ipfs_client): Extension, - Extension(sphere_context): Extension, ) -> Result where - C: HasMutableSphereContext + 'static, + C: HasMutableSphereContext, S: Storage + 'static, { - authority.try_authorize(&generate_capability( - &scope.counterpart, - SphereAbility::Fetch, - ))?; + let mut gateway_sphere = sphere_extractor.into_inner(); + let counterpart = &gateway_scope.counterpart; + authority + .try_authorize( + &mut gateway_sphere, + counterpart, + &generate_capability(counterpart.as_str(), SphereAbility::Fetch), + ) + .await?; debug!("Invoking replicate route..."); let memo_version = if link_or_did.starts_with("did:") { - sphere_context + gateway_sphere .sphere_context() .await .map_err(|error| { @@ -84,7 +89,7 @@ where })? }; - let db = sphere_context + let db = gateway_sphere .sphere_context() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs index 621b20604..5d79540dc 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs @@ -1,3 +1,8 @@ +//! v0alpha2 stateless [axum] handlers. + +#[cfg(doc)] +use axum; + mod push; pub use push::*; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs index eaa9c027f..0cd80a12a 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, marker::PhantomData}; +use std::collections::BTreeSet; use anyhow::Result; @@ -22,11 +22,11 @@ use noosphere_storage::{block_deserialize, block_serialize, Storage}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{Stream, StreamExt}; +use crate::extractors::{GatewayScope, SphereExtractor}; use crate::{ - authority::GatewayAuthority, error::GatewayErrorResponse, + extractors::GatewayAuthority, worker::{NameSystemJob, SyndicationJob}, - GatewayScope, }; // #[debug_handler] @@ -34,39 +34,43 @@ use crate::{ level = "debug", skip( authority, + sphere_extractor, gateway_scope, - sphere_context, syndication_tx, name_system_tx, stream ) )] pub async fn push_route( - authority: GatewayAuthority, - Extension(sphere_context): Extension, - Extension(gateway_scope): Extension, + authority: GatewayAuthority, + sphere_extractor: SphereExtractor, + gateway_scope: GatewayScope, Extension(syndication_tx): Extension>>, Extension(name_system_tx): Extension>>, stream: BodyStream, ) -> Result>>, GatewayErrorResponse> where for<'a> C: HasMutableSphereContext + 'a, - S: Storage + 'static, + for<'a> S: Storage + 'a, { debug!("Invoking push route..."); - authority.try_authorize(&generate_capability( - &gateway_scope.counterpart, - SphereAbility::Push, - ))?; + let mut gateway_sphere = sphere_extractor.into_inner(); + let counterpart = &gateway_scope.counterpart; + authority + .try_authorize( + &mut gateway_sphere, + counterpart, + &generate_capability(counterpart.as_str(), SphereAbility::Push), + ) + .await?; let gateway_push_routine = GatewayPushRoutine { - sphere_context, + gateway_sphere, gateway_scope, syndication_tx, name_system_tx, block_stream: Box::pin(from_car_stream(stream)), - storage_type: PhantomData, }; Ok(StreamBody::new(gateway_push_routine.invoke().await?)) @@ -74,16 +78,15 @@ where pub struct GatewayPushRoutine where - C: HasMutableSphereContext + 'static, + C: HasMutableSphereContext, S: Storage + 'static, St: Stream)>> + Unpin + 'static, { - sphere_context: C, - gateway_scope: GatewayScope, + gateway_sphere: C, + gateway_scope: GatewayScope, syndication_tx: UnboundedSender>, name_system_tx: UnboundedSender>, block_stream: St, - storage_type: PhantomData, } impl GatewayPushRoutine @@ -149,7 +152,8 @@ where return Err(PushError::UnexpectedBody); }; - let gateway_sphere_tip = self.sphere_context.version().await?; + let gateway_sphere_context = self.gateway_sphere.sphere_context().await?; + let gateway_sphere_tip = gateway_sphere_context.version().await?; if Some(&gateway_sphere_tip) != push_body.counterpart_tip.as_ref() { warn!( "Gateway sphere conflict; we have {gateway_sphere_tip}, they have {:?}", @@ -159,7 +163,6 @@ where } let sphere_identity = &push_body.sphere; - let gateway_sphere_context = self.sphere_context.sphere_context().await?; let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); @@ -204,9 +207,10 @@ where /// revision in the history as we go. Then, update our local pointer to the /// tip of the pushed history. async fn incorporate_history(&mut self, push_body: &PushBody) -> Result<(), PushError> { + let counterpart = self.gateway_scope.counterpart.to_owned(); { debug!("Merging pushed sphere history..."); - let mut sphere_context = self.sphere_context.sphere_context_mut().await?; + let mut sphere_context = self.gateway_sphere.sphere_context_mut().await?; put_block_stream(sphere_context.db_mut().clone(), &mut self.block_stream).await?; @@ -228,19 +232,16 @@ where sphere.hydrate().await?; } - debug!( - "Setting {} tip to {}...", - self.gateway_scope.counterpart, tip - ); + debug!("Setting {} tip to {}...", counterpart, tip); sphere_context .db_mut() - .set_version(&self.gateway_scope.counterpart, tip) + .set_version(&counterpart, tip) .await?; } - self.sphere_context - .link_raw(&self.gateway_scope.counterpart, &push_body.local_tip) + self.gateway_sphere + .link_raw(&counterpart, &push_body.local_tip) .await?; Ok(()) @@ -249,7 +250,7 @@ where async fn synchronize_names(&mut self, push_body: &PushBody) -> Result<(), PushError> { debug!("Synchronizing name changes to local sphere..."); - let my_sphere = self.sphere_context.to_sphere().await?; + let my_sphere = self.gateway_sphere.to_sphere().await?; let my_names = my_sphere.get_address_book().await?.get_identities().await?; let sphere = Sphere::at(&push_body.local_tip, my_sphere.store()); @@ -288,7 +289,7 @@ where // on the client due to a previous sync if my_value != Some(&value) { debug!("Adding name '{}' ({})...", key, value.did); - self.sphere_context + self.gateway_sphere .sphere_context_mut() .await? .mutation_mut() @@ -307,7 +308,7 @@ where } debug!("Removing name '{}'...", key); - self.sphere_context + self.gateway_sphere .sphere_context_mut() .await? .mutation_mut() @@ -333,11 +334,11 @@ where debug!("Updating the gateway's sphere..."); let previous_version = push_body.counterpart_tip.as_ref(); - let next_version = SphereCursor::latest(self.sphere_context.clone()) + let next_version = SphereCursor::latest(self.gateway_sphere.clone()) .save(None) .await?; - let db = self.sphere_context.sphere_context().await?.db().clone(); + let db = self.gateway_sphere.sphere_context().await?.db().clone(); let block_stream = memo_history_stream(db, &next_version, previous_version, false); Ok((next_version, block_stream)) @@ -348,7 +349,7 @@ where debug!("Notifying name system of new link record..."); if let Some(name_record) = &push_body.name_record { if let Err(error) = self.name_system_tx.send(NameSystemJob::Publish { - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), record: LinkRecord::try_from(name_record)?, republish: false, }) { @@ -357,7 +358,7 @@ where } if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveAll { - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), }) { warn!("Failed to request name system resolutions: {}", error); }; @@ -373,7 +374,7 @@ where // have added it to the gateway. if let Err(error) = self.syndication_tx.send(SyndicationJob { revision: next_version, - context: self.sphere_context.clone(), + context: self.gateway_sphere.clone(), }) { warn!("Failed to queue IPFS syndication job: {}", error); }; diff --git a/rust/noosphere-gateway/src/lib.rs b/rust/noosphere-gateway/src/lib.rs index 528470c9a..79a1863f4 100644 --- a/rust/noosphere-gateway/src/lib.rs +++ b/rust/noosphere-gateway/src/lib.rs @@ -8,12 +8,15 @@ #[macro_use] extern crate tracing; -mod authority; mod error; -mod extractor; +mod extractors; mod gateway; +mod gateway_manager; mod handlers; +mod single_tenant; mod try_or_reset; mod worker; pub use gateway::*; +pub use gateway_manager::*; +pub use single_tenant::*; diff --git a/rust/noosphere-gateway/src/single_tenant.rs b/rust/noosphere-gateway/src/single_tenant.rs new file mode 100644 index 000000000..f4ec84adc --- /dev/null +++ b/rust/noosphere-gateway/src/single_tenant.rs @@ -0,0 +1,79 @@ +use crate::{GatewayManager, GatewayManagerSphereStream}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use axum::http::{request::Parts, StatusCode}; +use noosphere_core::context::HasMutableSphereContext; +use noosphere_core::data::Did; +use noosphere_storage::Storage; +use std::pin::Pin; +use tokio_stream::once; + +/// Implements [GatewayManager] for a single sphere context, used in the single-tenant +/// gateway workflow in `orb`. +/// +/// Scoping a request to a specific sphere can be handled in different ways, such as +/// subdomain or HTTP header. As a single-tenant gateway only hosts a single sphere, +/// and configuring the server may be unnecessary overhead for an independent operator, +/// the [SingleTenantGatewayManager] counterpart extraction always returns the `counterpart` +/// provided to the constructor. +#[derive(Clone)] +pub struct SingleTenantGatewayManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + context: C, + identity: Did, + counterpart: Did, + marker: std::marker::PhantomData, +} + +impl SingleTenantGatewayManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + /// Create a new [SingleTenantGatewayManager], implementing [GatewayManager] for a single sphere `context`. + pub async fn new(context: C, counterpart: Did) -> Result { + let identity = context.sphere_context().await?.author().did().await?; + Ok(SingleTenantGatewayManager { + identity, + context, + counterpart, + marker: std::marker::PhantomData, + }) + } +} + +#[async_trait] +impl GatewayManager for SingleTenantGatewayManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + async fn get_sphere_context(&self, counterpart: &Did) -> Result { + match &self.counterpart == counterpart { + true => Ok(self.context.clone()), + false => Err(anyhow!( + "No sphere context found for counterpart: {counterpart}." + )), + } + } + + async fn get_gateway_identity(&self, counterpart: &Did) -> Result { + match &self.counterpart == counterpart { + true => Ok(self.identity.clone()), + false => Err(anyhow!( + "No sphere identity found for counterpart: {counterpart}." + )), + } + } + + fn experimental_worker_only_iter(&self) -> Pin>> { + Box::pin(once(Ok(self.context.clone()))) + } + + async fn extract_counterpart(&self, _: &mut Parts) -> Result { + Ok(self.counterpart.clone()) + } +} diff --git a/rust/noosphere-gateway/src/worker/cleanup.rs b/rust/noosphere-gateway/src/worker/cleanup.rs index 94c29f57b..ec840467f 100644 --- a/rust/noosphere-gateway/src/worker/cleanup.rs +++ b/rust/noosphere-gateway/src/worker/cleanup.rs @@ -1,11 +1,11 @@ -use std::time::Duration; - +use crate::GatewayManager; use anyhow::{anyhow, Result}; use noosphere_core::{ context::{HasMutableSphereContext, HasSphereContext, SphereCursor, COUNTERPART}, data::Did, }; use noosphere_storage::{KeyValueStore, Storage}; +use std::time::Duration; use strum_macros::Display as EnumDisplay; use tokio::{ sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, @@ -13,7 +13,8 @@ use tokio::{ }; use tokio_stream::StreamExt; -// Once an hour +/// Seconds between finishing all compaction tasks, and +/// starting a new cycle. const PERIODIC_CLEANUP_INTERVAL_SECONDS: u64 = 60 * 60; #[derive(EnumDisplay)] @@ -21,10 +22,11 @@ pub enum CleanupJob { CompactHistory(C), } -pub fn start_cleanup( - gateway_sphere: C, +pub fn start_cleanup( + gateway_manager: M, ) -> (UnboundedSender>, JoinHandle>) where + M: GatewayManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, { @@ -34,7 +36,7 @@ where tokio::task::spawn(async move { let _ = tokio::join!( cleanup_task(rx), - periodic_compaction_task(tx, gateway_sphere), + periodic_compaction_task(tx, gateway_manager), ); Ok(()) }) @@ -57,16 +59,27 @@ where Ok(()) } -async fn periodic_compaction_task(tx: UnboundedSender>, gateway_sphere: C) +async fn periodic_compaction_task(tx: UnboundedSender>, gateway_manager: M) where + M: GatewayManager, C: HasMutableSphereContext, S: Storage + 'static, { loop { - if let Err(error) = tx.send(CleanupJob::CompactHistory(gateway_sphere.clone())) { - error!("Periodic compaction failed: {}", error); + let mut stream = gateway_manager.experimental_worker_only_iter(); + loop { + match stream.try_next().await { + Ok(Some(local_sphere)) => { + if let Err(error) = tx.send(CleanupJob::CompactHistory(local_sphere)) { + error!("Periodic compaction failed: {}", error); + } + } + Ok(None) => break, + Err(error) => { + error!("Could not iterate on managed spheres: {}", error); + } + } } - tokio::time::sleep(Duration::from_secs(PERIODIC_CLEANUP_INTERVAL_SECONDS)).await; } } @@ -188,7 +201,10 @@ mod tests { }; use noosphere_storage::KeyValueStore; - use crate::worker::{start_cleanup, CleanupJob}; + use crate::{ + worker::{start_cleanup, CleanupJob}, + SingleTenantGatewayManager, + }; #[tokio::test] async fn it_compacts_excess_name_record_changes_in_a_gateway_sphere() -> Result<()> { @@ -228,7 +244,12 @@ mod tests { .collect::>() ); - let (tx, cleanup_worker) = start_cleanup(gateway_sphere_context.clone()); + let manager = SingleTenantGatewayManager::new( + gateway_sphere_context.clone(), + user_sphere_identity.clone(), + ) + .await?; + let (tx, cleanup_worker) = start_cleanup(manager); wait(1).await; diff --git a/rust/noosphere-gateway/src/worker/mod.rs b/rust/noosphere-gateway/src/worker/mod.rs index 07aab2f0e..ad0f365ae 100644 --- a/rust/noosphere-gateway/src/worker/mod.rs +++ b/rust/noosphere-gateway/src/worker/mod.rs @@ -1,3 +1,6 @@ +//! Task runners that process jobs in a thread, communicating via +//! message channels. + mod cleanup; mod name_system; mod syndication; diff --git a/rust/noosphere-gateway/src/worker/name_system.rs b/rust/noosphere-gateway/src/worker/name_system.rs index 8ef135ae6..ce0583889 100644 --- a/rust/noosphere-gateway/src/worker/name_system.rs +++ b/rust/noosphere-gateway/src/worker/name_system.rs @@ -1,4 +1,5 @@ use crate::try_or_reset::TryOrReset; +use crate::GatewayManager; use anyhow::anyhow; use anyhow::Result; use noosphere_core::{ @@ -31,9 +32,11 @@ use tokio::{ use tokio_stream::{Stream, StreamExt}; use url::Url; +/// How many seconds between publishing all managed sphere +/// records, and the next cycle. const PERIODIC_PUBLISH_INTERVAL_SECONDS: u64 = 5 * 60; -/// How many seconds between queueing up an address -/// to resolve from the name system. +/// How many seconds between resolving managed spheres' +/// address books, and the next cycle. const PERIODIC_RESOLVER_INTERVAL_SECONDS: u64 = 60; pub struct NameSystemConfiguration { @@ -93,11 +96,12 @@ pub enum NameSystemJob { }, } -pub fn start_name_system( +pub fn start_name_system( configuration: NameSystemConfiguration, - local_spheres: Vec, + gateway_manager: M, ) -> (UnboundedSender>, JoinHandle>) where + M: GatewayManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, { @@ -108,9 +112,9 @@ where tokio::task::spawn(async move { let _ = tokio::join!( - periodic_publisher_task(tx.clone(), local_spheres.clone()), + periodic_publisher_task(tx.clone(), gateway_manager.clone()), name_system_task(configuration, rx), - periodic_resolver_task(tx, local_spheres) + periodic_resolver_task(tx, gateway_manager) ); Ok(()) }) @@ -122,16 +126,28 @@ where /// Run once on gateway start and every PERIODIC_PUBLISH_INTERVAL_SECONDS, /// republish all stored link records in gateway spheres that map to /// counterpart managed spheres. -async fn periodic_publisher_task(tx: UnboundedSender>, local_spheres: Vec) +async fn periodic_publisher_task(tx: UnboundedSender>, gateway_manager: M) where + M: GatewayManager, C: HasMutableSphereContext, S: Storage + 'static, { loop { - for local_sphere in local_spheres.iter() { - if let Err(error) = periodic_publish_record(&tx, local_sphere).await { - error!("Periodic re-publish of link record failed: {}", error); - }; + let mut stream = gateway_manager.experimental_worker_only_iter(); + loop { + match stream.try_next().await { + Ok(Some(local_sphere)) => { + if let Err(error) = periodic_publish_record(&tx, &local_sphere).await { + error!("Periodic re-publish of link record failed: {}", error); + }; + } + Ok(None) => { + break; + } + Err(error) => { + error!("Could not iterate on managed spheres: {}", error); + } + } } tokio::time::sleep(Duration::from_secs(PERIODIC_PUBLISH_INTERVAL_SECONDS)).await; } @@ -163,21 +179,32 @@ where Ok(()) } -async fn periodic_resolver_task(tx: UnboundedSender>, local_spheres: Vec) +async fn periodic_resolver_task(tx: UnboundedSender>, gateway_manager: M) where + M: GatewayManager, C: HasMutableSphereContext, S: Storage + 'static, { - for sphere in local_spheres.iter().cycle() { - match tx.send(NameSystemJob::ResolveAll { - context: sphere.clone(), - }) { - Ok(_) => (), - Err(error) => { - warn!("Failed to request updated name resolutions: {}", error); + loop { + let mut stream = gateway_manager.experimental_worker_only_iter(); + loop { + match stream.try_next().await { + Ok(Some(local_sphere)) => match tx.send(NameSystemJob::ResolveAll { + context: local_sphere, + }) { + Ok(_) => (), + Err(error) => { + warn!("Failed to request updated name resolutions: {}", error); + } + }, + Ok(None) => { + break; + } + Err(error) => { + error!("Could not iterate on managed spheres: {}", error); + } } } - tokio::time::sleep(Duration::from_secs(PERIODIC_RESOLVER_INTERVAL_SECONDS)).await; } } diff --git a/rust/noosphere-gateway/src/worker/syndication.rs b/rust/noosphere-gateway/src/worker/syndication.rs index bb9739bec..5210380b0 100644 --- a/rust/noosphere-gateway/src/worker/syndication.rs +++ b/rust/noosphere-gateway/src/worker/syndication.rs @@ -1,6 +1,4 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use std::{io::Cursor, sync::Arc}; - +use crate::GatewayManager; use anyhow::Result; use cid::Cid; use libipld_cbor::DagCborCodec; @@ -17,12 +15,15 @@ use noosphere_core::{ use noosphere_ipfs::{IpfsClient, KuboClient}; use noosphere_storage::{block_deserialize, block_serialize, KeyValueStore, Storage}; use serde::{Deserialize, Serialize}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{io::Cursor, sync::Arc}; use tokio::sync::Mutex; use tokio::{ io::AsyncReadExt, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; +use tokio_stream::StreamExt; use tokio_util::io::StreamReader; use url::Url; @@ -77,11 +78,12 @@ const PERIODIC_SYNDICATION_INTERVAL_SECONDS: Duration = Duration::from_secs(5 * /// Start a Tokio task that waits for [SyndicationJob] messages and then /// attempts to syndicate to the configured IPFS RPC. Currently only Kubo IPFS /// backends are supported. -pub fn start_ipfs_syndication( +pub fn start_ipfs_syndication( ipfs_api: Url, - local_spheres: Vec, + gateway_manager: M, ) -> (UnboundedSender>, JoinHandle>) where + M: GatewayManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, { @@ -91,7 +93,7 @@ where let tx = tx.clone(); tokio::task::spawn(async move { let (_, syndication_result) = tokio::join!( - periodic_syndication_task(tx, local_spheres), + periodic_syndication_task(tx, gateway_manager), ipfs_syndication_task(ipfs_api, rx) ); syndication_result?; @@ -102,18 +104,30 @@ where (tx, task) } -async fn periodic_syndication_task( +async fn periodic_syndication_task( tx: UnboundedSender>, - local_spheres: Vec, + gateway_manager: M, ) where + M: GatewayManager + 'static, C: HasMutableSphereContext, S: Storage + 'static, { loop { - for local_sphere in &local_spheres { - if let Err(error) = periodic_syndication(&tx, local_sphere).await { - error!("Periodic syndication of sphere history failed: {}", error); - }; + let mut stream = gateway_manager.experimental_worker_only_iter(); + loop { + match stream.try_next().await { + Ok(Some(local_sphere)) => { + if let Err(error) = periodic_syndication(&tx, local_sphere).await { + error!("Periodic syndication of sphere history failed: {}", error); + }; + } + Ok(None) => { + break; + } + Err(error) => { + error!("Could not iterate on managed spheres: {}", error); + } + } tokio::time::sleep(Duration::from_secs(5)).await; } tokio::time::sleep(PERIODIC_SYNDICATION_INTERVAL_SECONDS).await; @@ -122,7 +136,7 @@ async fn periodic_syndication_task( async fn periodic_syndication( tx: &UnboundedSender>, - local_sphere: &C, + local_sphere: C, ) -> Result<()> where C: HasMutableSphereContext, @@ -316,7 +330,10 @@ mod tests { use tokio::select; use url::Url; - use crate::worker::{start_ipfs_syndication, SyndicationCheckpoint, SyndicationJob}; + use crate::{ + worker::{start_ipfs_syndication, SyndicationCheckpoint, SyndicationJob}, + SingleTenantGatewayManager, + }; #[tokio::test(flavor = "multi_thread")] async fn it_syndicates_a_sphere_revision_to_kubo() -> Result<()> { @@ -342,9 +359,14 @@ mod tests { let ipfs_url = Url::parse("http://127.0.0.1:5001")?; let local_kubo_client = KuboClient::new(&ipfs_url.clone())?; + let manager = SingleTenantGatewayManager::new( + gateway_sphere_context.clone(), + user_sphere_identity.clone(), + ) + .await?; let (syndication_tx, _syndication_join_handle) = - start_ipfs_syndication::<_, _>(ipfs_url, vec![user_sphere_context.clone()]); + start_ipfs_syndication::<_, _, _>(ipfs_url, manager); user_sphere_context .write("foo", &ContentType::Text, b"bar".as_ref(), None) diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index bd6ff87bf..bb50986a7 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -29,7 +29,7 @@ ucan = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } serde = { workspace = true } -base64 = "=0.21.4" +base64 = { workspace = true } url = { version = "^2" } [dev-dependencies] diff --git a/rust/noosphere/tests/client_to_gateway.rs b/rust/noosphere/tests/client_to_gateway.rs index 86f0c4354..d8e425a88 100644 --- a/rust/noosphere/tests/client_to_gateway.rs +++ b/rust/noosphere/tests/client_to_gateway.rs @@ -11,31 +11,28 @@ extern crate noosphere_gateway_dev as noosphere_gateway; use anyhow::{anyhow, Result}; use noosphere::key::KeyStorage; use noosphere::sphere::SphereContextBuilder; - +use noosphere_cli::{ + cli::ConfigSetCommand, + commands::{ + key::key_create, + sphere::{config_set, sphere_create, sphere_join}, + }, + helpers::{temporary_workspace, SpherePair}, +}; +use noosphere_core::api::v0alpha1; use noosphere_core::context::{ HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead, SphereContentWrite, SphereCursor, SphereSync, }; +use noosphere_core::data::{ContentType, Did}; +use noosphere_core::tracing::initialize_tracing; +use noosphere_gateway::{Gateway, SingleTenantGatewayManager}; use noosphere_storage::BlockStore; use std::net::TcpListener; use tokio::io::AsyncReadExt; use tokio_stream::StreamExt; -use url::Url; - -use noosphere_core::api::v0alpha1; -use noosphere_core::data::{ContentType, Did}; - use ucan::crypto::KeyMaterial; - -use noosphere_cli::{ - commands::{ - key::key_create, - sphere::{sphere_create, sphere_join}, - }, - helpers::{temporary_workspace, SpherePair}, -}; -use noosphere_core::tracing::initialize_tracing; -use noosphere_gateway::{start_gateway, GatewayScope}; +use url::Url; #[tokio::test] async fn gateway_tells_you_its_identity() -> Result<()> { @@ -55,28 +52,31 @@ async fn gateway_tells_you_its_identity() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + config_set( + ConfigSetCommand::Counterpart { + did: client_sphere_identity.clone().0, + }, + &mut gateway_workspace, + ) + .await?; + + let manager = + SingleTenantGatewayManager::new(gateway_sphere_context.clone(), client_sphere_identity) + .await?; + let server_task = tokio::spawn({ - let gateway_sphere_identity = gateway_sphere_identity.clone(); async move { - start_gateway( - listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + let gateway = Gateway::new( + manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, ) - .await - .unwrap() + .unwrap(); + gateway.start(listener).await.unwrap() } }); @@ -119,28 +119,31 @@ async fn gateway_identity_can_be_verified_by_the_client_of_its_owner() -> Result let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + config_set( + ConfigSetCommand::Counterpart { + did: client_sphere_identity.clone().0, + }, + &mut gateway_workspace, + ) + .await?; + + let manager = + SingleTenantGatewayManager::new(gateway_sphere_context.clone(), client_sphere_identity) + .await?; + let server_task = tokio::spawn({ - let gateway_sphere_identity = gateway_sphere_identity.clone(); async move { - start_gateway( - listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + let gateway = Gateway::new( + manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, ) - .await - .unwrap() + .unwrap(); + gateway.start(listener).await.unwrap() } }); @@ -191,28 +194,31 @@ async fn gateway_receives_a_newly_initialized_sphere_from_the_client() -> Result let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + config_set( + ConfigSetCommand::Counterpart { + did: client_sphere_identity.clone().0, + }, + &mut gateway_workspace, + ) + .await?; + + let manager = + SingleTenantGatewayManager::new(gateway_sphere_context.clone(), client_sphere_identity) + .await?; + let server_task = { - let gateway_sphere_context = gateway_sphere_context.clone(); - let client_sphere_identity = client_sphere_identity.clone(); tokio::spawn(async move { - start_gateway( - listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + let gateway = Gateway::new( + manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, ) - .await - .unwrap() + .unwrap(); + gateway.start(listener).await.unwrap() }) }; @@ -283,28 +289,32 @@ async fn gateway_updates_an_existing_sphere_with_changes_from_the_client() -> Re let gateway_identity = gateway_workspace.author().await?.did().await?; - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + config_set( + ConfigSetCommand::Counterpart { + did: client_sphere_identity.clone().0, + }, + &mut gateway_workspace, + ) + .await?; + + let manager = + SingleTenantGatewayManager::new(gateway_sphere_context.clone(), client_sphere_identity) + .await + .unwrap(); + let server_task = { - let gateway_sphere_context = gateway_sphere_context.clone(); - let client_sphere_identity = client_sphere_identity.clone(); tokio::spawn(async move { - start_gateway( - listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + let gateway = Gateway::new( + manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, ) - .await - .unwrap() + .unwrap(); + gateway.start(listener).await.unwrap() }) };