Skip to content

Commit

Permalink
feat: Add blocking sleeper for blocking retry (#138)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Aug 30, 2024
1 parent 0f6923b commit 7ffcbd0
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 15 deletions.
3 changes: 2 additions & 1 deletion backon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ targets = [
]

[features]
default = ["tokio-sleep", "gloo-timers-sleep"]
default = ["std-blocking-sleep", "tokio-sleep", "gloo-timers-sleep"]
std-blocking-sleep = []
gloo-timers-sleep = ["dep:gloo-timers", "gloo-timers?/futures"]
tokio-sleep = ["dep:tokio", "tokio?/time"]

Expand Down
71 changes: 64 additions & 7 deletions backon/src/blocking_retry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::thread;
use std::time::Duration;

use crate::backoff::BackoffBuilder;
use crate::Backoff;
use crate::blocking_sleep::MaybeBlockingSleeper;
use crate::{Backoff, BlockingSleeper, DefaultBlockingSleeper};

/// BlockingRetryable adds retry support for blocking functions.
///
Expand Down Expand Up @@ -63,13 +63,15 @@ pub struct BlockingRetry<
T,
E,
F: FnMut() -> Result<T, E>,
SF: MaybeBlockingSleeper = DefaultBlockingSleeper,
RF = fn(&E) -> bool,
NF = fn(&E, Duration),
> {
backoff: B,
retryable: RF,
notify: NF,
f: F,
sleep_fn: SF,
}

impl<B, T, E, F> BlockingRetry<B, T, E, F>
Expand All @@ -83,18 +85,57 @@ where
backoff,
retryable: |_: &E| true,
notify: |_: &E, _: Duration| {},
sleep_fn: DefaultBlockingSleeper::default(),
f,
}
}
}

impl<B, T, E, F, RF, NF> BlockingRetry<B, T, E, F, RF, NF>
impl<B, T, E, F, SF, RF, NF> BlockingRetry<B, T, E, F, SF, RF, NF>
where
B: Backoff,
F: FnMut() -> Result<T, E>,
SF: MaybeBlockingSleeper,
RF: FnMut(&E) -> bool,
NF: FnMut(&E, Duration),
{
/// Set the sleeper for retrying.
///
/// The sleeper should implement the [`BlockingSleeper`] trait. The simplest way is to use a closure like `Fn(Duration)`.
///
/// If not specified, we use the [`DefaultBlockingSleeper`].
///
/// # Examples
///
/// ```no_run
/// use anyhow::Result;
/// use backon::BlockingRetryable;
/// use backon::ExponentialBuilder;
///
/// fn fetch() -> Result<String> {
/// Ok("hello, world!".to_string())
/// }
///
/// fn main() -> Result<()> {
/// let retry = fetch
/// .retry(ExponentialBuilder::default())
/// .sleep(std::thread::sleep);
/// let content = retry.call()?;
/// println!("fetch succeeded: {}", content);
///
/// Ok(())
/// }
/// ```
pub fn sleep<SN: BlockingSleeper>(self, sleep_fn: SN) -> BlockingRetry<B, T, E, F, SN, RF, NF> {
BlockingRetry {
backoff: self.backoff,
retryable: self.retryable,
notify: self.notify,
f: self.f,
sleep_fn,
}
}

/// Set the conditions for retrying.
///
/// If not specified, all errors are considered retryable.
Expand All @@ -120,12 +161,16 @@ where
/// Ok(())
/// }
/// ```
pub fn when<RN: FnMut(&E) -> bool>(self, retryable: RN) -> BlockingRetry<B, T, E, F, RN, NF> {
pub fn when<RN: FnMut(&E) -> bool>(
self,
retryable: RN,
) -> BlockingRetry<B, T, E, F, SF, RN, NF> {
BlockingRetry {
backoff: self.backoff,
retryable,
notify: self.notify,
f: self.f,
sleep_fn: self.sleep_fn,
}
}

Expand Down Expand Up @@ -160,15 +205,28 @@ where
/// Ok(())
/// }
/// ```
pub fn notify<NN: FnMut(&E, Duration)>(self, notify: NN) -> BlockingRetry<B, T, E, F, RF, NN> {
pub fn notify<NN: FnMut(&E, Duration)>(
self,
notify: NN,
) -> BlockingRetry<B, T, E, F, SF, RF, NN> {
BlockingRetry {
backoff: self.backoff,
retryable: self.retryable,
notify,
f: self.f,
sleep_fn: self.sleep_fn,
}
}
}

impl<B, T, E, F, SF, RF, NF> BlockingRetry<B, T, E, F, SF, RF, NF>
where
B: Backoff,
F: FnMut() -> Result<T, E>,
SF: BlockingSleeper,
RF: FnMut(&E) -> bool,
NF: FnMut(&E, Duration),
{
/// Call the retried function.
///
/// TODO: implement [`std::ops::FnOnce`] after it stable.
Expand All @@ -187,15 +245,14 @@ where
None => return Err(err),
Some(dur) => {
(self.notify)(&err, dur);
thread::sleep(dur);
self.sleep_fn.sleep(dur);
}
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Mutex;
Expand Down
49 changes: 42 additions & 7 deletions backon/src/blocking_retry_with_context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::thread;
use std::time::Duration;

use crate::backoff::BackoffBuilder;
use crate::Backoff;
use crate::blocking_sleep::MaybeBlockingSleeper;
use crate::{Backoff, BlockingSleeper, DefaultBlockingSleeper};

/// BlockingRetryableWithContext adds retry support for blocking functions.
pub trait BlockingRetryableWithContext<
Expand Down Expand Up @@ -34,13 +34,15 @@ pub struct BlockingRetryWithContext<
E,
Ctx,
F: FnMut(Ctx) -> (Ctx, Result<T, E>),
SF: MaybeBlockingSleeper = DefaultBlockingSleeper,
RF = fn(&E) -> bool,
NF = fn(&E, Duration),
> {
backoff: B,
retryable: RF,
notify: NF,
f: F,
sleep_fn: SF,
ctx: Option<Ctx>,
}

Expand All @@ -55,44 +57,67 @@ where
backoff,
retryable: |_: &E| true,
notify: |_: &E, _: Duration| {},
sleep_fn: DefaultBlockingSleeper::default(),
f,
ctx: None,
}
}
}

