Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
fix: Recovery only uses latest version of sphere (#703)
Browse files Browse the repository at this point in the history
* fix: Recovery only uses latest version of sphere

* chore: "Distributed" integration test for recovery

* chore: Make new test pass
  • Loading branch information
cdata authored Nov 1, 2023
1 parent eaee1d9 commit 500bd69
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 95 deletions.
76 changes: 49 additions & 27 deletions rust/noosphere-core/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::str::FromStr;

use crate::{
api::{route::RouteUrl, v0alpha1, v0alpha2},
error::NoosphereError,
stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream},
};

Expand Down Expand Up @@ -33,6 +34,8 @@ use url::Url;
#[cfg(doc)]
use crate::data::Did;

use super::v0alpha1::ReplicationMode;

/// A [Client] is a simple, portable HTTP client for the Noosphere gateway REST
/// API. It embodies the intended usage of the REST API, which includes an
/// opening handshake (with associated key verification) and various
Expand Down Expand Up @@ -204,23 +207,35 @@ where
}

/// Replicate content from Noosphere, streaming its blocks from the
/// configured gateway. If the gateway doesn't have the desired content, it
/// will look it up from other sources such as IPFS if they are available.
/// Note that this means this call can potentially block on upstream
/// access to an IPFS node (which, depending on the node's network
/// configuration and peering status, can be quite slow).
pub async fn replicate(
/// configured gateway.
///
/// If [v0alpha1::ReplicateParameters] are specified, then the replication
/// will represent incremental history going back to the `since` version.
///
/// Otherwise, the full [crate::data::SphereIpld] will be replicated
/// (excluding any history).
///
/// If the gateway doesn't have the desired content, it will look it up from
/// other sources such as IPFS if they are available. Note that this means
/// this call can potentially block on upstream access to an IPFS node
/// (which, depending on the node's network configuration and peering
/// status, can be quite slow).
pub async fn replicate<R>(
&self,
memo_version: &Cid,
mode: R,
params: Option<&v0alpha1::ReplicateParameters>,
) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
) -> Result<(Cid, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>
where
R: Into<ReplicationMode>,
{
let mode: ReplicationMode = mode.into();
let url = Url::try_from(RouteUrl(
&self.api_base,
v0alpha1::Route::Replicate(Some(*memo_version)),
v0alpha1::Route::Replicate(Some(mode.clone())),
params,
))?;

debug!("Client replicating {} from {}", memo_version, url);
debug!("Client replicating {} from {}", mode, url);

let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);

Expand All @@ -240,26 +255,33 @@ where
.send()
.await?;

Ok(
CarReader::new(StreamReader::new(response.bytes_stream().map(
|item| match item {
Ok(item) => Ok(item),
Err(error) => {
error!("Failed to read CAR stream: {}", error);
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
},
)))
.await?
.stream()
.map(|block| match block {
Ok(block) => Ok(block),
let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
|item| match item {
Ok(item) => Ok(item),
Err(error) => {
warn!("Replication stream ended prematurely");
Err(anyhow!(error))
error!("Failed to read CAR stream: {}", error);
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
},
)))
.await?;

let root = reader.header().roots().first().cloned().ok_or_else(|| {
anyhow!(NoosphereError::UnexpectedGatewayResponse(
"Missing replication root".into()
))
})?;

Ok((
root,
reader.stream().map(|block| match block {
Ok(block) => Ok(block),
Err(error) => Err(anyhow!(NoosphereError::UnexpectedGatewayResponse(format!(
"Replication stream ended prematurely: {}",
error
)))),
}),
)
))
}

/// Fetch the latest, canonical history of the client's sphere from the
Expand Down
59 changes: 56 additions & 3 deletions rust/noosphere-core/src/api/v0alpha1/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,71 @@ use ucan::{
Ucan,
};

/// The query parameters expected for the "replicate" API route
/// The query parameters expected for the "replicate" API route.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReplicateParameters {
/// This is the last revision of the content that is being fetched that is
/// already fully available to the caller of the API
/// already fully available to the caller of the API.
#[serde(default, deserialize_with = "empty_string_as_none")]
pub since: Option<Link<MemoIpld>>,

/// If true, all content in the sphere's content space as of the associated
/// version will be replicated along with the sphere itself. If this field
/// is used without a specific `since`, then the replication request is
/// assumed to be for the whole of a single version of a sphere (and not its
/// history).
#[serde(default)]
pub include_content: bool,
}

