Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Refactor storage interfaces (#178)
Browse files Browse the repository at this point in the history
The `noosphere-storage` crate is cleaned up and significantly refactored
to support more ergonomic adaptation to other backend storage scenarios.

BREAKING CHANGE: The `StorageProvider` trait has been replaced by the
`Storage` trait. This new trait allows for distinct backing
implementations of `BlockStore` and `KeyValueStore`.

BREAKING CHANGE: A `SphereDb` was previously generic over a `Store`, but
is now generic over the new `Storage` trait.

BREAKING CHANGE: The modules in the `noosphere-storage` crate have been
re-arranged so that they are organized more consistently with the way we
organize modules in other crates.
  • Loading branch information
cdata authored Nov 29, 2022
1 parent e269e04 commit 4db55c4
Show file tree
Hide file tree
Showing 71 changed files with 715 additions and 577 deletions.
2 changes: 1 addition & 1 deletion rust/noosphere-api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use cid::Cid;
use libipld_cbor::DagCborCodec;

use noosphere_core::authority::{Author, SphereAction, SphereReference};
use noosphere_storage::encoding::{block_deserialize, block_serialize};
use noosphere_storage::{block_deserialize, block_serialize};
use reqwest::{header::HeaderMap, Body, StatusCode};
use ucan::{
builder::UcanBuilder,
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-api/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use noosphere_core::{
authority::{SphereAction, SphereReference, SPHERE_SEMANTICS},
data::{Bundle, Did},
};
use noosphere_storage::encoding::{base64_decode, base64_encode};
use noosphere_storage::{base64_decode, base64_encode};
use serde::{Deserialize, Deserializer, Serialize};
use ucan::{
capability::{Capability, Resource, With},
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use noosphere::sphere::GATEWAY_URL;
use noosphere_core::data::Did;
use noosphere_storage::interface::KeyValueStore;
use noosphere_storage::KeyValueStore;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use url::Url;
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_core::{authority::Author, data::Header};
use noosphere_fs::SphereFs;
use noosphere_storage::{interface::BlockStore, memory::MemoryStore};
use noosphere_storage::{BlockStore, MemoryStore};

use crate::native::workspace::{FileReference, Workspace};

Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-cli/src/native/commands/serve/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use axum::{
use libipld_core::cid::Cid;
use noosphere::sphere::SphereContext;
use noosphere_core::authority::{SphereAction, SphereReference, SPHERE_SEMANTICS};
use noosphere_storage::native::NativeStore;
use noosphere_storage::NativeStorage;

use tokio::sync::Mutex;
use ucan::{capability::Capability, chain::ProofChain, crypto::KeyMaterial, store::UcanJwtStore};
Expand Down Expand Up @@ -78,7 +78,7 @@ where
// Look for the SphereContext
let sphere_context = req
.extensions()
.get::<Arc<Mutex<SphereContext<K, NativeStore>>>>()
.get::<Arc<Mutex<SphereContext<K, NativeStorage>>>>()
.ok_or_else(|| {
error!("Could not find DidParser in extensions");
StatusCode::INTERNAL_SERVER_ERROR
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/serve/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
};
use libipld_cbor::DagCborCodec;
use mime_guess::mime;
use noosphere_storage::encoding::{block_deserialize, block_serialize};
use noosphere_storage::{block_deserialize, block_serialize};
use serde::{de::DeserializeOwned, Serialize};

#[derive(Debug, Clone, Copy, Default)]
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-cli/src/native/commands/serve/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ucan::crypto::KeyMaterial;
use url::Url;

use noosphere_api::route::Route as GatewayRoute;
use noosphere_storage::native::NativeStore;
use noosphere_storage::NativeStorage;

use crate::native::commands::serve::{
ipfs::start_ipfs_syndication,
Expand All @@ -30,7 +30,7 @@ pub struct GatewayScope {
pub async fn start_gateway<K>(
listener: TcpListener,
gateway_scope: GatewayScope,
sphere_context: Arc<Mutex<SphereContext<K, NativeStore>>>,
sphere_context: Arc<Mutex<SphereContext<K, NativeStorage>>>,
ipfs_api: Url,
cors_origin: Option<Url>,
) -> Result<()>
Expand Down Expand Up @@ -65,7 +65,7 @@ where
]);
}

let (syndication_tx, syndication_task) = start_ipfs_syndication::<K, NativeStore>(ipfs_api);
let (syndication_tx, syndication_task) = start_ipfs_syndication::<K, NativeStorage>(ipfs_api);

let app = Router::new()
.route(&GatewayRoute::Did.to_string(), get(did_route::<K>))
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/serve/ipfs/kubo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ mod tests {
use cid::Cid;
use iroh_car::{CarHeader, CarWriter};
use libipld_cbor::DagCborCodec;
use noosphere_storage::encoding::block_serialize;
use noosphere_storage::block_serialize;
use serde::{Deserialize, Serialize};
use url::Url;

Expand Down
19 changes: 9 additions & 10 deletions rust/noosphere-cli/src/native/commands/serve/ipfs/syndication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use noosphere_core::{
view::{Sphere, Timeline},
};
use noosphere_storage::{
encoding::{block_deserialize, block_serialize},
interface::{BlockStore, KeyValueStore, Store},
BlockStore,
block_deserialize, block_serialize,
KeyValueStore,
Storage,
};
use serde::{Deserialize, Serialize};
use tokio::{
Expand All @@ -37,7 +39,7 @@ use super::KuboClient;
pub struct SyndicationJob<K, S>
where
K: KeyMaterial + Clone + 'static,
S: Store,
S: Storage,
{
/// The revision of the _local_ sphere to discover the _counterpart_ sphere
/// from; the counterpart sphere's revision will need to be derived using
Expand Down Expand Up @@ -68,7 +70,7 @@ pub fn start_ipfs_syndication<K, S>(
)
where
K: KeyMaterial + Clone + 'static,
S: Store + 'static,
S: Storage + 'static,
{
let (tx, rx) = unbounded_channel();

Expand All @@ -81,7 +83,7 @@ async fn ipfs_syndication_task<K, S>(
) -> Result<()>
where
K: KeyMaterial + Clone + 'static,
S: Store,
S: Storage,
{
debug!("Syndicating sphere revisions to IPFS API at {}", ipfs_api);

Expand Down Expand Up @@ -110,10 +112,7 @@ where
let sphere = Sphere::at(&revision, context.db());
let links = sphere.try_get_links().await?;

let counterpart_revision = links
.require(&counterpart_identity.to_string())
.await?
.clone();
let counterpart_revision = *links.require(&counterpart_identity).await?;

let fs = context.fs().await?;

Expand Down Expand Up @@ -163,7 +162,7 @@ where
move |cid| {
let filter = filter.clone();
let kubo_client = kubo_client.clone();
let cid = cid.clone();
let cid = *cid;

async move {
// The Bloom filter probabilistically tells us if we
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-cli/src/native/commands/serve/route/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use noosphere_core::{
data::Bundle,
view::Sphere,
};
use noosphere_storage::{db::SphereDb, native::NativeStore};
use noosphere_storage::{SphereDb, NativeStorage};
use tokio::sync::Mutex;
use ucan::{
capability::{Capability, Resource, With},
Expand All @@ -26,7 +26,7 @@ pub async fn fetch_route<K>(
authority: GatewayAuthority<K>,
Query(FetchParameters { since }): Query<FetchParameters>,
Extension(scope): Extension<GatewayScope>,
Extension(sphere_context): Extension<Arc<Mutex<SphereContext<K, NativeStore>>>>,
Extension(sphere_context): Extension<Arc<Mutex<SphereContext<K, NativeStorage>>>>,
) -> Result<impl IntoResponse, StatusCode>
where
K: KeyMaterial + Clone,
Expand Down Expand Up @@ -61,7 +61,7 @@ where
pub async fn generate_fetch_bundle(
scope: &GatewayScope,
since: Option<&Cid>,
db: &SphereDb<NativeStore>,
db: &SphereDb<NativeStorage>,
) -> Result<Option<(Cid, Bundle)>> {
debug!("Resolving latest local sphere version...");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use noosphere::sphere::SphereContext;
use noosphere_api::data::IdentifyResponse;
use noosphere_core::authority::{SphereAction, SphereReference};
use noosphere_storage::native::NativeStore;
use noosphere_storage::NativeStorage;
use tokio::sync::Mutex;
use ucan::{
capability::{Capability, Resource, With},
Expand All @@ -16,7 +16,7 @@ use crate::native::commands::serve::{authority::GatewayAuthority, gateway::Gatew
pub async fn identify_route<K: KeyMaterial + Clone>(
authority: GatewayAuthority<K>,
Extension(scope): Extension<GatewayScope>,
Extension(sphere_context): Extension<Arc<Mutex<SphereContext<K, NativeStore>>>>,
Extension(sphere_context): Extension<Arc<Mutex<SphereContext<K, NativeStorage>>>>,
) -> Result<impl IntoResponse, StatusCode> {
debug!("Invoking identify route...");

Expand Down
23 changes: 12 additions & 11 deletions rust/noosphere-cli/src/native/commands/serve/route/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use axum::{extract::ContentLengthLimit, http::StatusCode, Extension};
use cid::Cid;
use noosphere::sphere::SphereContext;
use noosphere_api::data::{PushBody, PushResponse};
use noosphere_core::authority::{Authorization, SphereAction, SphereReference};
use noosphere_core::data::Bundle;
use noosphere_core::view::{Sphere, SphereMutation, Timeline};
use noosphere_storage::{db::SphereDb, native::NativeStore};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use noosphere_core::{
authority::{Authorization, SphereAction, SphereReference},
data::Bundle,
view::{Sphere, SphereMutation, Timeline},
};
use noosphere_storage::{SphereDb, NativeStorage};
use tokio::sync::{mpsc::UnboundedSender, Mutex};
use ucan::capability::{Capability, Resource, With};
use ucan::crypto::KeyMaterial;

Expand All @@ -24,9 +25,9 @@ use crate::native::commands::serve::{
pub async fn push_route<K>(
authority: GatewayAuthority<K>,
ContentLengthLimit(Cbor(push_body)): ContentLengthLimit<Cbor<PushBody>, { 1024 * 5000 }>,
Extension(sphere_context_mutex): Extension<Arc<Mutex<SphereContext<K, NativeStore>>>>,
Extension(sphere_context_mutex): Extension<Arc<Mutex<SphereContext<K, NativeStorage>>>>,
Extension(scope): Extension<GatewayScope>,
Extension(syndication_tx): Extension<UnboundedSender<SyndicationJob<K, NativeStore>>>,
Extension(syndication_tx): Extension<UnboundedSender<SyndicationJob<K, NativeStorage>>>,
) -> Result<Cbor<PushResponse>, StatusCode>
where
K: KeyMaterial + Clone,
Expand Down Expand Up @@ -124,7 +125,7 @@ where
// an explicit publish action. Move this to the publish handler when we
// have added it to the gateway.
if let Err(error) = syndication_tx.send(SyndicationJob {
revision: new_gateway_tip.clone(),
revision: new_gateway_tip,
context: sphere_context_mutex.clone(),
}) {
warn!("Failed to queue IPFS syndication job: {}", error);
Expand All @@ -141,7 +142,7 @@ async fn update_gateway_sphere<K>(
scope: &GatewayScope,
key: &K,
authority: &Authorization,
db: &mut SphereDb<NativeStore>,
db: &mut SphereDb<NativeStorage>,
) -> Result<(Cid, Bundle)>
where
K: KeyMaterial + Send,
Expand Down Expand Up @@ -172,7 +173,7 @@ where

async fn incorporate_lineage(
scope: &GatewayScope,
db: &mut SphereDb<NativeStore>,
db: &mut SphereDb<NativeStorage>,
push_body: &PushBody,
) -> Result<()> {
push_body.blocks.load_into(db).await?;
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use crate::native::Workspace;
use anyhow::Result;
use noosphere_core::data::ContentType;
use noosphere_storage::memory::MemoryStore;
use noosphere_storage::MemoryStore;

pub fn status_section(
name: &str,
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/src/native/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::native::{commands::serve::tracing::initialize_tracing, workspace::Workspace};
use anyhow::{anyhow, Result};
use noosphere_storage::memory::MemoryStore;
use noosphere_storage::MemoryStore;

pub async fn sync(workspace: &Workspace) -> Result<()> {
initialize_tracing();
Expand Down
8 changes: 3 additions & 5 deletions rust/noosphere-cli/src/native/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use noosphere_core::{
};
use noosphere_fs::SphereFs;
use noosphere_storage::{
db::SphereDb,
interface::{BlockStore, KeyValueStore, Store},
native::NativeStore,
BlockStore, SphereDb, KeyValueStore, NativeStorage, Store,
};
use pathdiff::diff_paths;
use std::{
Expand Down Expand Up @@ -40,7 +38,7 @@ const SPHERE_DIRECTORY: &str = ".sphere";
const NOOSPHERE_DIRECTORY: &str = ".noosphere";
// const STORAGE_DIRECTORY: &str = "storage";

pub type CliSphereContext = SphereContext<Ed25519KeyMaterial, NativeStore>;
pub type CliSphereContext = SphereContext<Ed25519KeyMaterial, NativeStorage>;

/// A delta manifest of changes to the local content space
#[derive(Default)]
Expand Down Expand Up @@ -117,7 +115,7 @@ impl Workspace {
/// Get an owned referenced to the [SphereDb] that backs the local sphere.
/// Note that this will initialize the [SphereContext] if it has not been
/// already.
pub async fn db(&self) -> Result<SphereDb<NativeStore>> {
pub async fn db(&self) -> Result<SphereDb<NativeStorage>> {
let context = self.sphere_context().await?;
let context = context.lock().await;
Ok(context.db().clone())
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-cli/tests/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use anyhow::anyhow;
use noosphere::key::KeyStorage;
use noosphere_storage::interface::BlockStore;
use noosphere_storage::BlockStore;
use std::net::TcpListener;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
Expand Down
16 changes: 8 additions & 8 deletions rust/noosphere-collections/src/hamt/hamt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use anyhow::Result;
use libipld_cbor::DagCborCodec;

use noosphere_storage::interface::BlockStore;
use noosphere_storage::BlockStore;
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::pin::Pin;
Expand Down Expand Up @@ -43,7 +43,7 @@ impl<S> TargetConditionalSendSync for S {}
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -155,7 +155,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -185,7 +185,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -223,7 +223,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -261,7 +261,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -296,7 +296,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down Expand Up @@ -336,7 +336,7 @@ where
///
/// ```
/// use noosphere_collections::hamt::Hamt;
/// use noosphere_storage::memory::MemoryStore;
/// use noosphere_storage::MemoryStore;
///
/// async_std::task::block_on(async {
/// let store = MemoryStore::default();
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-collections/src/hamt/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_recursion::async_recursion;
use async_stream::try_stream;
use libipld_cbor::DagCborCodec;

use noosphere_storage::interface::BlockStore;
use noosphere_storage::BlockStore;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::marker::PhantomData;
Expand Down
Loading

0 comments on commit 4db55c4

Please sign in to comment.