Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

feat: 3P replication fall-back and resilience #673

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ fastcdc = { version = "3.1" }
futures = { version = "0.3" }
futures-util = { version = "0.3" }
gloo-net = { version = "0.4" }
gloo-timers = { version = "0.2", features = ["futures"] }
gloo-timers = { version = "0.3", features = ["futures"] }
ignore = { version = "0.4.20" }
instant = { version = "0.1", features = ["wasm-bindgen"] }
iroh-car = { version = "^0.3.0" }
js-sys = { version = "^0.3" }
libipld = { version = "0.16" }
Expand Down
8 changes: 7 additions & 1 deletion rust/noosphere-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[features]
helpers = ["rand"]
helpers = ["rand", "gloo-timers"]

[dependencies]
anyhow = { workspace = true }
gloo-timers = { workspace = true, optional = true }
tracing = { workspace = true }
rand = { workspace = true, optional = true }
futures-util = { workspace = true }
instant = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
tokio-stream = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["full"] }
Expand All @@ -32,3 +35,6 @@ tokio = { workspace = true, features = ["sync", "macros"] }
futures = { workspace = true }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { workspace = true }
5 changes: 4 additions & 1 deletion rust/noosphere-common/src/helpers/wait.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::time::Duration;
use instant::Duration;

/// Wait for the specified number of seconds; uses [tokio::time::sleep], so this
/// will yield to the async runtime rather than block until the sleep time is
/// elapsed.
pub async fn wait(seconds: u64) {
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(Duration::from_secs(seconds)).await;
#[cfg(target_arch = "wasm32")]
gloo_timers::future::sleep(Duration::from_secs(seconds)).await
}
130 changes: 130 additions & 0 deletions rust/noosphere-common/src/latency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use instant::{Duration, Instant};

use futures_util::Stream;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::ConditionalSend;

/// A helper for observing when [Stream] throughput appears to have stalled
pub struct StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
inner: S,
threshold: Duration,
last_ready_time: Instant,
tx: UnboundedSender<()>,
}

impl<S> StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
/// Wraps a [Stream] and provides an [UnboundedReceiver<()>] that will receive
/// a message any time the wrapped [Stream] is pending for longer than the provided
/// threshold [Duration].
pub fn wrap(stream: S, threshold: Duration) -> (Self, UnboundedReceiver<()>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
(
StreamLatencyGuard {
inner: stream,
threshold,
last_ready_time: Instant::now(),
tx,
},
rx,
)
}
}

impl<S> Stream for StreamLatencyGuard<S>
where
S: Stream + Unpin,
S::Item: ConditionalSend + 'static,
{
type Item = S::Item;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let result = std::pin::pin!(&mut self.inner).poll_next(cx);

if result.is_pending() {
if Instant::now() - self.last_ready_time > self.threshold {
let _ = self.tx.send(());
}
} else if result.is_ready() {
self.last_ready_time = Instant::now();
}

result
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;
use instant::Duration;
use tokio::select;
use tokio_stream::StreamExt;

use crate::{helpers::wait, StreamLatencyGuard};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test;

#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_does_not_impede_the_behavior_of_a_wrapped_stream() -> Result<()> {
let stream = tokio_stream::iter(Vec::from([0u32; 1024]));

let (guarded_stream, _latency_signal) =
StreamLatencyGuard::wrap(stream, Duration::from_secs(1));

tokio::pin!(guarded_stream);

guarded_stream.collect::<Vec<u32>>().await;

Ok(())
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_signals_when_a_stream_encounters_latency() -> Result<()> {
let stream = Box::pin(futures_util::stream::unfold(0, |index| async move {
match index {
512 => {
for _ in 0..3 {
// Uh oh, latency! Note that `tokio::time::sleep` is observed to cooperate
// with the runtime, so we wait multiple times to ensure that the stream is
// actually polled multiple times
wait(1).await;
}
Some((index, index + 1))
}
_ if index < 1024 => Some((index, index + 1)),
_ => None,
}
}));

let (guarded_stream, mut latency_guard) =
StreamLatencyGuard::wrap(stream, Duration::from_millis(100));

tokio::pin!(guarded_stream);

select! {
_ = guarded_stream.collect::<Vec<u32>>() => {
unreachable!("Latency guard should be hit first");
},
_ = latency_guard.recv() => ()
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions rust/noosphere-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
extern crate tracing;

pub mod channel;
mod latency;
mod sync;
mod task;
mod unshared;

pub use latency::*;
pub use sync::*;
pub use task::*;
pub use unshared::*;
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-common/src/unshared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ impl<T> std::fmt::Debug for Unshared<T> {
pub struct UnsharedStream<T>(Unshared<T>)
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static;
for<'a> T::Item: ConditionalSend + 'a;

impl<T> UnsharedStream<T>
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static,
for<'a> T::Item: ConditionalSend + 'a,
{
/// Initialize a new [UnsharedStream] wrapping a provided (presumably `!Sync`)
/// [Stream]
Expand All @@ -70,7 +70,7 @@ where
impl<T> Stream for UnsharedStream<T>
where
T: Stream + Unpin,
T::Item: ConditionalSend + 'static,
for<'a> T::Item: ConditionalSend + 'a,
{
type Item = T::Item;

Expand Down
1 change: 1 addition & 0 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async-stream = { workspace = true }
async-once-cell = "~0.4"
anyhow = { workspace = true }
bytes = { workspace = true }
instant = { workspace = true }
iroh-car = { workspace = true }
thiserror = { workspace = true }
fastcdc = { workspace = true }
Expand Down
Loading