Skip to content

Commit

Permalink
[Bifrost] Allow top-level find_tail to auto-retry
Browse files Browse the repository at this point in the history
These find_tail operations are only used by the partition_processor on startup, and we don't want them to fail unnecessarily. Auto-retrying gives them a good chance to succeed unless the underlying error is not retryable.
```
// intentionally empty
```
  • Loading branch information
AhmedSoliman committed Feb 6, 2025
1 parent e6423d8 commit bc16992
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 7 deletions.
1 change: 0 additions & 1 deletion crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ impl<T: TransportConnect> Service<T> {
},
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
self.observed_cluster_state.update(&cluster_state);
// todo: potentially downgrade to trace
trace!("Observed cluster state updated");
// todo quarantine this cluster controller if errors re-occur too often so that
// another cluster controller can take over
Expand Down
56 changes: 52 additions & 4 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use std::sync::Arc;
use std::sync::OnceLock;

use enum_map::EnumMap;
use tracing::instrument;
use restate_types::config::Configuration;
use tokio::time::Instant;
use tracing::{info, instrument, warn};

use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion};
use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
Expand All @@ -22,7 +24,7 @@ use restate_types::storage::StorageEncode;

use crate::appender::Appender;
use crate::background_appender::BackgroundAppender;
use crate::loglet::LogletProvider;
use crate::loglet::{LogletProvider, OperationError};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
Expand Down Expand Up @@ -385,8 +387,54 @@ impl BifrostInner {

/// Finds the tail of the writeable loglet of `log_id`, automatically retrying
/// retryable failures.
///
/// The retry schedule is taken from the `bifrost.read_retry_policy` configuration
/// value. Between attempts the task sleeps for the delay yielded by the policy's
/// retry iterator.
///
/// # Errors
///
/// Returns an error if:
/// * the writeable loglet cannot be resolved,
/// * the loglet reports [`OperationError::Shutdown`],
/// * the loglet reports an [`OperationError::Other`] that is not retryable, or
/// * the retry iterator is exhausted (the last retryable error is returned).
pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
    let loglet = self.writeable_loglet(log_id).await?;
    let start = Instant::now();
    // Tracks whether a warning was emitted, so that a matching "recovered" message
    // is logged once the tail is eventually found.
    let mut logged = false;
    // uses the same retry policy as reads to not add too many configuration keys
    let mut retry_iter = Configuration::pinned()
        .bifrost
        .read_retry_policy
        .clone()
        .into_iter();
    loop {
        match loglet.find_tail().await {
            Ok(tail) => {
                if logged {
                    info!(
                        %log_id,
                        "Found the log tail after {} attempts, time spent is {:?}",
                        retry_iter.attempts(),
                        start.elapsed()
                    );
                }
                return Ok((loglet, tail));
            }
            // Shutdown is terminal, retrying cannot help.
            Err(err @ OperationError::Shutdown(_)) => {
                return Err(err.into());
            }
            Err(OperationError::Other(err)) if !err.retryable() => {
                return Err(err.into());
            }
            // retryable errors
            Err(OperationError::Other(err)) => {
                // back off for the delay dictated by the retry policy
                let Some(sleep_dur) = retry_iter.next() else {
                    // retries exhausted
                    return Err(err.into());
                };
                // Stay quiet for the first half of the attempts to avoid log noise
                // on short-lived hiccups.
                if retry_iter.attempts() > retry_iter.max_attempts() / 2 {
                    warn!(
                        %log_id,
                        attempts = retry_iter.attempts(),
                        retry_after = ?sleep_dur,
                        "Cannot find the tail of the log, will retry. err={}",
                        err
                    );
                    logged = true;
                }
                tokio::time::sleep(sleep_dur).await;
            }
        }
    }
}

async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn, Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub enum Error {
#[error("metadata store doesn't have an entry for log metadata")]
LogsMetadataNotProvisioned,
#[error("log '{0}' is sealed")]
LogSealed(LogId),
// #[error("log '{0}' is sealed")]
// LogSealed(LogId),
#[error("unknown log '{0}'")]
UnknownLogId(LogId),
#[error("invalid log sequence number '{0}'")]
Expand Down
20 changes: 20 additions & 0 deletions crates/types/src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,26 @@ pub struct RetryIter<'a> {
last_retry: Option<Duration>,
}

impl RetryIter<'_> {
    /// The number of attempts on this retry iterator so far
    pub fn attempts(&self) -> usize {
        self.attempts
    }

    /// The maximum number of attempts permitted by the underlying policy.
    ///
    /// Returns `0` for [`RetryPolicy::None`]. For policies that do not set a limit
    /// (`max_attempts` is `None`), this returns `usize::MAX`.
    pub fn max_attempts(&self) -> usize {
        let max_attempts = match self.policy.as_ref() {
            RetryPolicy::None => return 0,
            RetryPolicy::FixedDelay { max_attempts, .. } => max_attempts,
            RetryPolicy::Exponential { max_attempts, .. } => max_attempts,
        };
        max_attempts.unwrap_or(NonZeroUsize::MAX).into()
    }

    /// The number of attempts left before the policy's limit is reached.
    ///
    /// Saturates at `0` instead of underflowing if `attempts()` ever exceeds
    /// `max_attempts()` (for example once any attempt has been recorded under
    /// [`RetryPolicy::None`], whose maximum is `0`).
    pub fn remaining_attempts(&self) -> usize {
        self.max_attempts().saturating_sub(self.attempts())
    }
}

impl Iterator for RetryIter<'_> {
type Item = Duration;

Expand Down

0 comments on commit bc16992

Please sign in to comment.