impl AsQuery for ReplicateParameters {
fn as_query(&self) -> Result<Option<String>> {
Ok(self.since.as_ref().map(|since| format!("since={since}")))
let mut params = Vec::new();
if let Some(since) = self.since {
params.push(format!("since={since}"));
}
if self.include_content {
params.push(String::from("include_content=true"))
}

let query = if !params.is_empty() {
Some(params.join("&"))
} else {
None
};

Ok(query)
}
}

/// Allowed types in the route fragment for selecting a replication target.
#[derive(Clone)]
pub enum ReplicationMode {
/// Replicate by [Cid]; the specific version will be replicated
Cid(Cid),
/// Replicate by [Did]; gives up authority to the gateway to decide what
/// version ought to be replicated
Did(Did),
}

impl From<Cid> for ReplicationMode {
fn from(value: Cid) -> Self {
ReplicationMode::Cid(value)
}
}

impl From<Did> for ReplicationMode {
fn from(value: Did) -> Self {
ReplicationMode::Did(value)
}
}

impl Display for ReplicationMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReplicationMode::Cid(cid) => Display::fmt(cid, f),
ReplicationMode::Did(did) => Display::fmt(did, f),
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions rust/noosphere-core/src/api/v0alpha1/route.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::api::route::RouteSignature;
use crate::route_display;
use cid::Cid;

use super::ReplicationMode;

/// The version of the API represented by this module
pub const API_VERSION: &str = "v0alpha1";
Expand All @@ -16,7 +17,7 @@ pub enum Route {
/// Get a signed verification of the gateway's credentials
Identify,
/// Replicate content from the broader Noosphere network
Replicate(Option<Cid>),
Replicate(Option<ReplicationMode>),
}

route_display!(Route);
Expand All @@ -28,9 +29,9 @@ impl RouteSignature for Route {
Route::Push => "push".into(),
Route::Did => "did".into(),
Route::Identify => "identify".into(),
Route::Replicate(cid) => match cid {
Some(cid) => format!("replicate/{cid}"),
None => "replicate/:memo".into(),
Route::Replicate(mode) => match mode {
Some(mode) => format!("replicate/{mode}"),
None => "replicate/:link_or_did".into(),
},
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-core/src/context/content/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ where
C: HasSphereContext<S>,
S: Storage + 'static,
{
#[instrument(level = "debug", skip(self))]
async fn read(&self, slug: &str) -> Result<Option<SphereFile<Box<dyn AsyncFileBody>>>> {
let revision = self.version().await?;
let sphere = self.to_sphere().await?;
Expand Down
5 changes: 3 additions & 2 deletions rust/noosphere-core/src/context/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,14 @@ where
async move {
let replicate_parameters = since.as_ref().map(|since| ReplicateParameters {
since: Some(*since),
include_content: false,
});
let (db, client) = {
let sphere_context = cursor.sphere_context().await?;
(sphere_context.db().clone(), sphere_context.client().await?)
};
let stream = client
.replicate(&version, replicate_parameters.as_ref())
let (_, stream) = client
.replicate(*version, replicate_parameters.as_ref())
.await?;

tokio::pin!(stream);
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/context/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
// "read-only" context. Technically this should be acceptable
// because our mutation here is propagating immutable blocks
// into the local DB
let stream = client.replicate(&memo_link, None).await?;
let (_, stream) = client.replicate(*memo_link, None).await?;

put_block_stream(db.clone(), stream).await?;
}
Expand Down
4 changes: 4 additions & 0 deletions rust/noosphere-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub enum NoosphereError {
#[allow(missing_docs)]
#[error("The provided authorization {0} is invalid: {1}")]
InvalidAuthorization(Authorization, String),

#[allow(missing_docs)]
#[error("The gateway gave an unexpected or malformed response. {0}")]
UnexpectedGatewayResponse(String),
}

impl From<anyhow::Error> for NoosphereError {
Expand Down
Loading

0 comments on commit 500bd69

Please sign in to comment.