Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Sphere writes do not block immutable reads
Browse files Browse the repository at this point in the history
  • Loading branch information
cdata committed Apr 14, 2023
1 parent 6209e75 commit 90b7163
Show file tree
Hide file tree
Showing 15 changed files with 316 additions and 107 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ resolver = "2"
[workspace.dependencies]
subtext = { version = "0.3.4" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "~0.3", features = ["env-filter", "tracing-log"] }
thiserror = { version = "^1.0.38" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"] }
thiserror = { version = "1" }
instant = { version = "0.1" }
gloo-timers = { version = "0.2", features = ["futures"] }

[profile.release]
opt-level = 'z'
Expand Down
9 changes: 7 additions & 2 deletions rust/noosphere-core/src/view/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ucan::crypto::KeyMaterial;
use crate::{
authority::Authorization,
data::{
ChangelogIpld, DelegationIpld, IdentityIpld, Jwt, Link, MapOperation, MemoIpld,
ChangelogIpld, DelegationIpld, Did, IdentityIpld, Jwt, Link, MapOperation, MemoIpld,
RevocationIpld, VersionedMapKey, VersionedMapValue,
},
};
Expand Down Expand Up @@ -49,7 +49,7 @@ impl<S: BlockStore> SphereRevision<S> {
/// [SphereRevision], which may then be signed.
#[derive(Debug)]
pub struct SphereMutation {
did: String,
did: Did,
content: ContentMutation,
identities: IdentitiesMutation,
delegations: DelegationsMutation,
Expand All @@ -67,6 +67,11 @@ impl<'a> SphereMutation {
}
}

/// Get the identity of the author of this mutation
pub fn author(&self) -> &Did {
&self.did
}

/// Reset the state of the [SphereMutation], so that it may be re-used
/// without being recreated. This is sometimes useful if the code that is
/// working with the [SphereMutation] does not have sufficient information
Expand Down
37 changes: 24 additions & 13 deletions rust/noosphere-sphere/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ where
mutation: SphereMutation,
}

impl<K, S> Clone for SphereContext<K, S>
where
K: KeyMaterial + Clone + 'static,
S: Storage,
{
fn clone(&self) -> Self {
Self {
sphere_identity: self.sphere_identity.clone(),
origin_sphere_identity: self.origin_sphere_identity.clone(),
author: self.author.clone(),
access: OnceCell::new(),
db: self.db.clone(),
did_parser: DidParser::new(SUPPORTED_KEYS),
client: self.client.clone(),
mutation: SphereMutation::new(self.mutation.author()),
}
}
}