impl<B, T, E, Ctx, F, RF, NF> BlockingRetryWithContext<B, T, E, Ctx, F, RF, NF>
impl<B, T, E, Ctx, F, SF, RF, NF> BlockingRetryWithContext<B, T, E, Ctx, F, SF, RF, NF>
where
B: Backoff,
F: FnMut(Ctx) -> (Ctx, Result<T, E>),
SF: MaybeBlockingSleeper,
RF: FnMut(&E) -> bool,
NF: FnMut(&E, Duration),
{
/// Set the context for retrying.
///
/// Context is used to capture ownership manually to prevent lifetime issues.
pub fn context(self, context: Ctx) -> BlockingRetryWithContext<B, T, E, Ctx, F, RF, NF> {
pub fn context(self, context: Ctx) -> BlockingRetryWithContext<B, T, E, Ctx, F, SF, RF, NF> {
BlockingRetryWithContext {
backoff: self.backoff,
retryable: self.retryable,
notify: self.notify,
f: self.f,
sleep_fn: self.sleep_fn,
ctx: Some(context),
}
}

/// Set the sleeper for retrying.
///
/// The sleeper should implement the [`BlockingSleeper`] trait. The simplest way is to use a closure like `Fn(Duration)`.
///
/// If not specified, we use the [`DefaultBlockingSleeper`].
pub fn sleep<SN: BlockingSleeper>(
self,
sleep_fn: SN,
) -> BlockingRetryWithContext<B, T, E, Ctx, F, SN, RF, NF> {
BlockingRetryWithContext {
backoff: self.backoff,
retryable: self.retryable,
notify: self.notify,
f: self.f,
sleep_fn,
ctx: self.ctx,
}
}

/// Set the conditions for retrying.
///
/// If not specified, all errors are considered retryable.
pub fn when<RN: FnMut(&E) -> bool>(
self,
retryable: RN,
) -> BlockingRetryWithContext<B, T, E, Ctx, F, RN, NF> {
) -> BlockingRetryWithContext<B, T, E, Ctx, F, SF, RN, NF> {
BlockingRetryWithContext {
backoff: self.backoff,
retryable,
notify: self.notify,
f: self.f,
sleep_fn: self.sleep_fn,
ctx: self.ctx,
}
}
Expand All @@ -105,16 +130,26 @@ where
pub fn notify<NN: FnMut(&E, Duration)>(
self,
notify: NN,
) -> BlockingRetryWithContext<B, T, E, Ctx, F, RF, NN> {
) -> BlockingRetryWithContext<B, T, E, Ctx, F, SF, RF, NN> {
BlockingRetryWithContext {
backoff: self.backoff,
retryable: self.retryable,
notify,
f: self.f,
sleep_fn: self.sleep_fn,
ctx: self.ctx,
}
}
}

impl<B, T, E, Ctx, F, SF, RF, NF> BlockingRetryWithContext<B, T, E, Ctx, F, SF, RF, NF>
where
B: Backoff,
F: FnMut(Ctx) -> (Ctx, Result<T, E>),
SF: BlockingSleeper,
RF: FnMut(&E) -> bool,
NF: FnMut(&E, Duration),
{
/// Call the retried function.
///
/// TODO: implement [`std::ops::FnOnce`] after it stable.
Expand All @@ -136,7 +171,7 @@ where
None => return (ctx, Err(err)),
Some(dur) => {
(self.notify)(&err, dur);
thread::sleep(dur);
self.sleep_fn.sleep(dur);
}
}
}
Expand Down
55 changes: 55 additions & 0 deletions backon/src/blocking_sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::time::Duration;

