Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: 3P replication fall-back and resilience
Browse files Browse the repository at this point in the history
  • Loading branch information
cdata committed Oct 10, 2023
1 parent 43b325c commit f8d39b8
Show file tree
Hide file tree
Showing 28 changed files with 797 additions and 144 deletions.
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ fastcdc = { version = "3.1" }
futures = { version = "0.3" }
futures-util = { version = "0.3" }
gloo-net = { version = "0.4" }
gloo-timers = { version = "0.2", features = ["futures"] }
gloo-timers = { version = "0.3", features = ["futures"] }
ignore = { version = "0.4.20" }
instant = { version = "0.1", features = ["wasm-bindgen"] }
iroh-car = { version = "^0.3.0" }
js-sys = { version = "^0.3" }
libipld = { version = "0.16" }
Expand Down
8 changes: 7 additions & 1 deletion rust/noosphere-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[features]
helpers = ["rand"]
helpers = ["rand", "gloo-timers"]

[dependencies]
anyhow = { workspace = true }
gloo-timers = { workspace = true, optional = true }
tracing = { workspace = true }
rand = { workspace = true, optional = true }
futures-util = { workspace = true }
instant = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
tokio-stream = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["full"] }
Expand All @@ -32,3 +35,6 @@ tokio = { workspace = true, features = ["sync", "macros"] }
futures = { workspace = true }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { workspace = true }
5 changes: 4 additions & 1 deletion rust/noosphere-common/src/helpers/wait.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::time::Duration;
use instant::Duration;

/// Wait for the specified number of seconds; uses [tokio::time::sleep], so this
/// will yield to the async runtime rather than block until the sleep time is
/// elapsed.
pub async fn wait(seconds: u64) {
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(Duration::from_secs(seconds)).await;
#[cfg(target_arch = "wasm32")]
gloo_timers::future::sleep(Duration::from_secs(seconds)).await
}
130 changes: 130 additions & 0 deletions rust/noosphere-common/src/latency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use instant::{Duration, Instant};

use futures_util::Stream;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::ConditionalSend;

/// A helper for observing when [Stream] throughput appears to have stalled
pub struct StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
inner: S,
threshold: Duration,
last_ready_time: Instant,
tx: UnboundedSender<()>,
}

impl<S> StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
/// Wraps a [Stream] and provides an [UnboundedReceiver<()>] that will receive
/// a message any time the wrapped [Stream] is pending for longer than the provided
/// threshold [Duration].
pub fn wrap(stream: S, threshold: Duration) -> (Self, UnboundedReceiver<()>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
(
StreamLatencyGuard {
inner: stream,
threshold,
last_ready_time: Instant::now(),
tx,
},
rx,
)
}
}

impl<S> Stream for StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
type Item = S::Item;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let result = std::pin::pin!(&mut self.inner).poll_next(cx);

if result.is_pending() {
if Instant::now() - self.last_ready_time > self.threshold {
let _ = self.tx.send(());
}
} else if result.is_ready() {
self.last_ready_time = Instant::now();
}

result
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;
use instant::Duration;
use tokio::select;
use tokio_stream::StreamExt;

use crate::{helpers::wait, StreamLatencyGuard};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;

#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_does_not_impede_the_behavior_of_a_wrapped_stream() -> Result<()> {
let stream = tokio_stream::iter(Vec::from([0u32; 1024]));

let (guarded_stream, _latency_signal) =
StreamLatencyGuard::wrap(stream, Duration::from_secs(1));

tokio::pin!(guarded_stream);

guarded_stream.collect::<Vec<u32>>().await;

Ok(())
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_signals_when_a_stream_encounters_latency() -> Result<()> {
let stream = Box::pin(futures_util::stream::unfold(0, |index| async move {
match index {
512 => {
for _ in 0..3 {
// Uh oh, latency! Note that `tokio::time::sleep` is observed to cooperate
// with the runtime, so we wait multiple times to ensure that the stream is
// actually polled multiple times
wait(1).await;
}
Some((index, index + 1))
}
_ if index < 1024 => Some((index, index + 1)),
_ => None,
}
}));

let (guarded_stream, mut latency_guard) =
StreamLatencyGuard::wrap(stream, Duration::from_millis(100));

tokio::pin!(guarded_stream);

select! {
_ = guarded_stream.collect::<Vec<u32>>() => {
unreachable!("Latency guard should be hit first");
},
_ = latency_guard.recv() => ()
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions rust/noosphere-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
extern crate tracing;

pub mod channel;
mod latency;
mod sync;
mod task;
mod unshared;

pub use latency::*;
pub use sync::*;
pub use task::*;
pub use unshared::*;
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-common/src/unshared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ impl<T> std::fmt::Debug for Unshared<T> {
pub struct UnsharedStream<T>(Unshared<T>)
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static;
for<'a> T::Item: ConditionalSend + 'a;

impl<T> UnsharedStream<T>
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static,
for<'a> T::Item: ConditionalSend + 'a,
{
/// Initialize a new [UnsharedStream] wrapping a provided (presumably `!Sync`)
/// [Stream]
Expand All @@ -70,7 +70,7 @@ where
impl<T> Stream for UnsharedStream<T>
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static,
for<'a> T::Item: ConditionalSend + 'a,
{
type Item = T::Item;

Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async-stream = { workspace = true }
async-once-cell = "~0.4"
anyhow = { workspace = true }
bytes = { workspace = true }
instant = { workspace = true }
iroh-car = { workspace = true }
thiserror = { workspace = true }
fastcdc = { workspace = true }
Expand Down
Loading

0 comments on commit f8d39b8

Please sign in to comment.