From b599a96e3a242f2cc0209e34a8cda383ebf2363e Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Wed, 25 Oct 2023 08:56:54 -0700 Subject: [PATCH] feat: Initial work refactoring noosphere-gateway to be generic over the spheres it manages. --- Cargo.lock | 18 +- Cargo.toml | 2 + .../src/native/commands/serve.rs | 15 +- .../src/native/helpers/workspace.rs | 29 +-- rust/noosphere-core/Cargo.toml | 5 +- rust/noosphere-core/src/api/client.rs | 10 +- rust/noosphere-core/src/api/headers/mod.rs | 9 + rust/noosphere-core/src/api/headers/ucan.rs | 90 +++++++ rust/noosphere-core/src/api/mod.rs | 8 +- rust/noosphere-core/src/data/mod.rs | 2 +- rust/noosphere-gateway/src/authority.rs | 151 ----------- rust/noosphere-gateway/src/context_manager.rs | 74 ++++++ rust/noosphere-gateway/src/gateway.rs | 68 ++--- rust/noosphere-gateway/src/handlers/mod.rs | 5 + .../src/handlers/v0alpha1/did.rs | 24 +- .../src/handlers/v0alpha1/fetch.rs | 42 +-- .../src/handlers/v0alpha1/identify.rs | 23 +- .../src/handlers/v0alpha1/mod.rs | 5 + .../src/handlers/v0alpha1/push.rs | 82 +++--- .../src/handlers/v0alpha1/replicate.rs | 17 +- .../src/handlers/v0alpha2/mod.rs | 5 + .../src/handlers/v0alpha2/push.rs | 75 +++--- rust/noosphere-gateway/src/lib.rs | 4 +- rust/noosphere-gateway/src/middleware.rs | 245 ++++++++++++++++++ rust/noosphere-gateway/src/worker/cleanup.rs | 40 ++- rust/noosphere-gateway/src/worker/mod.rs | 3 + .../src/worker/name_system.rs | 67 +++-- rust/noosphere-storage/Cargo.toml | 2 +- rust/noosphere/tests/gateway.rs | 118 +++++---- 29 files changed, 767 insertions(+), 471 deletions(-) create mode 100644 rust/noosphere-core/src/api/headers/mod.rs create mode 100644 rust/noosphere-core/src/api/headers/ucan.rs delete mode 100644 rust/noosphere-gateway/src/authority.rs create mode 100644 rust/noosphere-gateway/src/context_manager.rs create mode 100644 rust/noosphere-gateway/src/middleware.rs diff --git a/Cargo.lock b/Cargo.lock index e214f9377..408672165 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,9 +531,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64ct" @@ -2691,7 +2691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70b34b6da8165c0bde35c82db8efda39b824776537e73973549e76cadb3a77c5" dependencies = [ "asynchronous-codec", - "base64 0.21.4", + "base64 0.21.5", "byteorder", "bytes", "either", @@ -3521,8 +3521,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", - "axum", - "base64 0.21.4", + "base64 0.21.5", "byteorder", "bytes", "cid", @@ -3533,6 +3532,7 @@ dependencies = [ "futures-util", "getrandom 0.2.10", "gloo-net", + "headers", "instant", "iroh-car", "js-sys", @@ -3708,7 +3708,7 @@ dependencies = [ "anyhow", "async-stream", "async-trait", - "base64 0.21.4", + "base64 0.21.5", "cid", "instant", "js-sys", @@ -4472,7 +4472,7 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -4740,7 +4740,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] @@ -5848,7 +5848,7 @@ dependencies = [ "anyhow", "async-recursion", "async-trait", - "base64 0.21.4", + "base64 0.21.5", "bs58 0.5.0", "cid", "futures", diff --git a/Cargo.toml b/Cargo.toml index 14ada6e01..af98a87a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ anyhow = { version = "1" } async-recursion = { version = "1" } async-stream = { version = "0.3" } axum = { version = "^0.6.18" } +base64 = { version = "^0.21" } bytes = { version = "^1" } cid = { version = "0.10" } deterministic-bloom = { version = "0.1.0" } @@ -29,6 +30,7 @@ futures = { version = "0.3" } futures-util = { version = "0.3" } gloo-net = { version = "0.4" } gloo-timers = { version = "0.3", features = ["futures"] } +headers = { version = "=0.3.8" } # Match version used by `axum`. ignore = { version = "0.4.20" } instant = { version = "0.1", features = ["wasm-bindgen"] } iroh-car = { version = "^0.3.0" } diff --git a/rust/noosphere-cli/src/native/commands/serve.rs b/rust/noosphere-cli/src/native/commands/serve.rs index 150aa380e..8ea461a3b 100644 --- a/rust/noosphere-cli/src/native/commands/serve.rs +++ b/rust/noosphere-cli/src/native/commands/serve.rs @@ -3,7 +3,7 @@ use crate::native::workspace::Workspace; use anyhow::Result; -use noosphere_gateway::{start_gateway, GatewayScope}; +use noosphere_gateway::{start_gateway, SingleSphereContextManager}; use std::net::{IpAddr, TcpListener}; use url::Url; @@ -20,21 +20,12 @@ pub async fn serve( let listener = TcpListener::bind((interface, port))?; - let counterpart = workspace.counterpart_identity().await?; - - let identity = workspace.sphere_identity().await?; - - let gateway_scope = GatewayScope { - identity, - counterpart, - }; - let sphere_context = workspace.sphere_context().await?; + let context_manager = SingleSphereContextManager::new(sphere_context).await?; start_gateway( listener, - gateway_scope, - sphere_context, + context_manager, ipfs_api, name_resolver_api, cors_origin, diff --git a/rust/noosphere-cli/src/native/helpers/workspace.rs b/rust/noosphere-cli/src/native/helpers/workspace.rs index a4ac4538f..2e5641b87 100644 --- a/rust/noosphere-cli/src/native/helpers/workspace.rs +++ b/rust/noosphere-cli/src/native/helpers/workspace.rs @@ -14,11 +14,8 @@ use noosphere::{ NoosphereContext, NoosphereContextConfiguration, NoosphereNetwork, NoosphereSecurity, NoosphereStorage, NoosphereStorageConfig, NoosphereStoragePath, }; -use noosphere_core::{ - context::HasSphereContext, - data::{Did, Mnemonic}, -}; -use noosphere_gateway::{start_gateway, GatewayScope}; +use noosphere_core::data::{Did, Mnemonic}; +use noosphere_gateway::{start_gateway, SingleSphereContextManager}; use noosphere_ns::{helpers::NameSystemNetwork, server::start_name_system_api_server}; use tokio::{sync::Mutex, task::JoinHandle}; use url::Url; @@ -43,7 +40,7 @@ pub fn temporary_workspace() -> Result<(Workspace, (TempDir, TempDir))> { } async fn start_gateway_for_workspace( workspace: &Workspace, - client_sphere_identity: &Did, + _client_sphere_identity: &Did, ipfs_url: &Url, ns_url: &Url, ) -> Result<(Url, JoinHandle<()>)> { @@ -56,25 +53,14 @@ async fn start_gateway_for_workspace( ))?; let gateway_sphere_context = workspace.sphere_context().await?; - - let client_sphere_identity = client_sphere_identity.clone(); let ns_url = ns_url.clone(); let ipfs_url = ipfs_url.clone(); + let context_manager = SingleSphereContextManager::new(gateway_sphere_context).await?; 
let join_handle = tokio::spawn(async move { - start_gateway( - gateway_listener, - GatewayScope { - identity: gateway_sphere_context.identity().await.unwrap(), - counterpart: client_sphere_identity, - }, - gateway_sphere_context, - ipfs_url, - ns_url, - None, - ) - .await - .unwrap() + start_gateway(gateway_listener, context_manager, ipfs_url, ns_url, None) + .await + .unwrap() }); Ok((gateway_url, join_handle)) @@ -241,6 +227,7 @@ impl SpherePair { if self.gateway_task.is_some() { return Err(anyhow::anyhow!("Gateway already started.")); } + let (gateway_url, gateway_task) = start_gateway_for_workspace( &self.gateway.workspace, &self.client.identity, diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index e9ff2fd80..aeea33833 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -45,7 +45,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_urlencoded = { workspace = true } byteorder = "^1.5" -base64 = "0.21" +base64 = { workspace = true } ed25519-zebra = "^3" rand = { workspace = true } once_cell = "^1" @@ -57,7 +57,7 @@ reqwest = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } tokio-util = { workspace = true, features = ["io"] } - +headers = { workspace = true } noosphere-common = { version = "0.1.2", path = "../noosphere-common" } noosphere-storage = { version = "0.9.2", path = "../noosphere-storage" } noosphere-collections = { version = "0.6.6", path = "../noosphere-collections" } @@ -74,7 +74,6 @@ noosphere-common = { version = "0.1.2", path = "../noosphere-common", features = [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = ["full"] } tracing-subscriber = { workspace = true } -axum = { workspace = true, features = ["headers", "macros"] } [target.'cfg(target_arch = "wasm32")'.dependencies] # NOTE: This is needed so that rand can be included in WASM builds diff --git a/rust/noosphere-core/src/api/client.rs b/rust/noosphere-core/src/api/client.rs index fdf208d0c..e0ad3d2d2 100644 --- a/rust/noosphere-core/src/api/client.rs +++ b/rust/noosphere-core/src/api/client.rs @@ -4,7 +4,6 @@ use crate::{ api::{route::RouteUrl, v0alpha1, v0alpha2}, stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream}, }; - use anyhow::{anyhow, Result}; use async_stream::try_stream; use bytes::Bytes; @@ -85,10 +84,11 @@ where let client = reqwest::Client::new(); - let mut url = api_base.clone(); - url.set_path(&v0alpha1::Route::Did.to_string()); - - let did_response = client.get(url).send().await?; + let did_response = { + let mut url = api_base.clone(); + url.set_path(&v0alpha1::Route::Did.to_string()); + client.get(url).send().await? + }; match did_response.status() { StatusCode::OK => (), diff --git a/rust/noosphere-core/src/api/headers/mod.rs b/rust/noosphere-core/src/api/headers/mod.rs new file mode 100644 index 000000000..7cd3ea774 --- /dev/null +++ b/rust/noosphere-core/src/api/headers/mod.rs @@ -0,0 +1,9 @@ +//! A collection of typed [headers::Header] implementations +//! used in gateway APIs. 
+ +#[cfg(doc)] +use headers; + +mod ucan; + +pub use self::ucan::*; diff --git a/rust/noosphere-core/src/api/headers/ucan.rs b/rust/noosphere-core/src/api/headers/ucan.rs new file mode 100644 index 000000000..885103b4d --- /dev/null +++ b/rust/noosphere-core/src/api/headers/ucan.rs @@ -0,0 +1,90 @@ +use crate::data::Jwt; +use anyhow::anyhow; +use cid::Cid; +use headers::{self, Header, HeaderName, HeaderValue}; +use once_cell::sync::Lazy; +use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore}; + +static UCAN_NAME: Lazy = Lazy::new(|| HeaderName::from_static("ucan")); + +/// A typed header for the `ucan` header, a tuple of [Cid] and [Jwt]. +/// Can represent a request with multiple `ucan` headers. +/// +/// The values are **not** validated during parsing beyond legitimate +/// looking [Cid]s and [Jwt]s here. Use [Ucan::as_proof_chain] to +/// validate and construct a [ProofChain]. +pub struct Ucan(Vec<(Cid, Jwt)>); + +impl Header for Ucan { + fn name() -> &'static HeaderName { + &UCAN_NAME + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let mut ucans = vec![]; + for header in values.by_ref() { + let value = header.to_str().map_err(|_| headers::Error::invalid())?; + let mut parts: Vec<&str> = value.split_ascii_whitespace().take(2).collect(); + + let jwt: Jwt = parts + .pop() + .ok_or_else(headers::Error::invalid)? + .to_string() + .into(); + let cid: Cid = parts + .pop() + .ok_or_else(headers::Error::invalid)? + .try_into() + .map_err(|_| headers::Error::invalid())?; + + ucans.push((cid, jwt)); + } + + Ok(Ucan(ucans)) + } + + fn encode(&self, values: &mut E) + where + E: Extend, + { + for (cid, jwt) in self.0.iter() { + let value = HeaderValue::from_str(&format!("{} {}", cid, jwt)).unwrap(); + values.extend(std::iter::once(value)); + } + } +} + +impl Ucan { + /// Construct a [ProofChain] from the collected `ucan` header + /// values. This is where validation of the data occurs. 
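+    ///
+    /// A rough usage sketch (hypothetical handler context, not compiled as a
+    /// doctest); `header_map`, `bearer`, `db` and `did_parser` are assumed to
+    /// come from the surrounding request:
+    ///
+    /// ```ignore
+    /// use headers::HeaderMapExt;
+    ///
+    /// // Collect every `ucan` header on the request into the typed header...
+    /// let ucans: Ucan = header_map
+    ///     .typed_get()
+    ///     .ok_or_else(|| anyhow!("missing ucan headers"))?;
+    /// // ...then check each (Cid, Jwt) pair and fold them into a proof chain.
+    /// let proof_chain = ucans.as_proof_chain(bearer, db, &mut did_parser).await?;
+    /// ```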
+ pub async fn as_proof_chain( + &self, + bearer: headers::authorization::Bearer, + mut db: impl UcanJwtStore, + did_parser: &mut DidParser, + ) -> anyhow::Result { + for (cid, jwt) in self.0.iter() { + // TODO(#261): We need a worker process that purges garbage UCANs + let actual_cid = db.write_token(jwt.into()).await?; + if actual_cid != *cid { + return Err(anyhow!("Cid and Jwt do not match.")); + } + } + + let proof_chain = + ProofChain::try_from_token_string(bearer.token(), None, did_parser, &db).await?; + + proof_chain.ucan().validate(None, did_parser).await?; + + Ok(proof_chain) + } +} + +impl From for Vec<(Cid, Jwt)> { + fn from(value: Ucan) -> Self { + value.0 + } +} diff --git a/rust/noosphere-core/src/api/mod.rs b/rust/noosphere-core/src/api/mod.rs index 2973c1d19..baeab3252 100644 --- a/rust/noosphere-core/src/api/mod.rs +++ b/rust/noosphere-core/src/api/mod.rs @@ -5,9 +5,9 @@ mod client; mod data; mod route; -pub use client::*; -pub use data::*; -pub use route::*; - +pub mod headers; pub mod v0alpha1; pub mod v0alpha2; + +pub use client::*; +pub use data::*; diff --git a/rust/noosphere-core/src/data/mod.rs b/rust/noosphere-core/src/data/mod.rs index 3631f7f4b..5ef8eccd4 100644 --- a/rust/noosphere-core/src/data/mod.rs +++ b/rust/noosphere-core/src/data/mod.rs @@ -14,12 +14,12 @@ mod sphere; mod strings; mod versioned_map; +pub use self::headers::*; pub use address::*; pub use authority::*; pub use body_chunk::*; pub use bundle::*; pub use changelog::*; -pub use headers::*; pub use link::*; pub use memo::*; pub use sphere::*; diff --git a/rust/noosphere-gateway/src/authority.rs b/rust/noosphere-gateway/src/authority.rs deleted file mode 100644 index 696eca6c9..000000000 --- a/rust/noosphere-gateway/src/authority.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use async_trait::async_trait; -use axum::{ - extract::FromRequestParts, - headers::{authorization::Bearer, Authorization}, - http::{request::Parts, StatusCode}, - TypedHeader, -}; -use libipld_core::cid::Cid; -use noosphere_core::authority::{SphereAbility, SphereReference, SPHERE_SEMANTICS}; -use noosphere_core::context::SphereContext; -use noosphere_storage::Storage; -use tokio::sync::Mutex; -use ucan::{capability::CapabilityView, chain::ProofChain, store::UcanJwtStore}; - -use super::GatewayScope; - -/// This is a construct that can be generated on a per-request basis and -/// embodies the authorization status of the request-maker as it is -/// represented by a UCAN. Any request handler can use a GatewayAuthority -/// to test if a required capability is satisfied by the authorization -/// presented by the maker of the request. 
-pub struct GatewayAuthority { - proof: ProofChain, - scope: GatewayScope, - marker: std::marker::PhantomData, -} - -impl GatewayAuthority -where - S: Storage + 'static, -{ - pub fn try_authorize( - &self, - capability: &CapabilityView, - ) -> Result<(), StatusCode> { - let capability_infos = self.proof.reduce_capabilities(&SPHERE_SEMANTICS); - - for capability_info in capability_infos { - trace!("Checking capability: {:?}", capability_info.capability); - if capability_info - .originators - .contains(self.scope.counterpart.as_str()) - && capability_info.capability.enables(capability) - { - debug!("Authorized!"); - return Ok(()); - } - } - - Err(StatusCode::UNAUTHORIZED) - } -} - -#[async_trait] -impl FromRequestParts for GatewayAuthority -where - State: Send + Sync, - S: Storage + 'static, -{ - type Rejection = StatusCode; - - async fn from_request_parts(parts: &mut Parts, state: &State) -> Result { - let sphere_context = parts - .extensions - .get::>>>() - .ok_or_else(|| { - error!("Could not find DidParser in extensions"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .clone(); - - // Get the scope of this gateway - let gateway_scope = parts - .extensions - .get::() - .ok_or_else(|| { - error!("Could not find GatewayScope in extensions"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .clone(); - - // Extract the bearer token - let TypedHeader(Authorization(bearer)) = - TypedHeader::>::from_request_parts(parts, state) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::BAD_REQUEST - })?; - - let mut db = { - let sphere_context = sphere_context.lock().await; - sphere_context.db().clone() - }; - - let ucan_headers = parts.headers.get_all("ucan").into_iter(); - - // TODO: We should write a typed header thing for this: - for header in ucan_headers { - let value = header.to_str().map_err(|_| StatusCode::BAD_REQUEST)?; - let mut parts: Vec<&str> = value.split_ascii_whitespace().take(2).collect(); - - let jwt = parts.pop().ok_or(StatusCode::BAD_REQUEST)?; - - let cid_string = parts.pop().ok_or(StatusCode::BAD_REQUEST)?; - let claimed_cid = Cid::try_from(cid_string).map_err(|_| StatusCode::BAD_REQUEST)?; - - // TODO(#261): We need a worker process that purges garbage UCANs - let actual_cid = db - .write_token(jwt) - .await - .map_err(|_| StatusCode::BAD_REQUEST)?; - - if claimed_cid != actual_cid { - return Err(StatusCode::BAD_REQUEST); - } - } - - let proof_chain = { - let mut sphere_context = sphere_context.lock().await; - let did_parser = sphere_context.did_parser_mut(); - let proof_chain = - ProofChain::try_from_token_string(bearer.token(), None, did_parser, &db) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::BAD_REQUEST - })?; - - proof_chain - .ucan() - .validate(None, did_parser) - .await - .map_err(|error| { - error!("{:?}", error); - StatusCode::UNAUTHORIZED - })?; - - proof_chain - }; - - Ok(GatewayAuthority { - scope: gateway_scope.clone(), - proof: proof_chain, - marker: std::marker::PhantomData, - }) - } -} diff --git a/rust/noosphere-gateway/src/context_manager.rs b/rust/noosphere-gateway/src/context_manager.rs new file mode 100644 index 000000000..6202f6c42 --- /dev/null +++ b/rust/noosphere-gateway/src/context_manager.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use async_stream::try_stream; +use async_trait::async_trait; +use noosphere_core::context::HasMutableSphereContext; +use noosphere_core::data::Did; +use noosphere_storage::Storage; +use std::pin::Pin; +use tokio_stream::Stream; + +type ContextManagerSphereStream<'a, C> = dyn Stream> + Send + 
'a; + +/// Trait to abstract away accessing sphere contexts in the gateway. +#[async_trait] +pub trait ContextManager: Clone + Send + Sync +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + /// Retrieve a sphere context whose counterpart matches `counterpart`. + async fn get_sphere_context(&self, counterpart: &Did) -> Result; + + /// Iterate over all managed spheres. Currently used by task workers. + /// Will need to rethink this. + fn iter(&self) -> Pin>>; +} + +/// Implements [ContextManager] for a single sphere context, used in the single-sphere +/// gateway workflow. +#[derive(Clone)] +pub struct SingleSphereContextManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + context: C, + marker: std::marker::PhantomData, +} + +impl SingleSphereContextManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + /// Create a new [SingleSphereContextManager], implementing [ContextManager] for a single sphere `context`. + pub async fn new(context: C) -> Result { + Ok(SingleSphereContextManager { + context, + marker: std::marker::PhantomData, + }) + } +} + +#[async_trait] +impl ContextManager for SingleSphereContextManager +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + async fn get_sphere_context(&self, _counterpart: &Did) -> Result { + // Our original single sphere gateways in the cluster use a hashed identity + // in its hostname, so we won't get the correct counterpart here. Similarly, + // a self-hosted orb server may not have a proper subdomain for the sphere, + // with a gateway already bound to a single sphere (via [SingleSphereContextManager]). + // Always return the only context we have. [GatewayAuthority] ensure requests + // are properly authorized. + Ok(self.context.clone()) + } + + fn iter(&self) -> Pin>> { + Box::pin(try_stream! { + yield self.context.clone(); + }) + } +} diff --git a/rust/noosphere-gateway/src/gateway.rs b/rust/noosphere-gateway/src/gateway.rs index 0b1f2665a..0be320622 100644 --- a/rust/noosphere-gateway/src/gateway.rs +++ b/rust/noosphere-gateway/src/gateway.rs @@ -4,7 +4,6 @@ use axum::http::{HeaderValue, Method}; use axum::routing::{get, put}; use axum::{Extension, Router, Server}; use noosphere_core::context::HasMutableSphereContext; -use noosphere_core::data::Did; use noosphere_ipfs::KuboClient; use noosphere_storage::Storage; use std::net::TcpListener; @@ -14,6 +13,7 @@ use url::Url; use noosphere_core::api::{v0alpha1, v0alpha2}; +use crate::ContextManager; use crate::{ handlers, worker::{ @@ -26,35 +26,21 @@ use noosphere_core::tracing::initialize_tracing; const DEFAULT_BODY_LENGTH_LIMIT: usize = 100 /* MB */ * 1000 * 1000; -/// A [GatewayScope] describes the pairing of a gateway and its designated user -/// via their spheres' respective [Did]s -#[derive(Clone, Debug)] -pub struct GatewayScope { - /// Identity of gateway sphere. - pub identity: Did, - /// Identity of a managed sphere that is being reflected by gateway sphere. - pub counterpart: Did, -} - /// Start a Noosphere Gateway -pub async fn start_gateway( +pub async fn start_gateway( listener: TcpListener, - gateway_scope: GatewayScope, - sphere_context: C, + context_manager: M, ipfs_api: Url, name_resolver_api: Url, cors_origin: Option, ) -> Result<()> where + M: ContextManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, { initialize_tracing(None); - let gateway_key_did = { - let sphere_context = sphere_context.sphere_context().await?; - sphere_context.author().identity().await? 
- }; let mut cors = CorsLayer::new(); if let Some(cors_origin) = cors_origin { @@ -79,45 +65,43 @@ where let ipfs_client = KuboClient::new(&ipfs_api)?; let (syndication_tx, syndication_task) = start_ipfs_syndication::(ipfs_api.clone()); - let (name_system_tx, name_system_task) = start_name_system::( + let (name_system_tx, name_system_task) = start_name_system::( NameSystemConfiguration { connection_type: NameSystemConnectionType::Remote(name_resolver_api), ipfs_api, }, - vec![sphere_context.clone()], + context_manager.clone(), ); - let (cleanup_tx, cleanup_task) = start_cleanup::(sphere_context.clone()); + let (cleanup_tx, cleanup_task) = start_cleanup::(context_manager.clone()); let app = Router::new() .route( &v0alpha1::Route::Did.to_string(), - get(handlers::v0alpha1::did_route), + get(handlers::v0alpha1::did_route::), ) .route( &v0alpha1::Route::Replicate(None).to_string(), - get(handlers::v0alpha1::replicate_route::), + get(handlers::v0alpha1::replicate_route::), ) .route( &v0alpha1::Route::Identify.to_string(), - get(handlers::v0alpha1::identify_route::), + get(handlers::v0alpha1::identify_route::), ) .route( &v0alpha1::Route::Push.to_string(), #[allow(deprecated)] - put(handlers::v0alpha1::push_route::), + put(handlers::v0alpha1::push_route::), ) .route( &v0alpha2::Route::Push.to_string(), - put(handlers::v0alpha2::push_route::), + put(handlers::v0alpha2::push_route::), ) .route( &v0alpha1::Route::Fetch.to_string(), - get(handlers::v0alpha1::fetch_route::), + get(handlers::v0alpha1::fetch_route::), ) - .layer(Extension(sphere_context.clone())) - .layer(Extension(gateway_scope.clone())) + .layer(Extension(context_manager)) .layer(Extension(ipfs_client)) - .layer(Extension(gateway_key_did)) .layer(Extension(syndication_tx)) .layer(Extension(name_system_tx)) .layer(Extension(cleanup_tx)) @@ -125,17 +109,19 @@ where .layer(cors) .layer(TraceLayer::new_for_http()); - info!( - r#"A geist is summoned to manage local sphere {} - -It has bound a gateway to {:?} -It awaits updates from sphere {}..."#, - gateway_scope.identity, - listener - .local_addr() - .expect("Unexpected missing listener address"), - gateway_scope.counterpart - ); + /* + info!( + r#"A geist is summoned to manage local sphere {} + + It has bound a gateway to {:?} + It awaits updates from sphere {}..."#, + gateway_scope.identity, + listener + .local_addr() + .expect("Unexpected missing listener address"), + gateway_scope.counterpart + ); + */ Server::from_tcp(listener)? .serve(app.into_make_service()) diff --git a/rust/noosphere-gateway/src/handlers/mod.rs b/rust/noosphere-gateway/src/handlers/mod.rs index 9f424705d..ce7b1b295 100644 --- a/rust/noosphere-gateway/src/handlers/mod.rs +++ b/rust/noosphere-gateway/src/handlers/mod.rs @@ -1,2 +1,7 @@ +//! Stateless [axum] handlers for processing gateway requests. 
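+//!
+//! A rough sketch of how these handlers are mounted (mirrors the router built
+//! in `gateway.rs`; generic parameters elided, not compiled here):
+//!
+//! ```ignore
+//! let app = Router::new()
+//!     // The DID route resolves to the gateway sphere's author identity.
+//!     .route(&v0alpha1::Route::Did.to_string(), get(v0alpha1::did_route))
+//!     // Push accepts a counterpart sphere's new history.
+//!     .route(&v0alpha2::Route::Push.to_string(), put(v0alpha2::push_route))
+//!     .layer(Extension(context_manager));
+//! ```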
+ +#[cfg(doc)] +use axum; + pub mod v0alpha1; pub mod v0alpha2; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs index 97f02cbfa..03a5fd47e 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/did.rs @@ -1,6 +1,22 @@ -use axum::{http::StatusCode, Extension}; -use noosphere_core::data::Did; +use axum::http::StatusCode; +use noosphere_core::context::HasMutableSphereContext; +use noosphere_storage::Storage; -pub async fn did_route(Extension(gateway_identity): Extension) -> Result { - Ok(gateway_identity.into()) +use crate::{middleware::GatewaySphereContext, ContextManager}; + +pub async fn did_route(sphere: GatewaySphereContext) -> Result +where + M: ContextManager, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + let did = sphere + .sphere_context() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .author() + .identity() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(did.into()) } diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs index 37acb6572..5c3f52417 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs @@ -7,8 +7,8 @@ use bytes::Bytes; use noosphere_core::{ api::v0alpha1::FetchParameters, authority::{generate_capability, SphereAbility}, - context::HasSphereContext, - data::{Link, MemoIpld}, + context::HasMutableSphereContext, + data::{Did, Link, MemoIpld}, stream::{memo_history_stream, to_car_stream}, view::Sphere, }; @@ -16,31 +16,30 @@ use noosphere_ipfs::{IpfsStore, KuboClient}; use noosphere_storage::{BlockStoreRetry, SphereDb, Storage}; use tokio_stream::{Stream, StreamExt}; -use crate::{authority::GatewayAuthority, GatewayScope}; +use crate::{middleware::GatewayAuthority, ContextManager}; -#[instrument(level = "debug", skip(authority, scope, sphere_context, ipfs_client))] -pub async fn fetch_route( - authority: GatewayAuthority, +#[instrument(level = "debug", skip(authority, ipfs_client))] +pub async fn fetch_route( + authority: GatewayAuthority, Query(FetchParameters { since }): Query, - Extension(scope): Extension, Extension(ipfs_client): Extension, - Extension(sphere_context): Extension, ) -> Result>>, StatusCode> where - C: HasSphereContext, + M: ContextManager, + C: HasMutableSphereContext, S: Storage + 'static, { - authority.try_authorize(&generate_capability( - &scope.counterpart, - SphereAbility::Fetch, - ))?; - let sphere_context = sphere_context.sphere_context().await.map_err(|err| { + let counterpart = authority.counterpart(); + + authority.try_authorize(&generate_capability(counterpart, SphereAbility::Fetch))?; + + let sphere_context = authority.sphere_context().await.map_err(|err| { error!("{err}"); StatusCode::INTERNAL_SERVER_ERROR })?; let db = sphere_context.db(); - - let stream = generate_fetch_stream(&scope, since.as_ref(), db, ipfs_client) + let identity = sphere_context.identity(); + let stream = generate_fetch_stream(counterpart, identity, since.as_ref(), db, ipfs_client) .await .map_err(|err| { error!("{err}"); @@ -53,7 +52,8 @@ where /// Generates a CAR stream that can be used as a the streaming body of a /// gateway fetch route response pub async fn generate_fetch_stream( - scope: &GatewayScope, + counterpart: &Did, + identity: &Did, since: Option<&Link>, db: &SphereDb, ipfs_client: KuboClient, @@ -61,7 +61,7 @@ pub async fn 
generate_fetch_stream( where S: Storage + 'static, { - let latest_local_sphere_cid = db.require_version(&scope.identity).await?.into(); + let latest_local_sphere_cid = db.require_version(identity).await?.into(); debug!("The latest gateway sphere version is {latest_local_sphere_cid}..."); @@ -93,7 +93,7 @@ where match latest_local_sphere .get_content() .await? - .get(&scope.counterpart) + .get(counterpart) .await? { Some(latest_counterpart_sphere_cid) => { @@ -103,7 +103,7 @@ where Some(since_local_sphere_cid) => { let since_local_sphere = Sphere::at(since_local_sphere_cid, db); let links = since_local_sphere.get_content().await?; - links.get(&scope.counterpart).await?.cloned() + links.get(counterpart).await?.cloned() } None => None, }; @@ -128,7 +128,7 @@ where ))); } None => { - warn!("No revisions found for counterpart {}!", scope.counterpart); + warn!("No revisions found for counterpart {}!", counterpart); Ok(Box::pin(to_car_stream( vec![latest_local_sphere_cid.into()], stream, diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs index 818f3a15b..d41195543 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/identify.rs @@ -1,31 +1,32 @@ -use crate::{authority::GatewayAuthority, GatewayScope}; -use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; +use crate::middleware::GatewayAuthority; +use crate::ContextManager; +use axum::{http::StatusCode, response::IntoResponse, Json}; use noosphere_core::api::v0alpha1::IdentifyResponse; use noosphere_core::authority::{generate_capability, SphereAbility}; -use noosphere_core::context::HasSphereContext; +use noosphere_core::context::HasMutableSphereContext; use noosphere_storage::Storage; -pub async fn identify_route( - Extension(scope): Extension, - Extension(sphere_context): Extension, - authority: GatewayAuthority, +pub async fn identify_route( + authority: GatewayAuthority, ) -> Result where - C: HasSphereContext, + M: ContextManager, + C: HasMutableSphereContext, S: Storage + 'static, { debug!("Invoking identify route..."); authority.try_authorize(&generate_capability( - &scope.counterpart, + authority.counterpart(), SphereAbility::Fetch, ))?; - let sphere_context = sphere_context + let sphere_context = authority .sphere_context() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let db = sphere_context.db(); + let identity = sphere_context.identity(); let gateway_key = &sphere_context.author().key; let gateway_authorization = sphere_context @@ -42,7 +43,7 @@ where })?; Ok(Json( - IdentifyResponse::sign(&scope.identity, gateway_key, &ucan) + IdentifyResponse::sign(identity, gateway_key, &ucan) .await .map_err(|error| { error!("{:?}", error); diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs index 13b47ccc2..15ae48999 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/mod.rs @@ -1,3 +1,8 @@ +//! v0alpha1 stateless [axum] handlers. 
+ +#[cfg(doc)] +use axum; + mod did; mod fetch; mod identify; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs index 3ebd6f93f..1c82b85e9 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/push.rs @@ -15,54 +15,42 @@ use noosphere_storage::Storage; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::StreamExt; +use crate::ContextManager; use crate::{ - authority::GatewayAuthority, extractor::Cbor, + middleware::GatewayAuthority, worker::{NameSystemJob, SyndicationJob}, - GatewayScope, }; // #[debug_handler] #[deprecated(since = "0.8.1", note = "Please migrate to v0alpha2")] #[instrument( level = "debug", - skip( - authority, - gateway_scope, - sphere_context, - syndication_tx, - name_system_tx, - request_body - ) + skip(authority, syndication_tx, name_system_tx, request_body) )] -pub async fn push_route( - authority: GatewayAuthority, - Extension(sphere_context): Extension, - Extension(gateway_scope): Extension, +pub async fn push_route( + authority: GatewayAuthority, Extension(syndication_tx): Extension>>, Extension(name_system_tx): Extension>>, Cbor(request_body): Cbor, ) -> Result, StatusCode> where + M: ContextManager, C: HasMutableSphereContext, S: Storage + 'static, { debug!("Invoking push route..."); let sphere_identity = &request_body.sphere; - - if sphere_identity != &gateway_scope.counterpart { + let counterpart = authority.counterpart(); + if sphere_identity != counterpart { return Err(StatusCode::FORBIDDEN); } - authority.try_authorize(&generate_capability( - &gateway_scope.counterpart, - SphereAbility::Push, - ))?; + authority.try_authorize(&generate_capability(counterpart, SphereAbility::Push))?; let gateway_push_routine = GatewayPushRoutine { - sphere_context, - gateway_scope, + authority, syndication_tx, name_system_tx, request_body, @@ -72,21 +60,22 @@ where Ok(Cbor(gateway_push_routine.invoke().await?)) } -pub struct GatewayPushRoutine +pub struct GatewayPushRoutine where + M: ContextManager, C: HasMutableSphereContext, S: Storage + 'static, { - sphere_context: C, - gateway_scope: GatewayScope, + authority: GatewayAuthority, syndication_tx: UnboundedSender>, name_system_tx: UnboundedSender>, request_body: PushBody, storage_type: PhantomData, } -impl GatewayPushRoutine +impl GatewayPushRoutine where + M: ContextManager, C: HasMutableSphereContext, S: Storage + 'static, { @@ -117,7 +106,8 @@ where async fn verify_history(&self) -> Result<(), PushError> { debug!("Verifying pushed sphere history..."); - let gateway_sphere_tip = self.sphere_context.version().await?; + let gateway_sphere_context = self.authority.sphere_context().await?; + let gateway_sphere_tip = gateway_sphere_context.version().await?; if Some(&gateway_sphere_tip) != self.request_body.counterpart_tip.as_ref() { warn!( "Gateway sphere conflict; we have {gateway_sphere_tip}, they have {:?}", @@ -127,7 +117,6 @@ where } let sphere_identity = &self.request_body.sphere; - let gateway_sphere_context = self.sphere_context.sphere_context().await?; let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); @@ -172,9 +161,10 @@ where /// revision in the history as we go. Then, update our local pointer to the /// tip of the pushed history. 
async fn incorporate_history(&mut self) -> Result<(), PushError> { + let counterpart = self.authority.counterpart().to_owned(); { debug!("Merging pushed sphere history..."); - let mut sphere_context = self.sphere_context.sphere_context_mut().await?; + let mut sphere_context = self.authority.sphere_context_mut().await?; self.request_body .blocks @@ -199,22 +189,17 @@ where sphere.hydrate().await?; } - debug!( - "Setting {} tip to {}...", - self.gateway_scope.counterpart, tip - ); + debug!("Setting {} tip to {}...", counterpart, tip); sphere_context .db_mut() - .set_version(&self.gateway_scope.counterpart, tip) + .set_version(&counterpart, tip) .await?; } - self.sphere_context - .link_raw( - &self.gateway_scope.counterpart, - &self.request_body.local_tip, - ) + self.authority + .sphere_mut() + .link_raw(&counterpart, &self.request_body.local_tip) .await?; Ok(()) @@ -223,7 +208,7 @@ where async fn synchronize_names(&mut self) -> Result<(), PushError> { debug!("Synchronizing name changes to local sphere..."); - let my_sphere = self.sphere_context.to_sphere().await?; + let my_sphere = self.authority.sphere().to_sphere().await?; let my_names = my_sphere.get_address_book().await?.get_identities().await?; let sphere = Sphere::at(&self.request_body.local_tip, my_sphere.store()); @@ -262,7 +247,7 @@ where // on the client due to a previous sync if my_value != Some(&value) { debug!("Adding name '{}' ({})...", key, value.did); - self.sphere_context + self.authority .sphere_context_mut() .await? .mutation_mut() @@ -281,7 +266,7 @@ where } debug!("Removing name '{}'...", key); - self.sphere_context + self.authority .sphere_context_mut() .await? .mutation_mut() @@ -306,13 +291,14 @@ where // NOTE CDATA: "Previous version" doesn't cover all cases; this needs to be a version given // in the push body, or else we don't know how far back we actually have to go (e.g., the name // system may have created a new version in the mean time. - let previous_version = self.sphere_context.version().await?; - let next_version = SphereCursor::latest(self.sphere_context.clone()) + let previous_version = self.authority.sphere().version().await?; + let next_version = SphereCursor::latest(self.authority.sphere().clone()) .save(None) .await?; let blocks = self - .sphere_context + .authority + .sphere() .to_sphere() .await? .bundle_until_ancestor(Some(&previous_version)) @@ -325,7 +311,7 @@ where async fn notify_name_resolver(&self) -> Result<()> { if let Some(name_record) = &self.request_body.name_record { if let Err(error) = self.name_system_tx.send(NameSystemJob::Publish { - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), record: LinkRecord::try_from(name_record)?, republish: false, }) { @@ -334,7 +320,7 @@ where } if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveSince { - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), since: self.request_body.local_base, }) { warn!("Failed to request name system resolutions: {}", error); @@ -350,7 +336,7 @@ where // have added it to the gateway. 
if let Err(error) = self.syndication_tx.send(SyndicationJob { revision: next_version, - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), }) { warn!("Failed to queue IPFS syndication job: {}", error); }; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs index 8d97f3c28..3936a55cd 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs @@ -21,7 +21,7 @@ use noosphere_ipfs::{IpfsStore, KuboClient}; use noosphere_storage::{BlockStore, BlockStoreRetry, Storage}; use tokio_stream::Stream; -use crate::{authority::GatewayAuthority, GatewayScope}; +use crate::{middleware::GatewayAuthority, ContextManager}; pub type ReplicationCarStreamBody = StreamBody> + Send>>>; @@ -32,18 +32,17 @@ pub type ReplicationCarStreamBody = /// header is used to determine how to gather the associated blocks to be /// streamed by to the requesting client. Invoker must have authorization to /// fetch from the gateway. -#[instrument(level = "debug", skip(authority, scope, sphere_context,))] -pub async fn replicate_route( - authority: GatewayAuthority, +#[instrument(level = "debug", skip(authority))] +pub async fn replicate_route( + authority: GatewayAuthority, // NOTE: Cannot go from string to CID via serde Path(memo_version): Path, Query(ReplicateParameters { since }): Query, - Extension(scope): Extension, Extension(ipfs_client): Extension, - Extension(sphere_context): Extension, ) -> Result where - C: HasMutableSphereContext + 'static, + M: ContextManager, + C: HasMutableSphereContext, S: Storage + 'static, { debug!("Invoking replicate route..."); @@ -54,11 +53,11 @@ where })?; authority.try_authorize(&generate_capability( - &scope.counterpart, + authority.counterpart(), SphereAbility::Fetch, ))?; - let db = sphere_context + let db = authority .sphere_context() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs index 621b20604..5d79540dc 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/mod.rs @@ -1,3 +1,8 @@ +//! v0alpha2 stateless [axum] handlers. 
+ +#[cfg(doc)] +use axum; + mod push; pub use push::*; diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs index 06fdc442d..405d99bf3 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs @@ -22,47 +22,38 @@ use noosphere_storage::{block_deserialize, block_serialize, Storage}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{Stream, StreamExt}; +use crate::ContextManager; use crate::{ - authority::GatewayAuthority, error::GatewayErrorResponse, + middleware::GatewayAuthority, worker::{NameSystemJob, SyndicationJob}, - GatewayScope, }; // #[debug_handler] #[instrument( level = "debug", - skip( - authority, - gateway_scope, - sphere_context, - syndication_tx, - name_system_tx, - stream - ) + skip(authority, syndication_tx, name_system_tx, stream) )] -pub async fn push_route( - authority: GatewayAuthority, - Extension(sphere_context): Extension, - Extension(gateway_scope): Extension, +pub async fn push_route( + authority: GatewayAuthority, Extension(syndication_tx): Extension>>, Extension(name_system_tx): Extension>>, stream: BodyStream, ) -> Result>>, GatewayErrorResponse> where - C: HasMutableSphereContext, + M: ContextManager + 'static, + C: HasMutableSphereContext + 'static, S: Storage + 'static, { debug!("Invoking push route..."); authority.try_authorize(&generate_capability( - &gateway_scope.counterpart, + authority.counterpart(), SphereAbility::Push, ))?; let gateway_push_routine = GatewayPushRoutine { - sphere_context, - gateway_scope, + authority, syndication_tx, name_system_tx, block_stream: Box::pin(from_car_stream(stream)), @@ -72,22 +63,23 @@ where Ok(StreamBody::new(gateway_push_routine.invoke().await?)) } -pub struct GatewayPushRoutine +pub struct GatewayPushRoutine where - C: HasMutableSphereContext + 'static, + M: ContextManager, + C: HasMutableSphereContext, S: Storage + 'static, St: Stream)>> + Unpin + 'static, { - sphere_context: C, - gateway_scope: GatewayScope, + authority: GatewayAuthority, syndication_tx: UnboundedSender>, name_system_tx: UnboundedSender>, block_stream: St, storage_type: PhantomData, } -impl GatewayPushRoutine +impl GatewayPushRoutine where + M: ContextManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, St: Stream)>> + Unpin + 'static, @@ -141,7 +133,8 @@ where return Err(PushError::UnexpectedBody); }; - let gateway_sphere_tip = self.sphere_context.version().await?; + let gateway_sphere_context = self.authority.sphere_context().await?; + let gateway_sphere_tip = gateway_sphere_context.version().await?; if Some(&gateway_sphere_tip) != push_body.counterpart_tip.as_ref() { warn!( "Gateway sphere conflict; we have {gateway_sphere_tip}, they have {:?}", @@ -151,7 +144,6 @@ where } let sphere_identity = &push_body.sphere; - let gateway_sphere_context = self.sphere_context.sphere_context().await?; let db = gateway_sphere_context.db(); let local_sphere_base_cid = db.get_version(sphere_identity).await?.map(|cid| cid.into()); @@ -196,9 +188,10 @@ where /// revision in the history as we go. Then, update our local pointer to the /// tip of the pushed history. 
async fn incorporate_history(&mut self, push_body: &PushBody) -> Result<(), PushError> { + let counterpart = self.authority.counterpart().to_owned(); { debug!("Merging pushed sphere history..."); - let mut sphere_context = self.sphere_context.sphere_context_mut().await?; + let mut sphere_context = self.authority.sphere_context_mut().await?; put_block_stream(sphere_context.db_mut().clone(), &mut self.block_stream).await?; @@ -220,19 +213,17 @@ where sphere.hydrate().await?; } - debug!( - "Setting {} tip to {}...", - self.gateway_scope.counterpart, tip - ); + debug!("Setting {} tip to {}...", counterpart, tip); sphere_context .db_mut() - .set_version(&self.gateway_scope.counterpart, tip) + .set_version(&counterpart, tip) .await?; } - self.sphere_context - .link_raw(&self.gateway_scope.counterpart, &push_body.local_tip) + self.authority + .sphere_mut() + .link_raw(&counterpart, &push_body.local_tip) .await?; Ok(()) @@ -241,7 +232,7 @@ where async fn synchronize_names(&mut self, push_body: &PushBody) -> Result<(), PushError> { debug!("Synchronizing name changes to local sphere..."); - let my_sphere = self.sphere_context.to_sphere().await?; + let my_sphere = self.authority.sphere().to_sphere().await?; let my_names = my_sphere.get_address_book().await?.get_identities().await?; let sphere = Sphere::at(&push_body.local_tip, my_sphere.store()); @@ -280,7 +271,7 @@ where // on the client due to a previous sync if my_value != Some(&value) { debug!("Adding name '{}' ({})...", key, value.did); - self.sphere_context + self.authority .sphere_context_mut() .await? .mutation_mut() @@ -299,7 +290,7 @@ where } debug!("Removing name '{}'...", key); - self.sphere_context + self.authority .sphere_context_mut() .await? .mutation_mut() @@ -326,12 +317,12 @@ where // NOTE CDATA: "Previous version" doesn't cover all cases; this needs to be a version given // in the push body, or else we don't know how far back we actually have to go (e.g., the name // system may have created a new version in the mean time. - let previous_version = self.sphere_context.version().await?; - let next_version = SphereCursor::latest(self.sphere_context.clone()) + let previous_version = self.authority.sphere().version().await?; + let next_version = SphereCursor::latest(self.authority.sphere().clone()) .save(None) .await?; - let db = self.sphere_context.sphere_context().await?.db().clone(); + let db = self.authority.sphere_context().await?.db().clone(); let block_stream = memo_history_stream(db, &next_version, Some(&previous_version), false); Ok((next_version, block_stream)) @@ -341,7 +332,7 @@ where async fn notify_name_resolver(&self, push_body: &PushBody) -> Result<()> { if let Some(name_record) = &push_body.name_record { if let Err(error) = self.name_system_tx.send(NameSystemJob::Publish { - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), record: LinkRecord::try_from(name_record)?, republish: false, }) { @@ -350,7 +341,7 @@ where } if let Err(error) = self.name_system_tx.send(NameSystemJob::ResolveSince { - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), since: push_body.local_base, }) { warn!("Failed to request name system resolutions: {}", error); @@ -366,7 +357,7 @@ where // have added it to the gateway. 
if let Err(error) = self.syndication_tx.send(SyndicationJob { revision: next_version, - context: self.sphere_context.clone(), + context: self.authority.sphere().clone(), }) { warn!("Failed to queue IPFS syndication job: {}", error); }; diff --git a/rust/noosphere-gateway/src/lib.rs b/rust/noosphere-gateway/src/lib.rs index 528470c9a..fe8dc6dd5 100644 --- a/rust/noosphere-gateway/src/lib.rs +++ b/rust/noosphere-gateway/src/lib.rs @@ -8,12 +8,14 @@ #[macro_use] extern crate tracing; -mod authority; +mod context_manager; mod error; mod extractor; mod gateway; mod handlers; +mod middleware; mod try_or_reset; mod worker; +pub use context_manager::*; pub use gateway::*; diff --git a/rust/noosphere-gateway/src/middleware.rs b/rust/noosphere-gateway/src/middleware.rs new file mode 100644 index 000000000..1146f8780 --- /dev/null +++ b/rust/noosphere-gateway/src/middleware.rs @@ -0,0 +1,245 @@ +use anyhow::Result; +use async_trait::async_trait; +use axum::{ + extract::FromRequestParts, + headers::{authorization::Bearer, Authorization, Host}, + http::{request::Parts, StatusCode}, + TypedHeader, +}; +use noosphere_core::{ + api::headers as noosphere_headers, + authority::{SphereAbility, SphereReference, SPHERE_SEMANTICS, SUPPORTED_KEYS}, + context::{HasMutableSphereContext, COUNTERPART}, + data::Did, +}; +use noosphere_storage::{KeyValueStore, Storage}; +use ucan::{capability::CapabilityView, chain::ProofChain, crypto::did::DidParser}; + +use crate::ContextManager; + +/// Represents the scope of a gateway requests' access to a sphere. +/// Use this in handlers to access a sphere context without authorizing. +/// Otherwise, use [GatewayAuthority] which contains authorization +/// as well as [GatewaySphereContext] functionality. +pub struct GatewaySphereContext { + counterpart: Did, + sphere: C, + marker_m: std::marker::PhantomData, + marker_c: std::marker::PhantomData, + marker_s: std::marker::PhantomData, +} + +impl GatewaySphereContext +where + M: ContextManager, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + pub fn sphere(&self) -> &C { + &self.sphere + } + + pub fn sphere_mut(&mut self) -> &mut C { + &mut self.sphere + } + + pub fn counterpart(&self) -> &Did { + &self.counterpart + } + + pub async fn sphere_context(&self) -> Result { + self.sphere.sphere_context().await + } + + pub async fn sphere_context_mut(&mut self) -> Result { + self.sphere.sphere_context_mut().await + } +} + +#[async_trait] +impl FromRequestParts for GatewaySphereContext +where + State: Send + Sync, + M: ContextManager + 'static, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + type Rejection = StatusCode; + + async fn from_request_parts(parts: &mut Parts, state: &State) -> Result { + let context_manager = parts + .extensions + .get::() + .ok_or_else(|| { + error!("Could not find ContextManager in extensions"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .clone(); + + // Read the counterpart from the hostname, which may not be a did:key, + // in the case of single-serve gateways in the cluster, or a self-hosted + // gateway without subdomain configuration. + // In the event of an invalid counterpart from hostname in single-serve settings, + // the context manager will still retrieve the sphere context, but we need to + // fetch the correct counterpart stored in the sphere DB. 
+ let counterpart = get_counterpart_from_hostname(parts, state).await?; + let sphere = context_manager + .get_sphere_context(&counterpart) + .await + .map_err(map_bad_request)?; + let counterpart = { + let sphere_context = sphere.sphere_context().await.map_err(map_bad_request)?; + println!("Getting counterpart..."); + sphere_context + .db() + .require_key(COUNTERPART) + .await + .map_err(map_bad_request)? + }; + println!("counterpart...{}", counterpart); + + Ok(GatewaySphereContext { + counterpart, + sphere, + marker_m: std::marker::PhantomData, + marker_c: std::marker::PhantomData, + marker_s: std::marker::PhantomData, + }) + } +} + +/// Represents the scope of a gateway request's authorization and sphere +/// access. +/// +/// Embodies the authorization status of the request-maker as it is +/// represented by its `ucan` headers. Any request handler can use a [GatewayAuthority] +/// to test if a required capability is satisfied by the authorization +/// presented by the maker of the request, and provide access to the scoped +/// sphere context. +pub struct GatewayAuthority { + proof_chain: ProofChain, + sphere: GatewaySphereContext, +} + +impl GatewayAuthority +where + M: ContextManager, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + pub fn try_authorize( + &self, + capability: &CapabilityView, + ) -> Result<(), StatusCode> { + let capability_infos = self.proof_chain.reduce_capabilities(&SPHERE_SEMANTICS); + + for capability_info in capability_infos { + trace!("Checking capability: {:?}", capability_info.capability); + if capability_info + .originators + .contains(self.counterpart().as_str()) + && capability_info.capability.enables(capability) + { + debug!("Authorized!"); + return Ok(()); + } + } + + Err(StatusCode::UNAUTHORIZED) + } + + pub fn sphere(&self) -> &C { + self.sphere.sphere() + } + + pub fn sphere_mut(&mut self) -> &mut C { + self.sphere.sphere_mut() + } + + pub fn counterpart(&self) -> &Did { + self.sphere.counterpart() + } + + pub async fn sphere_context(&self) -> Result { + self.sphere.sphere_context().await + } + + pub async fn sphere_context_mut(&mut self) -> Result { + self.sphere.sphere_context_mut().await + } +} + +#[async_trait] +impl FromRequestParts for GatewayAuthority +where + State: Send + Sync, + M: ContextManager + 'static, + C: HasMutableSphereContext, + S: Storage + 'static, +{ + type Rejection = StatusCode; + + async fn from_request_parts(parts: &mut Parts, state: &State) -> Result { + let sphere = GatewaySphereContext::from_request_parts(parts, state) + .await + .map_err(map_bad_request)?; + + let TypedHeader(Authorization(bearer)) = + TypedHeader::>::from_request_parts(parts, state) + .await + .map_err(map_bad_request)?; + + let ucans: noosphere_headers::Ucan = + TypedHeader::::from_request_parts(parts, state) + .await + .map_err(map_bad_request)? + .0; + + let db = { + let sphere_context: C::SphereContext = + sphere.sphere_context().await.map_err(map_bad_request)?; + sphere_context.db().clone() + }; + + let mut did_parser = DidParser::new(SUPPORTED_KEYS); + let proof_chain = ucans + .as_proof_chain(bearer, db, &mut did_parser) + .await + .map_err(map_bad_request)?; + + Ok(GatewayAuthority { + proof_chain, + sphere, + }) + } +} + +fn map_bad_request(error: E) -> StatusCode { + error!("{:?}", error); + StatusCode::BAD_REQUEST +} + +/// Takes arguments from a [FromRequestParts::from_request_parts] call and parses out +/// the counterpart sphere identity from hostname. +/// e.g. 
`did_key_abc.subconscious.cloud` -> `Did(String::from("did_key_abc"))` +async fn get_counterpart_from_hostname( + parts: &mut Parts, + state: &State, +) -> Result +where + State: Send + Sync, +{ + let host: Host = TypedHeader::::from_request_parts(parts, state) + .await + .map_err(map_bad_request)? + .0 + .into(); + host.hostname() + .split('.') + .nth(0) + .map(|subdomain| Did(String::from(subdomain))) + .ok_or_else(|| { + error!("Could not find sphere identity in subdomain"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} diff --git a/rust/noosphere-gateway/src/worker/cleanup.rs b/rust/noosphere-gateway/src/worker/cleanup.rs index 3396bba87..b8a74042f 100644 --- a/rust/noosphere-gateway/src/worker/cleanup.rs +++ b/rust/noosphere-gateway/src/worker/cleanup.rs @@ -13,7 +13,10 @@ use tokio::{ }; use tokio_stream::StreamExt; -// Once an hour +use crate::ContextManager; + +/// Seconds between finishing all compaction tasks, and +/// starting a new cycle. const PERIODIC_CLEANUP_INTERVAL_SECONDS: u64 = 60 * 60; #[derive(EnumDisplay)] @@ -21,10 +24,11 @@ pub enum CleanupJob { CompactHistory(C), } -pub fn start_cleanup( - gateway_sphere: C, +pub fn start_cleanup( + context_manager: M, ) -> (UnboundedSender>, JoinHandle>) where + M: ContextManager + 'static, C: HasMutableSphereContext + 'static, S: Storage + 'static, { @@ -34,7 +38,7 @@ where tokio::task::spawn(async move { let _ = tokio::join!( cleanup_task(rx), - periodic_compaction_task(tx, gateway_sphere), + periodic_compaction_task(tx, context_manager), ); Ok(()) }) @@ -57,16 +61,27 @@ where Ok(()) } -async fn periodic_compaction_task(tx: UnboundedSender>, gateway_sphere: C) +async fn periodic_compaction_task(tx: UnboundedSender>, context_manager: M) where + M: ContextManager, C: HasMutableSphereContext, S: Storage + 'static, { loop { - if let Err(error) = tx.send(CleanupJob::CompactHistory(gateway_sphere.clone())) { - error!("Periodic compaction failed: {}", error); + let mut stream = context_manager.iter(); + loop { + match stream.try_next().await { + Ok(Some(local_sphere)) => { + if let Err(error) = tx.send(CleanupJob::CompactHistory(local_sphere)) { + error!("Periodic compaction failed: {}", error); + } + } + Ok(None) => break, + Err(error) => { + error!("Could not iterate on managed spheres: {}", error); + } + } } - tokio::time::sleep(Duration::from_secs(PERIODIC_CLEANUP_INTERVAL_SECONDS)).await; } } @@ -187,7 +202,10 @@ mod tests { }; use noosphere_storage::KeyValueStore; - use crate::worker::{start_cleanup, CleanupJob}; + use crate::{ + worker::{start_cleanup, CleanupJob}, + SingleSphereContextManager, + }; #[tokio::test] async fn it_compacts_excess_name_record_changes_in_a_gateway_sphere() -> Result<()> { @@ -227,7 +245,9 @@ mod tests { .collect::>() ); - let (tx, cleanup_worker) = start_cleanup(gateway_sphere_context.clone()); + let context_manager = + SingleSphereContextManager::new(gateway_sphere_context.clone()).await?; + let (tx, cleanup_worker) = start_cleanup(context_manager); wait(1).await; diff --git a/rust/noosphere-gateway/src/worker/mod.rs b/rust/noosphere-gateway/src/worker/mod.rs index 07aab2f0e..ad0f365ae 100644 --- a/rust/noosphere-gateway/src/worker/mod.rs +++ b/rust/noosphere-gateway/src/worker/mod.rs @@ -1,3 +1,6 @@ +//! Task runners that process jobs in a thread, communicating via +//! message channels. 
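+//!
+//! A rough usage sketch (assumed names from elsewhere in the gateway; not
+//! compiled here): each worker returns a job sender plus its task handle.
+//!
+//! ```ignore
+//! // Spawn the cleanup worker over every sphere the gateway manages...
+//! let (cleanup_tx, cleanup_task) = start_cleanup(context_manager.clone());
+//! // ...and queue an ad hoc compaction job for one sphere context.
+//! cleanup_tx.send(CleanupJob::CompactHistory(sphere_context))?;
+//! ```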
+
 mod cleanup;
 mod name_system;
 mod syndication;
diff --git a/rust/noosphere-gateway/src/worker/name_system.rs b/rust/noosphere-gateway/src/worker/name_system.rs
index a64c4a11b..d71b93a4a 100644
--- a/rust/noosphere-gateway/src/worker/name_system.rs
+++ b/rust/noosphere-gateway/src/worker/name_system.rs
@@ -1,4 +1,5 @@
 use crate::try_or_reset::TryOrReset;
+use crate::ContextManager;
 use anyhow::anyhow;
 use anyhow::Result;
 use noosphere_core::{
@@ -31,9 +32,11 @@ use tokio::{
 };
 use tokio_stream::{Stream, StreamExt};
 use url::Url;
+/// How many seconds between publishing all managed sphere
+/// records, and the next cycle.
 const PERIODIC_PUBLISH_INTERVAL_SECONDS: u64 = 5 * 60;
-/// How many seconds between queueing up an address
-/// to resolve from the name system.
+/// How many seconds between resolving managed spheres'
+/// address books, and the next cycle.
 const PERIODIC_RESOLVER_INTERVAL_SECONDS: u64 = 60;
 
 pub struct NameSystemConfiguration {
@@ -93,11 +96,12 @@ pub enum NameSystemJob<C> {
     },
 }
 
-pub fn start_name_system<C, S>(
+pub fn start_name_system<M, C, S>(
     configuration: NameSystemConfiguration,
-    local_spheres: Vec<C>,
+    context_manager: M,
 ) -> (UnboundedSender<NameSystemJob<C>>, JoinHandle<Result<()>>)
 where
+    M: ContextManager + 'static,
     C: HasMutableSphereContext<S> + 'static,
     S: Storage + 'static,
 {
@@ -108,9 +112,9 @@ where
 
     tokio::task::spawn(async move {
         let _ = tokio::join!(
-            periodic_publisher_task(tx.clone(), local_spheres.clone()),
+            periodic_publisher_task(tx.clone(), context_manager.clone()),
             name_system_task(configuration, rx),
-            periodic_resolver_task(tx, local_spheres)
+            periodic_resolver_task(tx, context_manager)
         );
         Ok(())
     })
@@ -122,16 +126,28 @@ where
 /// Run once on gateway start and every PERIODIC_PUBLISH_INTERVAL_SECONDS,
 /// republish all stored link records in gateway spheres that map to
 /// counterpart managed spheres.
-async fn periodic_publisher_task<C, S>(tx: UnboundedSender<NameSystemJob<C>>, local_spheres: Vec<C>)
+async fn periodic_publisher_task<M, C, S>(tx: UnboundedSender<NameSystemJob<C>>, context_manager: M)
 where
+    M: ContextManager,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
 {
     loop {
-        for local_sphere in local_spheres.iter() {
-            if let Err(error) = periodic_publish_record(&tx, local_sphere).await {
-                error!("Could not publish record: {}", error);
-            };
+        let mut stream = context_manager.iter();
+        loop {
+            match stream.try_next().await {
+                Ok(Some(local_sphere)) => {
+                    if let Err(error) = periodic_publish_record(&tx, &local_sphere).await {
+                        error!("Could not publish record: {}", error);
+                    };
+                }
+                Ok(None) => {
+                    break;
+                }
+                Err(error) => {
+                    error!("Could not iterate on managed spheres: {}", error);
+                }
+            }
         }
         tokio::time::sleep(Duration::from_secs(PERIODIC_PUBLISH_INTERVAL_SECONDS)).await;
     }
 }
@@ -163,21 +179,32 @@ where
     Ok(())
 }
 
-async fn periodic_resolver_task<C, S>(tx: UnboundedSender<NameSystemJob<C>>, local_spheres: Vec<C>)
+async fn periodic_resolver_task<M, C, S>(tx: UnboundedSender<NameSystemJob<C>>, context_manager: M)
 where
+    M: ContextManager,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
 {
-    for sphere in local_spheres.iter().cycle() {
-        match tx.send(NameSystemJob::ResolveAll {
-            context: sphere.clone(),
-        }) {
-            Ok(_) => (),
-            Err(error) => {
-                warn!("Failed to request updated name resolutions: {}", error);
+    loop {
+        let mut stream = context_manager.iter();
+        loop {
+            match stream.try_next().await {
+                Ok(Some(local_sphere)) => match tx.send(NameSystemJob::ResolveAll {
+                    context: local_sphere,
+                }) {
+                    Ok(_) => (),
+                    Err(error) => {
+                        warn!("Failed to request updated name resolutions: {}", error);
+                    }
+                },
+                Ok(None) => {
+                    break;
+                }
+                Err(error) => {
+                    error!("Could not iterate on managed spheres: {}", error);
+                }
            }
         }
         tokio::time::sleep(Duration::from_secs(PERIODIC_RESOLVER_INTERVAL_SECONDS)).await;
     }
 }
diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml
index b47f66c7f..a901d8464 100644
--- a/rust/noosphere-storage/Cargo.toml
+++ b/rust/noosphere-storage/Cargo.toml
@@ -29,7 +29,7 @@ ucan = { workspace = true }
 libipld-core = { workspace = true }
 libipld-cbor = { workspace = true }
 serde = { workspace = true }
-base64 = "=0.21.4"
+base64 = { workspace = true }
 url = { version = "^2" }
 
 [dev-dependencies]
diff --git a/rust/noosphere/tests/gateway.rs b/rust/noosphere/tests/gateway.rs
index 86f0c4354..61303b5ac 100644
--- a/rust/noosphere/tests/gateway.rs
+++ b/rust/noosphere/tests/gateway.rs
@@ -11,22 +11,6 @@ extern crate noosphere_gateway_dev as noosphere_gateway;
 use anyhow::{anyhow, Result};
 use noosphere::key::KeyStorage;
 use noosphere::sphere::SphereContextBuilder;
-
-use noosphere_core::context::{
-    HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead,
-    SphereContentWrite, SphereCursor, SphereSync,
-};
-use noosphere_storage::BlockStore;
-use std::net::TcpListener;
-use tokio::io::AsyncReadExt;
-use tokio_stream::StreamExt;
-use url::Url;
-
-use noosphere_core::api::v0alpha1;
-use noosphere_core::data::{ContentType, Did};
-
-use ucan::crypto::KeyMaterial;
-
 use noosphere_cli::{
     commands::{
         key::key_create,
@@ -34,8 +18,20 @@ use noosphere_cli::{
     },
     helpers::{temporary_workspace, SpherePair},
 };
+use noosphere_core::context::{
+    HasMutableSphereContext, HasSphereContext, SphereAuthorityWrite, SphereContentRead,
+    SphereContentWrite, SphereCursor, SphereSync,
+};
+use noosphere_core::data::{ContentType, Did};
 use noosphere_core::tracing::initialize_tracing;
-use noosphere_gateway::{start_gateway,
GatewayScope}; +use noosphere_core::{api::v0alpha1, context::COUNTERPART}; +use noosphere_gateway::{start_gateway, SingleSphereContextManager}; +use noosphere_storage::{BlockStore, KeyValueStore}; +use std::net::TcpListener; +use tokio::io::AsyncReadExt; +use tokio_stream::StreamExt; +use ucan::crypto::KeyMaterial; +use url::Url; #[tokio::test] async fn gateway_tells_you_its_identity() -> Result<()> { @@ -55,22 +51,24 @@ async fn gateway_tells_you_its_identity() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + let mut gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + { + let mut sphere_context = gateway_sphere_context.sphere_context_mut().await?; + sphere_context + .db_mut() + .set_key(COUNTERPART, client_sphere_identity) + .await?; + } + let context_manager = SingleSphereContextManager::new(gateway_sphere_context) + .await + .unwrap(); let server_task = tokio::spawn({ - let gateway_sphere_identity = gateway_sphere_identity.clone(); async move { start_gateway( listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + context_manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, @@ -119,22 +117,24 @@ async fn gateway_identity_can_be_verified_by_the_client_of_its_owner() -> Result let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + let mut gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + { + let mut sphere_context = gateway_sphere_context.sphere_context_mut().await?; + sphere_context + .db_mut() + .set_key(COUNTERPART, client_sphere_identity) + .await?; + } + let context_manager = SingleSphereContextManager::new(gateway_sphere_context) + .await + .unwrap(); let server_task = tokio::spawn({ - let gateway_sphere_identity = gateway_sphere_identity.clone(); async move { start_gateway( listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + context_manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, @@ -191,22 +191,24 @@ async fn gateway_receives_a_newly_initialized_sphere_from_the_client() -> Result let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let gateway_address = listener.local_addr().unwrap(); - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + let mut gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + { + let mut sphere_context = gateway_sphere_context.sphere_context_mut().await?; + sphere_context + .db_mut() + .set_key(COUNTERPART, client_sphere_identity) + .await?; + } + let context_manager = 
SingleSphereContextManager::new(gateway_sphere_context.clone()) + .await + .unwrap(); let server_task = { - let gateway_sphere_context = gateway_sphere_context.clone(); - let client_sphere_identity = client_sphere_identity.clone(); tokio::spawn(async move { start_gateway( listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + context_manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None, @@ -283,22 +285,24 @@ async fn gateway_updates_an_existing_sphere_with_changes_from_the_client() -> Re let gateway_identity = gateway_workspace.author().await?.did().await?; - let gateway_sphere_identity = gateway_workspace.sphere_identity().await.unwrap(); let client_sphere_identity = client_workspace.sphere_identity().await.unwrap(); - - let gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + let mut gateway_sphere_context = gateway_workspace.sphere_context().await.unwrap(); + { + let mut sphere_context = gateway_sphere_context.sphere_context_mut().await?; + sphere_context + .db_mut() + .set_key(COUNTERPART, client_sphere_identity) + .await?; + } + let context_manager = SingleSphereContextManager::new(gateway_sphere_context.clone()) + .await + .unwrap(); let server_task = { - let gateway_sphere_context = gateway_sphere_context.clone(); - let client_sphere_identity = client_sphere_identity.clone(); tokio::spawn(async move { start_gateway( listener, - GatewayScope { - identity: gateway_sphere_identity, - counterpart: client_sphere_identity, - }, - gateway_sphere_context, + context_manager, Url::parse("http://127.0.0.1:5001").unwrap(), Url::parse("http://127.0.0.1:6667").unwrap(), None,
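
A note on the worker loops above: after this change the compaction, publish, and resolver
tasks all share one shape -- drain a fresh stream of managed spheres from the context
manager, do the per-sphere work, then sleep until the next cycle. The following sketch only
illustrates that shared shape; the helper name `for_each_managed_sphere` is hypothetical,
and the `managed_spheres` closure stands in for the `iter()` surface this patch relies on
(a stream of `anyhow::Result`-wrapped sphere contexts polled with
`tokio_stream::StreamExt::try_next`).

    use std::time::Duration;
    use tokio_stream::StreamExt;

    /// Hypothetical helper mirroring the periodic worker loops in this patch:
    /// visit every sphere yielded by the manager, then sleep before the next cycle.
    async fn for_each_managed_sphere<C, St, MakeStream, Visit, Fut>(
        mut managed_spheres: MakeStream,
        interval: Duration,
        mut visit: Visit,
    ) where
        St: tokio_stream::Stream<Item = anyhow::Result<C>> + Unpin,
        MakeStream: FnMut() -> St, // stands in for ContextManager::iter()
        Visit: FnMut(C) -> Fut,    // the per-sphere job, e.g. queueing a CleanupJob
        Fut: std::future::Future<Output = anyhow::Result<()>>,
    {
        loop {
            let mut stream = managed_spheres();
            loop {
                match stream.try_next().await {
                    Ok(Some(local_sphere)) => {
                        if let Err(error) = visit(local_sphere).await {
                            tracing::error!("Could not process managed sphere: {}", error);
                        }
                    }
                    Ok(None) => break,
                    Err(error) => {
                        tracing::error!("Could not iterate on managed spheres: {}", error);
                        break;
                    }
                }
            }
            tokio::time::sleep(interval).await;
        }
    }

Each worker in the patch instantiates this shape by hand with its own job type and interval
(PERIODIC_CLEANUP_INTERVAL_SECONDS, PERIODIC_PUBLISH_INTERVAL_SECONDS and
PERIODIC_RESOLVER_INTERVAL_SECONDS respectively).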
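
The refactor is also what makes a multi-tenant gateway conceivable: a manager that keys
managed sphere contexts by counterpart identity could satisfy the same two call sites used
above -- `get_sphere_context(&Did)` in the request middleware and `iter()` in the periodic
workers. A standalone sketch under those assumptions (this is not the `ContextManager`
trait defined by the patch, and `MultiSphereManager` is a hypothetical name; only
`SingleSphereContextManager` ships here):

    use anyhow::{anyhow, Result};
    use noosphere_core::data::Did;
    use std::collections::HashMap;
    use tokio_stream::Stream;

    /// Hypothetical manager holding one sphere context per counterpart identity.
    #[derive(Clone)]
    struct MultiSphereManager<C: Clone> {
        spheres: HashMap<String, C>,
    }

    impl<C: Clone> MultiSphereManager<C> {
        /// Mirrors the per-request lookup performed by the gateway middleware.
        async fn get_sphere_context(&self, counterpart: &Did) -> Result<C> {
            self.spheres
                .get(counterpart.as_str())
                .cloned()
                .ok_or_else(|| anyhow!("No managed sphere for {}", counterpart))
        }

        /// Mirrors the stream the periodic workers drain once per cycle.
        fn iter(&self) -> impl Stream<Item = Result<C>> + Unpin + '_ {
            Box::pin(tokio_stream::iter(
                self.spheres
                    .values()
                    .cloned()
                    .map(|sphere| Ok::<C, anyhow::Error>(sphere)),
            ))
        }
    }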