Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

feat!: Traverse the Noosphere vast #284

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 91 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions rust/noosphere-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ serde_urlencoded = "~0.7"
tracing = "~0.1"
noosphere-core = { version = "0.7.0", path = "../noosphere-core" }
noosphere-storage = { version = "0.5.0", path = "../noosphere-storage" }
reqwest = { version = "~0.11", default-features = false, features = ["json", "rustls-tls"] }
noosphere-car = { version = "0.1.0", path = "../noosphere-car" }
reqwest = { version = "0.11.15", default-features = false, features = ["json", "rustls-tls", "stream"] }
tokio-stream = "~0.1"
tokio-util = "0.7.7"

ucan = { version = "0.1.0" }
ucan-key-support = { version = "0.1.0" }


libipld-core = "~0.15"
libipld-cbor = "~0.15"

Expand Down
65 changes: 65 additions & 0 deletions rust/noosphere-api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use crate::{
use anyhow::{anyhow, Result};
use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_car::CarReader;

use noosphere_core::authority::{Author, SphereAction, SphereReference};
use noosphere_storage::{block_deserialize, block_serialize};
use reqwest::{header::HeaderMap, Body, StatusCode};
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use ucan::{
builder::UcanBuilder,
capability::{Capability, Resource, With},
Expand Down Expand Up @@ -173,6 +176,68 @@ where
Ok((jwt, ucan_headers))
}

/// Replicate content from Noosphere, streaming its blocks from the
/// configured gateway. If the gateway doesn't have the desired content, it
/// will look it up from other sources such as IPFS if they are available.
/// Note that this means this call can potentially block on upstream
/// access to an IPFS node (which, depending on the node's network
/// configuration and peering status, can be quite slow).
pub async fn replicate(
&self,
memo_version: &Cid,
) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
let url = Url::try_from(RouteUrl::<()>(
&self.api_base,
Route::Replicate(Some(memo_version.clone())),
None,
))?;

debug!("Client replicating memo from {}", url);

let capability = Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
did: self.sphere_identity.clone(),
}),
},
can: SphereAction::Fetch,
};

let (token, ucan_headers) = Self::make_bearer_token(
&self.session.gateway_identity,
&self.author,
&capability,
&self.store,
)
.await?;

let response = self
.client
.get(url)
.bearer_auth(token)
.headers(ucan_headers)
.send()
.await?;

Ok(
CarReader::new(StreamReader::new(response.bytes_stream().map(
|item| match item {
Ok(item) => Ok(item),
Err(error) => {
error!("Failed to read CAR stream: {}", error);
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
},
)))
.await?
.stream()
.map(|block| match block {
Ok(block) => Ok(block),
Err(error) => Err(anyhow!(error)),
}),
)
}

