Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
fix!: Enable incremental sphere replication (#409)
Browse files Browse the repository at this point in the history
- Most use of `Cid` where it refers to a sphere has been refactored as
   `Link<MemoIpld>`
 - Gateway Fetch API block `Bundle` is replaced with a CAR stream
 - Peer `LinkRecord` proofs are replicated along with the rest of a
   given sphere's blocks
 - `MemoIpld` now records a Lamport timestamp to establish a causal
   order between related versions of its body content
 - `MemoIpld` signatures previously referred to a proof by CID, but now
   inline an invoked UCAN JWT
 - Traversal logic is moved out of `SphereContext`, and the core of it
   is defined as part of `Sphere`; additional business logic pertaining
   to replication is implemented as a trait over `HasSphereContext`
 - Sync now has a configurable retry behavior to cover edge cases when
   the gateway sphere changes in the time between a fetch and a push (a
   condition that results in a recoverable "conflict" error on the
   client)
  • Loading branch information
cdata authored Jun 8, 2023
1 parent 65da3d6 commit 8812a1e
Show file tree
Hide file tree
Showing 77 changed files with 3,364 additions and 1,960 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
"async-trait": [
"async_trait"
]
}
},
"rust-analyzer.cargo.features": [
"test_kubo"
]
}
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "~0.3.16", features = ["env-filter", "tracing-log"] }
thiserror = { version = "1" }
gloo-timers = { version = "0.2", features = ["futures"] }
ucan = { version = "0.3.0" }
ucan-key-support = { version = "0.1.4" }
ucan = { version = "0.3.2" }
ucan-key-support = { version = "0.1.6" }
libipld = { version = "0.16" }
libipld-core = { version = "0.16" }
libipld-cbor = { version = "0.16" }
Expand Down
100 changes: 75 additions & 25 deletions rust/noosphere-api/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::str::FromStr;

use crate::{
data::{FetchParameters, FetchResponse, IdentifyResponse, PushBody, PushResponse},
data::{
FetchParameters, IdentifyResponse, PushBody, PushError, PushResponse, ReplicateParameters,
},
route::{Route, RouteUrl},
};

Expand All @@ -10,7 +12,10 @@ use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_car::CarReader;

use noosphere_core::authority::{Author, SphereAction, SphereReference};
use noosphere_core::{
authority::{Author, SphereAction, SphereReference},
data::{Link, MemoIpld},
};
use noosphere_storage::{block_deserialize, block_serialize};
use reqwest::{header::HeaderMap, Body, StatusCode};
use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -136,19 +141,25 @@ where

