diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 0ef7068f0..a1de5eaea 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -320,7 +320,6 @@ impl Service {
                 },
                 Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
                     self.observed_cluster_state.update(&cluster_state);
-                    // todo: potentially downgrade to trace
                     trace!("Observed cluster state updated");
                     // todo quarantine this cluster controller if errors re-occur too often so that
                     // another cluster controller can take over
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index 854b514ae..e5c9cc23b 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -13,7 +13,9 @@ use std::sync::Arc;
 use std::sync::OnceLock;
 
 use enum_map::EnumMap;
-use tracing::instrument;
+use restate_types::config::Configuration;
+use tokio::time::Instant;
+use tracing::{info, instrument, warn};
 
 use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion};
 use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
@@ -22,7 +24,7 @@ use restate_types::storage::StorageEncode;
 
 use crate::appender::Appender;
 use crate::background_appender::BackgroundAppender;
-use crate::loglet::LogletProvider;
+use crate::loglet::{LogletProvider, OperationError};
 use crate::loglet_wrapper::LogletWrapper;
 use crate::watchdog::WatchdogSender;
 use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
@@ -385,8 +387,54 @@ impl BifrostInner {
 
     pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> {
         let loglet = self.writeable_loglet(log_id).await?;
-        let tail = loglet.find_tail().await?;
-        Ok((loglet, tail))
+        let start = Instant::now();
+        // uses the same retry policy as reads to not add too many configuration keys
+        let mut logged = false;
+        let mut retry_iter = Configuration::pinned()
+            .bifrost
+            .read_retry_policy
+            .clone()
+            .into_iter();
+        loop {
+            match loglet.find_tail().await {
+                Ok(tail) => {
+                    if logged {
+                        info!(
+                            %log_id,
+                            "Found the log tail after {} attempts, time spent is {:?}",
+                            retry_iter.attempts(),
+                            start.elapsed()
+                        );
+                    }
+                    return Ok((loglet, tail));
+                }
+                Err(err @ OperationError::Shutdown(_)) => {
+                    return Err(err.into());
+                }
+                Err(OperationError::Other(err)) if !err.retryable() => {
+                    return Err(err.into());
+                }
+                // retryable errors
+                Err(OperationError::Other(err)) => {
+                    // retry with exponential backoff
+                    let Some(sleep_dur) = retry_iter.next() else {
+                        // retries exhausted
+                        return Err(err.into());
+                    };
+                    if retry_iter.attempts() > retry_iter.max_attempts() / 2 {
+                        warn!(
+                            %log_id,
+                            attempts = retry_iter.attempts(),
+                            retry_after = ?sleep_dur,
+                            "Cannot find the tail of the log, will retry. err={}",
+                            err
+                        );
+                        logged = true;
+                    }
+                    tokio::time::sleep(sleep_dur).await;
+                }
+            }
+        }
     }
 
     async fn get_trim_point(&self, log_id: LogId) -> Result<Lsn> {
diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs
index 78520c6a8..9823a0f16 100644
--- a/crates/bifrost/src/error.rs
+++ b/crates/bifrost/src/error.rs
@@ -25,8 +25,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub enum Error {
     #[error("metadata store doesn't have an entry for log metadata")]
     LogsMetadataNotProvisioned,
-    #[error("log '{0}' is sealed")]
-    LogSealed(LogId),
+    // #[error("log '{0}' is sealed")]
+    // LogSealed(LogId),
     #[error("unknown log '{0}'")]
     UnknownLogId(LogId),
     #[error("invalid log sequence number '{0}'")]
diff --git a/crates/types/src/retries.rs b/crates/types/src/retries.rs
index 4704794f2..b97de42ba 100644
--- a/crates/types/src/retries.rs
+++ b/crates/types/src/retries.rs
@@ -235,6 +235,26 @@ pub struct RetryIter<'a> {
     last_retry: Option<Duration>,
 }
 
+impl RetryIter<'_> {
+    /// The number of attempts on this retry iterator so far
+    pub fn attempts(&self) -> usize {
+        self.attempts
+    }
+
+    pub fn max_attempts(&self) -> usize {
+        let max_attempts = match self.policy.as_ref() {
+            RetryPolicy::None => return 0,
+            RetryPolicy::FixedDelay { max_attempts, .. } => max_attempts,
+            RetryPolicy::Exponential { max_attempts, .. } => max_attempts,
+        };
+        max_attempts.unwrap_or(NonZeroUsize::MAX).into()
+    }
+
+    pub fn remaining_attempts(&self) -> usize {
+        self.max_attempts() - self.attempts()
+    }
+}
+
 impl Iterator for RetryIter<'_> {
     type Item = Duration;