/// A sleeper is used sleep for a specified duration.
pub trait BlockingSleeper: 'static {
/// sleep for a specified duration.
fn sleep(&self, dur: Duration);
}

/// A stub trait allowing non-[`BlockingSleeper`] types to be used as a generic parameter in [`BlockingRetry`][crate::BlockingRetry].
/// It does not provide actual functionality.
#[doc(hidden)]
pub trait MaybeBlockingSleeper: 'static {}

/// All `BlockingSleeper` will implement `MaybeBlockingSleeper`, but not vice versa.
impl<T: BlockingSleeper + ?Sized> MaybeBlockingSleeper for T {}

/// All `Fn(Duration)` implements `Sleeper`.
impl<F: Fn(Duration) + 'static> BlockingSleeper for F {
fn sleep(&self, dur: Duration) {
self(dur)
}
}

/// The default implementation of `Sleeper` when no features are enabled.
///
/// It will fail to compile if a containing [`Retry`][crate::Retry] is `.await`ed without calling [`Retry::sleep`][crate::Retry::sleep] to provide a valid sleeper.
#[cfg(all(not(feature = "tokio-sleep"), not(feature = "gloo-timers-sleep")))]
pub type DefaultBlockingSleeper = PleaseEnableAFeatureOrProvideACustomSleeper;
/// The default implementation of `Sleeper` while feature `std-blocking-sleep` enabled.
///
/// it uses [`std::thread::sleep`].
#[cfg(feature = "std-blocking-sleep")]
pub type DefaultBlockingSleeper = StdSleeper;

/// A placeholder type that does not implement [`Sleeper`] and will therefore fail to compile if used as one.
///
/// Users should enable a feature of this crate that provides a valid [`Sleeper`] implementation when this type appears in compilation errors. Alternatively, a custom [`Sleeper`] implementation should be provided where necessary, such as in [`crate::Retry::sleeper`].
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Default)]
pub struct PleaseEnableAFeatureOrProvideACustomSleeper;

/// Implement `MaybeSleeper` but not `Sleeper`.
impl MaybeBlockingSleeper for PleaseEnableAFeatureOrProvideACustomSleeper {}

/// The implementation of `StdSleeper` uses [`std::thread::sleep`].
#[cfg(feature = "std-blocking-sleep")]
#[derive(Clone, Copy, Debug, Default)]
pub struct StdSleeper;

#[cfg(feature = "std-blocking-sleep")]
impl BlockingSleeper for StdSleeper {
fn sleep(&self, dur: Duration) {
std::thread::sleep(dur)
}
}
6 changes: 6 additions & 0 deletions backon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,11 @@ mod blocking_retry_with_context;
pub use blocking_retry_with_context::BlockingRetryWithContext;
pub use blocking_retry_with_context::BlockingRetryableWithContext;

mod blocking_sleep;
pub use blocking_sleep::BlockingSleeper;
pub use blocking_sleep::DefaultBlockingSleeper;
#[cfg(feature = "std-blocking-sleep")]
pub use blocking_sleep::StdSleeper;

#[cfg(docsrs)]
pub mod docs;

0 comments on commit 7ffcbd0

Please sign in to comment.