match authorization.resolve_ucan(store).await {
Ok(ucan) => {
// TODO(ucan-wg/rs-ucan#37): We should integrate a helper for this kind of stuff into rs-ucan
let mut proofs_to_search: Vec<String> = ucan.proofs().clone();
if let Some(ucan_proofs) = ucan.proofs() {
// TODO(ucan-wg/rs-ucan#37): We should integrate a helper for this kind of stuff into rs-ucan
let mut proofs_to_search: Vec<String> = ucan_proofs.clone();

debug!("Making bearer token... {:?}", proofs_to_search);

while let Some(cid_string) = proofs_to_search.pop() {
let cid = Cid::from_str(cid_string.as_str())?;
let jwt = store.require_token(&cid).await?;
let ucan = Ucan::from_str(&jwt)?;

debug!("Making bearer token... {:?}", proofs_to_search);
while let Some(cid_string) = proofs_to_search.pop() {
let cid = Cid::from_str(cid_string.as_str())?;
let jwt = store.require_token(&cid).await?;
let ucan = Ucan::from_str(&jwt)?;
debug!("Adding UCAN header for {}", cid);

debug!("Adding UCAN header for {}", cid);
if let Some(ucan_proofs) = ucan.proofs() {
proofs_to_search.extend(ucan_proofs.clone().into_iter());
}

proofs_to_search.extend(ucan.proofs().clone().into_iter());
ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
}
}

ucan_headers.append(
Expand Down Expand Up @@ -185,14 +196,15 @@ where
pub async fn replicate(
&self,
memo_version: &Cid,
params: Option<&ReplicateParameters>,
) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
let url = Url::try_from(RouteUrl::<()>(
let url = Url::try_from(RouteUrl(
&self.api_base,
Route::Replicate(Some(*memo_version)),
None,
params,
))?;

debug!("Client replicating memo from {}", url);
debug!("Client replicating {} from {}", memo_version, url);

let capability = Capability {
with: With::Resource {
Expand Down Expand Up @@ -238,9 +250,14 @@ where
)
}

pub async fn fetch(&self, params: &FetchParameters) -> Result<FetchResponse> {
pub async fn fetch(
&self,
params: &FetchParameters,
) -> Result<Option<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>> {
let url = Url::try_from(RouteUrl(&self.api_base, Route::Fetch, Some(params)))?;

debug!("Client fetching blocks from {}", url);

let capability = Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
Expand All @@ -258,20 +275,45 @@ where
)
.await?;

let bytes = self
let response = self
.client
.get(url)
.bearer_auth(token)
.headers(ucan_headers)
.send()
.await?
.bytes()
.await?;

block_deserialize::<DagCborCodec, _>(&bytes)
let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
|item| match item {
Ok(item) => Ok(item),
Err(error) => {
error!("Failed to read CAR stream: {}", error);
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
},
)))
.await?;

let tip = reader.header().roots().first().cloned();

if let Some(tip) = tip {
Ok(match tip.codec() {
// Identity codec = no changes
0 => None,
_ => Some((
tip.into(),
reader.stream().map(|block| match block {
Ok(block) => Ok(block),
Err(error) => Err(anyhow!(error)),
}),
)),
})
} else {
Ok(None)
}
}

pub async fn push(&self, push_body: &PushBody) -> Result<PushResponse> {
pub async fn push(&self, push_body: &PushBody) -> Result<PushResponse, PushError> {
let url = Url::try_from(RouteUrl::<()>(&self.api_base, Route::Push, None))?;
debug!(
"Client pushing {} blocks for sphere {} to {}",
Expand All @@ -298,18 +340,26 @@ where

let (_, push_body_bytes) = block_serialize::<DagCborCodec, _>(push_body)?;

let bytes = self
let response = self
.client
.put(url)
.bearer_auth(token)
.headers(ucan_headers)
.header("Content-Type", "application/octet-stream")
.body(Body::from(push_body_bytes))
.send()
.await?
.await
.map_err(|err| PushError::Internal(anyhow!(err)))?;

if response.status() == StatusCode::CONFLICT {
return Err(PushError::Conflict);
}

let bytes = response
.bytes()
.await?;
.await
.map_err(|err| PushError::Internal(anyhow!(err)))?;

block_deserialize::<DagCborCodec, _>(bytes.as_ref())
Ok(block_deserialize::<DagCborCodec, _>(bytes.as_ref())?)
}
}
33 changes: 23 additions & 10 deletions rust/noosphere-api/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
use cid::Cid;
use noosphere_core::{
authority::{SphereAction, SphereReference, SPHERE_SEMANTICS},
data::{Bundle, Did, Jwt},
data::{Bundle, Did, Jwt, Link, MemoIpld},
};
use noosphere_storage::{base64_decode, base64_encode};
use reqwest::StatusCode;
Expand Down Expand Up @@ -45,13 +45,28 @@ where
}
}

/// The parameters expected for the "fetch" API route
/// The query parameters expected for the "replicate" API route.
///
/// The `AsQuery` implementation for this type renders the parameters as a
/// `since=<link>` query string when `since` is set.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReplicateParameters {
/// The last revision of the content being fetched that is already fully
/// available to the caller of the API. When this is `None`, no `since`
/// query parameter is produced.
// NOTE(review): `empty_string_as_none` is defined outside this view;
// presumably it maps an empty `since=` value to `None` - confirm.
#[serde(default, deserialize_with = "empty_string_as_none")]
pub since: Option<Link<MemoIpld>>,
}

impl AsQuery for ReplicateParameters {
    /// Renders these parameters as a URL query string.
    ///
    /// Yields `Some("since=<link>")` when a `since` revision is present and
    /// `None` otherwise; this implementation never returns an error.
    fn as_query(&self) -> Result<Option<String>> {
        let query = match &self.since {
            Some(since) => Some(format!("since={since}")),
            None => None,
        };

        Ok(query)
    }
}

/// The query parameters expected for the "fetch" API route
#[derive(Debug, Serialize, Deserialize)]
pub struct FetchParameters {
/// This is the last revision of the "counterpart" sphere that is managed
/// by the API host that the client is fetching from
#[serde(default, deserialize_with = "empty_string_as_none")]
pub since: Option<Cid>,
pub since: Option<Link<MemoIpld>>,
}

impl AsQuery for FetchParameters {
Expand All @@ -69,10 +84,6 @@ pub enum FetchResponse {
/// The tip of the "counterpart" sphere that is managed by the API host
/// that the client is fetching from
tip: Cid,
/// All the new blocks of the "counterpart" sphere as well as the new
/// blocks of the local sphere that correspond to remote changes from
/// other clients
blocks: Bundle,
},
/// There are no new revisions since the revision specified in the initial
/// fetch request
Expand All @@ -86,9 +97,11 @@ pub struct PushBody {
pub sphere: Did,
/// The base revision represented by the payload being pushed; if the
/// entire history is being pushed, then this should be None
pub base: Option<Cid>,
pub local_base: Option<Link<MemoIpld>>,
/// The tip of the history represented by the payload being pushed
pub tip: Cid,
pub local_tip: Link<MemoIpld>,
/// The last received tip of the counterpart sphere
pub counterpart_tip: Option<Link<MemoIpld>>,
/// A bundle of all the blocks needed to hydrate the revisions from the
/// base to the tip of history as represented by this payload
pub blocks: Bundle,
Expand All @@ -106,7 +119,7 @@ pub enum PushResponse {
/// at least one revision ahead of the latest revision being tracked
/// by the client (because it points to the newly received tip of the
/// local sphere's history)
new_tip: Cid,
new_tip: Link<MemoIpld>,
/// The blocks needed to hydrate the revisions of the "counterpart"
/// sphere history to the tip represented in this response
blocks: Bundle,
Expand Down
15 changes: 10 additions & 5 deletions rust/noosphere-car/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ where
}
}

/// Writes header and stream of data to writer in Car format.
pub async fn write<T>(&mut self, cid: Cid, data: T) -> Result<(), Error>
where
T: AsRef<[u8]>,
{
/// Writes the CAR header to the underlying writer, unless it has already
/// been written.
///
/// The first call encodes the header and writes it prefixed with its
/// length as a varint; later calls are no-ops because
/// `is_header_written` is set once the header bytes have been written.
///
/// # Errors
///
/// Returns an error if encoding the header fails or if writing the length
/// prefix or header bytes to the underlying writer fails.
pub async fn write_header(&mut self) -> Result<(), Error> {
if !self.is_header_written {
// Write the varint length prefix, then the encoded header bytes
let header_bytes = self.header.encode()?;
self.writer.write_varint_async(header_bytes.len()).await?;
self.writer.write_all(&header_bytes).await?;
// Mark as written only after both writes succeed, so that a failed
// attempt leaves the flag unset
self.is_header_written = true;
}
Ok(())
}

/// Writes header and stream of data to writer in Car format.
pub async fn write<T>(&mut self, cid: Cid, data: T) -> Result<(), Error>
where
T: AsRef<[u8]>,
{
self.write_header().await?;

// Write the given block.
self.cid_buffer.clear();
Expand Down
8 changes: 4 additions & 4 deletions rust/noosphere-cli/src/native/commands/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn auth_add(did: &str, name: Option<String>, workspace: &Workspace) ->
let mut db = workspace.db().await?;

let latest_sphere_cid = db.require_version(&sphere_did).await?;
let sphere = Sphere::at(&latest_sphere_cid, &db);
let sphere = Sphere::at(&latest_sphere_cid.into(), &db);

let authority = sphere.get_authority().await?;
let delegations = authority.get_delegations().await?;
Expand Down Expand Up @@ -106,7 +106,7 @@ You will be able to add a new one after the old one is revoked"#,

let delegation = DelegationIpld::register(&name, &jwt, &db).await?;

let sphere = Sphere::at(&latest_sphere_cid, &db);
let sphere = Sphere::at(&latest_sphere_cid.into(), &db);

let mut mutation = SphereMutation::new(&my_did);

Expand Down Expand Up @@ -146,7 +146,7 @@ pub async fn auth_list(as_json: bool, workspace: &Workspace) -> Result<()> {
.await?
.ok_or_else(|| anyhow!("Sphere version pointer is missing or corrupted"))?;

let sphere = Sphere::at(&latest_sphere_cid, &db);
let sphere = Sphere::at(&latest_sphere_cid.into(), &db);

let authorization = sphere.get_authority().await?;

Expand Down Expand Up @@ -204,7 +204,7 @@ pub async fn auth_revoke(name: &str, workspace: &Workspace) -> Result<()> {
let my_key = workspace.key().await?;
let my_did = my_key.get_did().await?;

let sphere = Sphere::at(&latest_sphere_cid, &db);
let sphere = Sphere::at(&latest_sphere_cid.into(), &db);

let authority = sphere.get_authority().await?;

Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn save(workspace: &Workspace) -> Result<()> {
.map(|extension| vec![(Header::FileExtension.to_string(), extension.clone())]);

sphere_context
.link(slug, &content_type.to_string(), cid, headers)
.link(slug, content_type, cid, headers)
.await?;
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-cli/src/native/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::native::workspace::Workspace;
use anyhow::{anyhow, Result};
use noosphere_sphere::SphereSync;
use noosphere_sphere::{SphereSync, SyncRecovery};
use noosphere_storage::MemoryStore;

pub async fn sync(workspace: &Workspace) -> Result<()> {
Expand All @@ -22,7 +22,7 @@ pub async fn sync(workspace: &Workspace) -> Result<()> {

{
let mut context = workspace.sphere_context().await?;
context.sync().await?;
context.sync(SyncRecovery::None).await?;
}

info!("Sync complete, rendering updated workspace...");
Expand Down
Loading

0 comments on commit 8812a1e

Please sign in to comment.