impl<K, S> SphereContext<K, S>
where
K: KeyMaterial + Clone + 'static,
Expand Down Expand Up @@ -78,7 +97,7 @@ where
/// front. So, if the sequence is "gold", "cat", "bob", it will traverse to
/// bob, then to bob's cat, then to bob's cat's gold.
pub async fn traverse_by_petnames(
&mut self,
&self,
petname_path: &[String],
) -> Result<Option<SphereContext<K, S>>> {
let mut sphere_context: Option<Self> = None;
Expand All @@ -87,7 +106,7 @@ where
while let Some(petname) = path.pop() {
let next_sphere_context = match sphere_context {
None => self.traverse_by_petname(&petname).await?,
Some(mut sphere_context) => sphere_context.traverse_by_petname(&petname).await?,
Some(sphere_context) => sphere_context.traverse_by_petname(&petname).await?,
};
sphere_context = match next_sphere_context {
any @ Some(_) => any,
Expand All @@ -105,10 +124,7 @@ where
/// sphere being traversed to is not available, an attempt will be made to
/// replicate the data from a Noosphere Gateway.
#[instrument(level = "debug", skip(self))]
pub async fn traverse_by_petname(
&mut self,
petname: &str,
) -> Result<Option<SphereContext<K, S>>> {
pub async fn traverse_by_petname(&self, petname: &str) -> Result<Option<SphereContext<K, S>>> {
// Resolve petname to sphere version via address book entry

let identity = match self
Expand Down Expand Up @@ -196,6 +212,7 @@ where
};

// If no version available or memo/body missing, replicate from gateway
let mut db = self.db.clone();

if should_replicate_from_gateway {
debug!("Attempting to replicate from gateway...");
Expand All @@ -205,16 +222,10 @@ where
tokio::pin!(stream);

while let Some((cid, block)) = stream.try_next().await? {
self.db_mut().put_block(&cid, &block).await?;
db.put_block(&cid, &block).await?;
}
}

// Update the version in local sphere DB

self.db_mut()
.set_version(&identity.did, &resolved_version)
.await?;

// Initialize a `SphereContext` with the same author and sphere DB as
// this one, but referring to the resolved sphere DID, and return it

Expand Down
4 changes: 4 additions & 0 deletions rust/noosphere/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ tempfile = "^3"
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "~0.3"
witty-phrase-generator = "~0.2"
instant = { workspace = true, features = ["wasm-bindgen", "stdweb"] }
gloo-timers = { workspace = true }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies.js-sys]
69 changes: 27 additions & 42 deletions rust/noosphere/src/ffi/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::{

use crate::{
ffi::{NsError, NsHeaders, NsNoosphere, TryOrInitialize},
platform::{PlatformKeyMaterial, PlatformStorage},
platform::{PlatformKeyMaterial, PlatformSphereChannel, PlatformStorage},
};

use noosphere_sphere::{
Expand All @@ -29,32 +29,18 @@ use noosphere_sphere::{
///
/// An opaque struct representing a sphere.
pub struct NsSphere {
inner: SphereCursor<
Arc<Mutex<SphereContext<PlatformKeyMaterial, PlatformStorage>>>,
PlatformKeyMaterial,
PlatformStorage,
>,
inner: PlatformSphereChannel,
}

impl NsSphere {
pub fn inner(
&self,
) -> &SphereCursor<
Arc<Mutex<SphereContext<PlatformKeyMaterial, PlatformStorage>>>,
PlatformKeyMaterial,
PlatformStorage,
> {
&self.inner
pub fn inner(&self) -> &Arc<SphereContext<PlatformKeyMaterial, PlatformStorage>> {
self.inner.immutable()
}

pub fn inner_mut(
&mut self,
) -> &mut SphereCursor<
Arc<Mutex<SphereContext<PlatformKeyMaterial, PlatformStorage>>>,
PlatformKeyMaterial,
PlatformStorage,
> {
&mut self.inner
) -> &mut Arc<Mutex<SphereContext<PlatformKeyMaterial, PlatformStorage>>> {
self.inner.mutable()
}
}

Expand Down Expand Up @@ -93,14 +79,15 @@ pub fn ns_sphere_open(
) -> Option<repr_c::Box<NsSphere>> {
error_out.try_or_initialize(|| {
let fs = noosphere.async_runtime().block_on(async {
let sphere_context = noosphere
let sphere_channel = noosphere
.inner()
.get_sphere_context(&Did(sphere_identity.to_str().into()))
.get_sphere_channel(&Did(sphere_identity.to_str().into()))
.await?;

let cursor = SphereCursor::latest(sphere_context);

Ok(Box::new(NsSphere { inner: cursor }).into()) as Result<_, anyhow::Error>
Ok(Box::new(NsSphere {
inner: sphere_channel,
})
.into()) as Result<_, anyhow::Error>
})?;

Ok(fs)
Expand Down Expand Up @@ -152,7 +139,7 @@ pub fn ns_sphere_traverse_by_petname(

Ok(next_sphere_context.map(|next_sphere_context| {
Box::new(NsSphere {
inner: SphereCursor::latest(Arc::new(Mutex::new(next_sphere_context))),
inner: next_sphere_context.into(),
})
.into()
})) as Result<Option<_>, anyhow::Error>
Expand Down Expand Up @@ -212,13 +199,11 @@ pub fn ns_sphere_content_read(
.traverse_by_petnames(&petnames)
.await?
{
Some(sphere_context) => {
SphereCursor::latest(Arc::new(Mutex::new(sphere_context)))
}
Some(sphere_context) => SphereCursor::latest(Arc::new(sphere_context)),
None => return Ok(None),
}
}
Peer::None => sphere.inner().clone(),
Peer::None => SphereCursor::latest(sphere.inner().clone()),
Peer::Did(_) => return Err(anyhow!("DID peer in slashlink not yet supported")),
};

Expand Down Expand Up @@ -268,7 +253,7 @@ pub fn ns_sphere_content_write(
error_out.try_or_initialize(|| {
noosphere.async_runtime().block_on(async {
let slug = slug.to_str();
let cursor = sphere.inner_mut();
let mut cursor = SphereCursor::latest(sphere.inner_mut().clone());

println!(
"Writing sphere {} slug {}...",
Expand Down Expand Up @@ -350,7 +335,9 @@ pub fn ns_sphere_content_list(
) -> c_slice::Box<char_p::Box> {
let possible_output = error_out.try_or_initialize(|| {
noosphere.async_runtime().block_on(async {
let slug_set = SphereWalker::from(sphere.inner()).list_slugs().await?;
let slug_set = SphereWalker::from(sphere.inner().clone())
.list_slugs()
.await?;
let mut all_slugs: Vec<char_p::Box> = Vec::new();

for slug in slug_set.into_iter() {
Expand Down Expand Up @@ -399,7 +386,7 @@ pub fn ns_sphere_content_changes(
None => None,
};

let changed_slug_set = SphereWalker::from(sphere.inner())
let changed_slug_set = SphereWalker::from(sphere.inner().clone())
.content_changes(since.as_ref())
.await?;
let mut changed_slugs: Vec<char_p::Box> = Vec::new();
Expand Down Expand Up @@ -542,14 +529,12 @@ pub fn ns_sphere_identity(
match noosphere
.async_runtime()
.block_on(async { sphere.inner().identity().await })
{
Ok(identity) => {
identity
.to_string()
.try_into()
.map_err(|error: InvalidNulTerminator<String>| anyhow!(error).into())
}
Err(error) => Err(anyhow!(error).into()),
}
{
Ok(identity) => identity
.to_string()
.try_into()
.map_err(|error: InvalidNulTerminator<String>| anyhow!(error).into()),
Err(error) => Err(anyhow!(error).into()),
}
})
}
6 changes: 4 additions & 2 deletions rust/noosphere/src/ffi/petname.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ pub fn ns_sphere_petname_list(
) -> c_slice::Box<char_p::Box> {
let possible_output = error_out.try_or_initialize(|| {
noosphere.async_runtime().block_on(async {
let petname_set = SphereWalker::from(sphere.inner()).list_petnames().await?;
let petname_set = SphereWalker::from(sphere.inner().clone())
.list_petnames()
.await?;
let mut all_petnames: Vec<char_p::Box> = Vec::new();

for petname in petname_set.into_iter() {
Expand Down Expand Up @@ -197,7 +199,7 @@ pub fn ns_sphere_petname_changes(
None => None,
};

let changed_petname_set = SphereWalker::from(sphere.inner())
let changed_petname_set = SphereWalker::from(sphere.inner().clone())
.petname_changes(since.as_ref())
.await?;
let mut changed_petnames: Vec<char_p::Box> = Vec::new();
Expand Down
19 changes: 12 additions & 7 deletions rust/noosphere/src/ffi/sphere.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ pub fn ns_sphere_version_get(
) -> Option<char_p::Box> {
error_out.try_or_initialize(|| {
noosphere.async_runtime().block_on(async {
let sphere_context = noosphere
let sphere_channel = noosphere
.inner()
.get_sphere_context(&Did(sphere_identity.to_str().into()))
.get_sphere_channel(&Did(sphere_identity.to_str().into()))
.await?;

let sphere_context = sphere_context.lock().await;
let sphere_context = sphere_channel.immutable();
sphere_context
.sphere()
.await?
Expand Down Expand Up @@ -167,14 +167,19 @@ pub fn ns_sphere_sync(
) -> Option<char_p::Box> {
error_out.try_or_initialize(|| {
let cid = noosphere.async_runtime().block_on(async {
let mut sphere_context = noosphere
let mut sphere_channel = noosphere
.inner()
.get_sphere_context(&Did(sphere_identity.to_str().into()))
.get_sphere_channel(&Did(sphere_identity.to_str().into()))
.await?;

sphere_context.sync().await?;
sphere_channel.mutable().sync().await?;

Ok(sphere_context.to_sphere().await?.cid().to_string()) as Result<String, anyhow::Error>
Ok(sphere_channel
.immutable()
.to_sphere()
.await?
.cid()
.to_string()) as Result<String, anyhow::Error>
})?;

Ok(cid
Expand Down
Loading

0 comments on commit 90b7163

Please sign in to comment.