pub async fn fetch(&self, params: &FetchParameters) -> Result<FetchResponse> {
let url = Url::try_from(RouteUrl(&self.api_base, Route::Fetch, Some(params)))?;
debug!("Client fetching blocks from {}", url);
Expand Down
18 changes: 12 additions & 6 deletions rust/noosphere-api/src/route.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use cid::Cid;
use std::fmt::Display;
use url::{Url};
use url::Url;

use crate::data::AsQuery;

Expand All @@ -12,16 +13,21 @@ pub enum Route {
Publish,
Did,
Identify,
Replicate(Option<Cid>),
}

impl Display for Route {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let fragment = match self {
Route::Fetch => "fetch",
Route::Push => "push",
Route::Publish => "publish",
Route::Did => "did",
Route::Identify => "identify",
Route::Fetch => "fetch".into(),
Route::Push => "push".into(),
Route::Publish => "publish".into(),
Route::Did => "did".into(),
Route::Identify => "identify".into(),
Route::Replicate(cid) => match cid {
Some(cid) => format!("replicate/{}", cid),
None => "replicate/:memo".into(),
},
};

write!(f, "/api/{}/{}", API_VERSION, fragment)
Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-car/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[dependencies]
anyhow = "1"
cid = "0.9"
futures = "0.3"
integer-encoding = { version = "3", features = ["tokio_async"] }
Expand Down
9 changes: 8 additions & 1 deletion rust/noosphere-car/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ mod tests {

use super::*;

#[test]
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;

#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), test)]
fn symmetric_header_v1() {
let digest = multihash::Code::Blake2b256.digest(b"test");
let cid = Cid::new_v1(DagCborCodec.into(), digest);
Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-car/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod error;
mod header;
mod reader;
mod util;
mod varint;
mod writer;

pub use crate::header::CarHeader;
Expand Down
14 changes: 13 additions & 1 deletion rust/noosphere-car/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ use crate::{
util::{ld_read, read_node},
};

#[cfg(not(target_arch = "wasm32"))]
pub trait CarReaderSend: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<S> CarReaderSend for S where S: Send {}

#[cfg(target_arch = "wasm32")]
pub trait CarReaderSend {}

#[cfg(target_arch = "wasm32")]
impl<S> CarReaderSend for S {}

/// Reads CAR files that are in a BufReader
#[derive(Debug)]
pub struct CarReader<R> {
Expand All @@ -18,7 +30,7 @@ pub struct CarReader<R> {

impl<R> CarReader<R>
where
R: AsyncRead + Send + Unpin,
R: AsyncRead + CarReaderSend + Unpin,
{
/// Creates a new CarReader and parses the CarHeader
pub async fn new(mut reader: R) -> Result<Self, Error> {
Expand Down
22 changes: 16 additions & 6 deletions rust/noosphere-car/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use anyhow::Result;
use cid::Cid;
use integer_encoding::VarIntAsyncReader;
use tokio::io::{AsyncRead, AsyncReadExt};

use crate::{reader::CarReaderSend, varint::read_varint_async};

use super::error::Error;

/// Maximum size that is used for single node.
pub(crate) const MAX_ALLOC: usize = 4 * 1024 * 1024;

pub(crate) async fn ld_read<R>(mut reader: R, buf: &mut Vec<u8>) -> Result<Option<&[u8]>, Error>
where
R: AsyncRead + Send + Unpin,
R: AsyncRead + CarReaderSend + Unpin,
{
let length: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await {
let length: usize = match read_varint_async(&mut reader).await {
Ok(len) => len,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
Expand Down Expand Up @@ -41,7 +43,7 @@ pub(crate) async fn read_node<R>(
buf: &mut Vec<u8>,
) -> Result<Option<(Cid, Vec<u8>)>, Error>
where
R: AsyncRead + Send + Unpin,
R: AsyncRead + CarReaderSend + Unpin,
{
if let Some(buf) = ld_read(buf_reader, buf).await? {
let mut cursor = std::io::Cursor::new(buf);
Expand All @@ -60,6 +62,12 @@ mod tests {

use super::*;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;

#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error>
where
W: AsyncWrite + Send + Unpin,
Expand All @@ -70,7 +78,8 @@ mod tests {
Ok(())
}

#[tokio::test]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn ld_read_write_good() {
let mut buffer = Vec::<u8>::new();
ld_write(&mut buffer, b"test bytes").await.unwrap();
Expand All @@ -81,7 +90,8 @@ mod tests {
assert_eq!(read, b"test bytes");
}

#[tokio::test]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn ld_read_write_fail() {
let mut buffer = Vec::<u8>::new();
let size = MAX_ALLOC + 1;
Expand Down
89 changes: 89 additions & 0 deletions rust/noosphere-car/src/varint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! This module has been adapted from the [integer_encoding] crate. The
//! constructs here are mostly unchanged, except that the `Send` bound on the
//! async reader has been made optional in the case that we are compiling for
//! Wasm and deploying to a browser.

use std::{
io::{Error as IoError, ErrorKind as IoErrorKind},
mem::size_of,
};

use integer_encoding::VarInt;
use tokio::io::{AsyncRead, AsyncReadExt};

use crate::reader::CarReaderSend;

pub(crate) trait VarIntMaxSize {
fn varint_max_size() -> usize;
}

impl<VI: VarInt> VarIntMaxSize for VI {
fn varint_max_size() -> usize {
(size_of::<VI>() * 8 + 7) / 7
}
}

pub const MSB: u8 = 0b1000_0000;

/// VarIntProcessor encapsulates the logic for decoding a VarInt byte-by-byte.
#[derive(Default)]
pub struct VarIntProcessor {
buf: [u8; 10],
maxsize: usize,
i: usize,
}

impl VarIntProcessor {
fn new<VI: VarIntMaxSize>() -> VarIntProcessor {
VarIntProcessor {
maxsize: VI::varint_max_size(),
..VarIntProcessor::default()
}
}
fn push(&mut self, b: u8) -> Result<(), IoError> {
if self.i >= self.maxsize {
return Err(IoError::new(
IoErrorKind::InvalidData,
"Unterminated varint",
));
}
self.buf[self.i] = b;
self.i += 1;
Ok(())
}
fn finished(&self) -> bool {
self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
}
fn decode<VI: VarInt>(&self) -> Option<VI> {
Some(VI::decode_var(&self.buf[0..self.i])?.0)
}
}

pub async fn read_varint_async<V, R>(reader: &mut R) -> Result<V, std::io::Error>
where
V: VarInt,
R: AsyncRead + CarReaderSend + Unpin,
{
let mut read_buffer = [0 as u8; 1];
let mut p = VarIntProcessor::new::<V>();

while !p.finished() {
let read = reader.read(&mut read_buffer).await?;

// EOF
if read == 0 && p.i == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Reached EOF",
));
}
if read == 0 {
break;
}

p.push(read_buffer[0])?;
}

p.decode()
.ok_or_else(|| IoError::new(IoErrorKind::UnexpectedEof, "Reached EOF"))
}
6 changes: 4 additions & 2 deletions rust/noosphere-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ repository = "https://github.com/subconsciousnetwork/noosphere"
homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[features]
test_kubo = []

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
reqwest = { version = "~0.11", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "~0.11", default-features = false, features = ["json", "rustls-tls", "stream"] }
noosphere-ns = { version = "0.5.0", path = "../noosphere-ns" }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand All @@ -29,7 +32,6 @@ anyhow = "^1"

tokio = { version = "^1", features = ["full"] }
tokio-stream = "~0.1"
axum = { version = "~0.5", features = ["headers", "macros"] }
tower = "~0.4"
tower-http = { version = "~0.3", features = ["cors", "trace"] }
async-trait = "~0.1"
Expand Down
Loading