diff --git a/Cargo.lock b/Cargo.lock index e72b94b366..ce672a0522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6425,6 +6425,7 @@ dependencies = [ name = "restate-bifrost" version = "1.2.0-dev" dependencies = [ + "ahash 0.8.11", "anyhow", "async-trait", "bytes", diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 0ef7068f02..a1de5eaea3 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -320,7 +320,6 @@ impl Service { }, Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => { self.observed_cluster_state.update(&cluster_state); - // todo: potentially downgrade to trace trace!("Observed cluster state updated"); // todo quarantine this cluster controller if errors re-occur too often so that // another cluster controller can take over diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 237eab71a8..b87ab2277c 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -25,6 +25,7 @@ restate-rocksdb = { workspace = true } restate-test-util = { workspace = true, optional = true } restate-types = { workspace = true } +ahash = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 854b514aee..e5c9cc23bb 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -13,7 +13,9 @@ use std::sync::Arc; use std::sync::OnceLock; use enum_map::EnumMap; -use tracing::instrument; +use restate_types::config::Configuration; +use tokio::time::Instant; +use tracing::{info, instrument, warn}; use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; @@ -22,7 +24,7 @@ use restate_types::storage::StorageEncode; use crate::appender::Appender; use crate::background_appender::BackgroundAppender; -use crate::loglet::LogletProvider; +use crate::loglet::{LogletProvider, OperationError}; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result}; @@ -385,8 +387,54 @@ impl BifrostInner { pub async fn find_tail(&self, log_id: LogId) -> Result<(LogletWrapper, TailState)> { let loglet = self.writeable_loglet(log_id).await?; - let tail = loglet.find_tail().await?; - Ok((loglet, tail)) + let start = Instant::now(); + // uses the same retry policy as reads to not add too many configuration keys + let mut logged = false; + let mut retry_iter = Configuration::pinned() + .bifrost + .read_retry_policy + .clone() + .into_iter(); + loop { + match loglet.find_tail().await { + Ok(tail) => { + if logged { + info!( + %log_id, + "Found the log tail after {} attempts, time spent is {:?}", + retry_iter.attempts(), + start.elapsed() + ); + } + return Ok((loglet, tail)); + } + Err(err @ OperationError::Shutdown(_)) => { + return Err(err.into()); + } + Err(OperationError::Other(err)) if !err.retryable() => { + return Err(err.into()); + } + // retryable errors + Err(OperationError::Other(err)) => { + // retry with exponential backoff + let Some(sleep_dur) = retry_iter.next() else { + // retries exhausted + return Err(err.into()); + }; + if retry_iter.attempts() > retry_iter.max_attempts() / 2 { + warn!( + %log_id, + attempts = retry_iter.attempts(), + retry_after = ?sleep_dur, + "Cannot find the tail of the log, will retry. 
err={}", + err + ); + logged = true; + } + tokio::time::sleep(sleep_dur).await; + } + } + } } async fn get_trim_point(&self, log_id: LogId) -> Result { diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 40979e97b5..fffb3eb49b 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -10,7 +10,7 @@ use std::sync::Arc; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument, warn}; use restate_core::metadata_store::retry_on_retryable_error; use restate_core::{Metadata, MetadataKind}; @@ -158,7 +158,7 @@ impl<'a> BifrostAdmin<'a> { self.inner.writeable_loglet(log_id).await } - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] pub async fn seal(&self, log_id: LogId, segment_index: SegmentIndex) -> Result { self.inner.fail_if_shutting_down()?; // first find the tail segment for this log. @@ -177,7 +177,7 @@ impl<'a> BifrostAdmin<'a> { match err { crate::loglet::OperationError::Shutdown(err) => return Err(err.into()), crate::loglet::OperationError::Other(err) => { - info!( + warn!( log_id = %log_id, segment = %segment_index, %err, diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 78520c6a86..fea665d80a 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -25,8 +25,6 @@ pub type Result = std::result::Result; pub enum Error { #[error("metadata store doesn't have an entry for log metadata")] LogsMetadataNotProvisioned, - #[error("log '{0}' is sealed")] - LogSealed(LogId), #[error("unknown log '{0}'")] UnknownLogId(LogId), #[error("invalid log sequence number '{0}'")] diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index e266f23b8e..8467f3b7cc 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -18,7 +18,7 @@ use restate_types::replication::DecoratedNodeSet; use crate::loglet::OperationError; -#[derive(Default, derive_more::Display, derive_more::Debug)] +#[derive(Default, derive_more::Display, derive_more::Debug, PartialEq, Eq, Hash, Clone, Copy)] pub enum NodeSealStatus { #[display("E")] Error, diff --git a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs index d8fe561b4b..3fca7cf456 100644 --- a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs +++ b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs @@ -8,13 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use std::{collections::BTreeMap, ops::Deref, sync::Arc}; +use std::sync::Arc; -use crossbeam_utils::CachePadded; +use ahash::HashMap; use metrics::Histogram; -use tokio::sync::Mutex; -use restate_core::network::{NetworkError, Networking, TransportConnect, WeakConnection}; use restate_types::{ logs::{LogletOffset, SequenceNumber, TailState}, replication::NodeSet, @@ -24,22 +22,17 @@ use restate_types::{ use super::metric_definitions::BIFROST_SEQ_STORE_DURATION; use crate::loglet::util::TailOffsetWatch; -type LogServerLock = CachePadded>>; - /// LogServer instance #[derive(Clone)] pub struct RemoteLogServer { tail: TailOffsetWatch, - connection: WeakConnection, store_latency: Histogram, } impl RemoteLogServer { - fn new(connection: WeakConnection) -> Self { - let node_id = connection.peer().as_plain(); + fn new(node_id: PlainNodeId) -> Self { Self { tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)), - connection, store_latency: metrics::histogram!(BIFROST_SEQ_STORE_DURATION, "node_id" => node_id.to_string()), } } @@ -51,20 +44,13 @@ impl RemoteLogServer { pub fn store_latency(&self) -> &Histogram { &self.store_latency } - - pub fn connection(&self) -> &WeakConnection { - &self.connection - } } /// LogServerManager maintains a set of [`RemoteLogServer`]s that provided via the /// [`NodeSet`]. -/// -/// The manager makes sure there is only one active connection per server. -/// It's up to the user of the client to do [`LogServerManager::renew`] if needed #[derive(Clone)] pub struct RemoteLogServerManager { - servers: Arc>, + servers: Arc>, } impl RemoteLogServerManager { @@ -72,94 +58,20 @@ impl RemoteLogServerManager { pub fn new(nodeset: &NodeSet) -> Self { let servers = nodeset .iter() - .map(|node_id| (*node_id, LogServerLock::default())) + .copied() + .map(|node_id| (node_id, RemoteLogServer::new(node_id))) .collect(); let servers = Arc::new(servers); Self { servers } } - pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option { + pub fn get_tail_offset(&self, id: PlainNodeId) -> &TailOffsetWatch { let server = self.servers.get(&id).expect("node is in nodeset"); - - if let Ok(guard) = server.try_lock() { - if let Some(current) = guard.deref() { - return Some(current.local_tail().clone()); - } - } - - None + server.local_tail() } - /// Gets a log-server instance. On first time it will initialize a new connection - /// to log server. It will make sure all following get call holds the same - /// connection. - /// - /// It's up to the client to call [`Self::renew`] if the connection it holds - /// is closed. - pub async fn get( - &self, - id: PlainNodeId, - networking: &Networking, - ) -> Result { - let server = self.servers.get(&id).expect("node is in nodeset"); - - let mut guard = server.lock().await; - - if let Some(current) = guard.deref() { - return Ok(current.clone()); - } - - let connection = networking.node_connection(id).await?; - let server = RemoteLogServer::new(connection); - - *guard = Some(server.clone()); - - Ok(server) - } - - /// Renew makes sure server connection is renewed if and only if - /// the provided server holds an outdated connection. Otherwise - /// the latest connection associated with this server is used. - /// - /// It's up the holder of the log server instance to retry to renew - /// if that connection is not valid. 
- /// - /// It also guarantees that concurrent calls to renew on the same server instance - /// will only renew the connection once for all callers - /// - /// However, this does not affect copies of LogServer that have been already retrieved - /// by calling [`Self::get()`]. - /// - /// Holder of old instances will have to call renew if they need to. - pub async fn renew( - &self, - server: &mut RemoteLogServer, - networking: &Networking, - ) -> Result<(), NetworkError> { - // this key must already be in the map - let current = self - .servers - .get(&server.connection.peer().as_plain()) - .expect("node is in nodeset"); - - let mut guard = current.lock().await; - - // if you calling renew then the LogServer has already been initialized - let inner = guard.as_mut().expect("initialized log server instance"); - - if inner.connection != server.connection { - // someone else has already renewed the connection - server.connection = inner.connection.clone(); - return Ok(()); - } - - let connection = networking - .node_connection(server.connection.peer().as_plain()) - .await?; - inner.connection = connection.clone(); - server.connection = connection.clone(); - - Ok(()) + pub fn get(&self, id: PlainNodeId) -> &RemoteLogServer { + self.servers.get(&id).expect("node is in nodeset") } } diff --git a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs index cbb660df33..de313ed0fa 100644 --- a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs +++ b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs @@ -28,6 +28,7 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_TOTAL: &str = pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str = "restate.bifrost.sequencer.committed_records.bytes"; pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration"; +pub(crate) const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration"; pub(crate) fn describe_metrics() { describe_counter!( @@ -72,6 +73,12 @@ pub(crate) fn describe_metrics() { "Size of records committed" ); + describe_histogram!( + BIFROST_SEQ_APPEND_DURATION, + Unit::Seconds, + "Append batch duration in seconds as measured by the sequencer" + ); + describe_histogram!( BIFROST_SEQ_STORE_DURATION, Unit::Seconds, diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index 3c389112a4..e2170d66dc 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -8,12 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; use std::hash::{Hash, Hasher}; +use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; +use restate_types::replication::DecoratedNodeSet; use tracing::warn; use restate_types::locality::{LocationScope, NodeLocation}; @@ -734,6 +736,22 @@ impl LocationScopeState { } } +impl<'a, Attr> IntoIterator for &'a NodeSetChecker { + type Item = (&'a PlainNodeId, &'a Attr); + + type IntoIter = <&'a HashMap as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.node_to_attr.iter() + } +} + +impl From> for DecoratedNodeSet { + fn from(val: NodeSetChecker) -> DecoratedNodeSet { + DecoratedNodeSet::from_iter(val.node_to_attr) + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index b8263f2b00..5cb4b76007 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -8,36 +8,29 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::{ - cmp::Ordering, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration}; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{sync::OwnedSemaphorePermit, time::timeout}; -use tracing::{instrument, trace}; +use tokio::time::Instant; +use tokio::{sync::OwnedSemaphorePermit, task::JoinSet}; +use tracing::{debug, instrument, trace, warn}; use restate_core::{ cancellation_token, - network::{ - rpc_router::{RpcError, RpcRouter}, - Incoming, NetworkError, Networking, Outgoing, TransportConnect, - }, - ShutdownError, + network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect}, + ShutdownError, TaskCenterFutureExt, }; use restate_types::{ config::Configuration, live::Live, logs::{LogletOffset, Record, SequenceNumber, TailState}, net::log_server::{LogServerRequestHeader, Status, Store, StoreFlags, Stored}, - replicated_loglet::Spread, - replication::NodeSet, + replication::{DecoratedNodeSet, NodeSet}, time::MillisSinceEpoch, - PlainNodeId, + Merge, PlainNodeId, }; use super::{RecordsExt, SequencerSharedState}; +use crate::providers::replicated_loglet::metric_definitions::BIFROST_SEQ_APPEND_DURATION; use crate::{ loglet::{AppendError, LogletCommitResolver}, providers::replicated_loglet::{ @@ -50,8 +43,9 @@ use crate::{ }; const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000); +const TONE_ESCALATION_THRESHOLD: usize = 5; -enum SequencerAppenderState { +enum State { Wave { // nodes that should be avoided by the spread selector graylist: NodeSet, @@ -69,6 +63,9 @@ pub(crate) struct SequencerAppender { networking: Networking, first_offset: LogletOffset, records: Arc<[Record]>, + checker: NodeSetChecker, + nodeset_status: DecoratedNodeSet, + current_wave: usize, // permit is held during the entire live // of the batch to limit the number of // inflight batches @@ -88,10 +85,25 @@ impl SequencerAppender { permit: OwnedSemaphorePermit, commit_resolver: LogletCommitResolver, ) -> Self { + // todo: in the future, we should update the checker's view over nodes configuration before + // each wave. 
At the moment this is not required as nodes will not change their storage + // state after the nodeset has been created until the loglet is sealed. + let checker = NodeSetChecker::::new( + sequencer_shared_state.selector.nodeset(), + &networking.metadata().nodes_config_ref(), + sequencer_shared_state.selector.replication_property(), + ); + + let nodeset_status = + DecoratedNodeSet::from(sequencer_shared_state.selector.nodeset().clone()); + Self { sequencer_shared_state, store_router, networking, + checker, + nodeset_status, + current_wave: 0, first_offset, records, permit: Some(permit), @@ -112,15 +124,14 @@ impl SequencerAppender { ) )] pub async fn run(mut self) { - let mut wave = 0; + let start = Instant::now(); // initial wave has 0 replicated and 0 gray listed node - let mut state = SequencerAppenderState::Wave { + let mut state = State::Wave { graylist: NodeSet::default(), }; let cancellation = cancellation_token(); - let mut cancelled = std::pin::pin!(cancellation.cancelled()); let retry_policy = self .configuration .live_load() @@ -135,36 +146,42 @@ impl SequencerAppender { let final_state = loop { state = match state { // termination conditions - SequencerAppenderState::Done - | SequencerAppenderState::Cancelled - | SequencerAppenderState::Sealed => break state, - SequencerAppenderState::Wave { graylist } => { - wave += 1; - tokio::select! { - next_state = self.wave(graylist, wave) => {next_state}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } - } + State::Done | State::Cancelled | State::Sealed => break state, + State::Wave { graylist } => { + self.current_wave += 1; + let Some(next_state) = cancellation + .run_until_cancelled(self.send_wave(graylist)) + .await + else { + break State::Cancelled; + }; + next_state } - SequencerAppenderState::Backoff => { + State::Backoff => { // since backoff can be None, or run out of iterations, // but appender should never give up we fall back to fixed backoff let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME); - tracing::info!( - delay = ?delay, - %wave, - "Append wave failed, retrying with a new wave after delay" - ); - - tokio::select! { - _ = tokio::time::sleep(delay) => {}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } + if self.current_wave >= TONE_ESCALATION_THRESHOLD { + warn!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. Status is {}", delay, self.nodeset_status + ); + } else { + debug!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. 
Status is {}", delay, self.nodeset_status + ); + } + + if cancellation + .run_until_cancelled(tokio::time::sleep(delay)) + .await + .is_none() + { + break State::Cancelled; }; - SequencerAppenderState::Wave { + State::Wave { graylist: NodeSet::default(), } } @@ -172,35 +189,42 @@ impl SequencerAppender { }; match final_state { - SequencerAppenderState::Done => { + State::Done => { assert!(self.commit_resolver.is_none()); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_TOTAL) .increment(self.records.len() as u64); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_BYTES) .increment(self.records.estimated_encode_size() as u64); + metrics::histogram!(BIFROST_SEQ_APPEND_DURATION).record(start.elapsed()); - tracing::trace!("SequencerAppender task completed"); + trace!( + wave = %self.current_wave, + "Append succeeded in {:?}, status {}", + start.elapsed(), + self.nodeset_status + ); } - SequencerAppenderState::Cancelled => { - tracing::trace!("SequencerAppender task cancelled"); + State::Cancelled => { + trace!("Append cancelled"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.error(AppendError::Shutdown(ShutdownError)); } } - SequencerAppenderState::Sealed => { - tracing::debug!("SequencerAppender ended because of sealing"); + State::Sealed => { + trace!("Append ended because of sealing"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.sealed(); } } - SequencerAppenderState::Backoff | SequencerAppenderState::Wave { .. } => { + State::Backoff | State::Wave { .. } => { unreachable!() } } } - async fn wave(&mut self, mut graylist: NodeSet, wave: usize) -> SequencerAppenderState { + #[instrument(skip_all, fields(wave = %self.current_wave))] + async fn send_wave(&mut self, mut graylist: NodeSet) -> State { // select the spread let spread = match self.sequencer_shared_state.selector.select( &mut rand::rng(), @@ -210,23 +234,17 @@ impl SequencerAppender { Ok(spread) => spread, Err(_) => { graylist.clear(); - return SequencerAppenderState::Backoff; + trace!( + %graylist, + "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + ); + return State::Backoff; } }; - tracing::trace!(%graylist, %spread, %wave, "Sending store wave"); - self.send_wave(spread, wave).await - } - - async fn send_wave(&mut self, spread: Spread, wave: usize) -> SequencerAppenderState { + trace!(%graylist, %spread, "Sending append wave"); let last_offset = self.records.last_offset(self.first_offset).unwrap(); - let mut checker = NodeSetChecker::::new( - self.sequencer_shared_state.selector.nodeset(), - &self.networking.metadata().nodes_config_ref(), - self.sequencer_shared_state.selector.replication_property(), - ); - // todo: should be exponential backoff let store_timeout = *self .configuration @@ -235,98 +253,114 @@ impl SequencerAppender { .replicated_loglet .log_server_rpc_timeout; - let timeout_at = MillisSinceEpoch::after(store_timeout); // track the in flight server ids let mut pending_servers = NodeSet::from_iter(spread.iter().copied()); - let mut store_tasks = FuturesUnordered::new(); - - for node_id in &spread { - store_tasks.push( - LogServerStoreTask { - node_id: *node_id, - sequencer_shared_state: &self.sequencer_shared_state, - networking: &self.networking, - first_offset: self.first_offset, - records: &self.records, - rpc_router: &self.store_router, - timeout_at, + let mut store_tasks = JoinSet::new(); + + for node_id in spread.iter().copied() { + // do not attempt on nodes that we know they're committed || sealed 
+ if let Some(status) = self.checker.get_attribute(&node_id) { + if status.committed || status.sealed { + pending_servers.remove(node_id); + continue; } - .run(), - ); + } + store_tasks.spawn({ + let store_task = LogServerStoreTask { + node_id, + sequencer_shared_state: self.sequencer_shared_state.clone(), + networking: self.networking.clone(), + first_offset: self.first_offset, + records: self.records.clone(), + rpc_router: self.store_router.clone(), + store_timeout, + }; + async move { (node_id, store_task.run().await) }.in_current_tc() + }); } - loop { - let store_result = match timeout(store_timeout, store_tasks.next()).await { - Ok(Some(result)) => result, - Ok(None) => break, // no more tasks - Err(_) => { - // if we have already acknowledged this append, it's okay to retire. - if self.commit_resolver.is_none() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but append was committed, giving up"); - return SequencerAppenderState::Done; - } - - if self.sequencer_shared_state.known_global_tail.is_sealed() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but this loglet was sealed, giving up"); - return SequencerAppenderState::Sealed; - } - // timed out! - // none of the pending tasks has finished in time! we will assume all pending server - // are graylisted and try again - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Timeout waiting on store response"); - return SequencerAppenderState::Wave { - graylist: pending_servers, - }; - } + while let Some(store_result) = store_tasks.join_next().await { + // unlikely to happen, but it's there for completeness + if self.sequencer_shared_state.known_global_tail.is_sealed() { + trace!(%pending_servers, %spread, "Loglet was sealed, stopping this sequencer appender"); + return State::Sealed; + } + let Ok((node_id, store_result)) = store_result else { + // task panicked, ignore + continue; }; - let LogServerStoreTaskResult { - node_id: peer, - status, - } = store_result; - - let response = match status { + let stored = match store_result { StoreTaskStatus::Error(NetworkError::Shutdown(_)) => { - return SequencerAppenderState::Cancelled; + return State::Cancelled; + } + StoreTaskStatus::Error(NetworkError::Timeout(_)) => { + // Yes, I know those checks are ugly, but it's a quick and dirty way until we + // have a nice macro for it. 
+ if self.current_wave >= TONE_ESCALATION_THRESHOLD { + debug!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); + } else { + trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); + } + self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); + graylist.insert(node_id); + continue; } StoreTaskStatus::Error(err) => { // couldn't send store command to remote server - tracing::debug!(%peer, error=%err, "Failed to send batch to node"); + if self.current_wave >= TONE_ESCALATION_THRESHOLD { + debug!(peer = %node_id, %err, "Failed to send batch to node"); + } else { + trace!(peer = %node_id, %err, "Failed to send batch to node"); + } + self.nodeset_status.merge(node_id, PerNodeStatus::failed()); + graylist.insert(node_id); continue; } StoreTaskStatus::Sealed => { - tracing::debug!(%peer, "Store task cancelled, the node is sealed"); - checker.set_attribute(peer, NodeAttributes::sealed()); + debug!(peer = %node_id, "Store task cancelled, the node is sealed"); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Sealed); continue; } StoreTaskStatus::Stored(stored) => { - tracing::trace!(%peer, "Store task completed"); + trace!(peer = %node_id, "Store task completed"); stored } }; - // we had a response from this node and there is still a lot we can do - match response.status { + // We had a response from this node and there is still a lot we can do + match stored.status { Status::Ok => { // only if status is okay that we remove this node // from the gray list, and move to replicated list - checker.set_attribute(peer, NodeAttributes::committed()); - pending_servers.remove(peer); + self.checker + .set_attribute(node_id, NodeAttributes::committed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Committed); + pending_servers.remove(node_id); } Status::Sealed | Status::Sealing => { - checker.set_attribute(peer, NodeAttributes::sealed()); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + graylist.insert(node_id); + } + Status::Dropped => { + // Overloaded, or request expired + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding"); + graylist.insert(node_id); } - Status::Disabled - | Status::Dropped - | Status::SequencerMismatch - | Status::Malformed - | Status::OutOfBounds => { - // just leave this log server in graylist (pending) - tracing::debug!(%peer, status=?response.status, "Store task returned an error status"); + Status::Disabled => { + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. 
Peer's log-store is disabled"); + graylist.insert(node_id); + } + Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => { + warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate"); + graylist.insert(node_id); } } - if self.commit_resolver.is_some() && checker.check_write_quorum(|attr| attr.committed) { + if self.checker.check_write_quorum(|attr| attr.committed) { // resolve the commit if not resolved yet if let Some(resolver) = self.commit_resolver.take() { self.sequencer_shared_state @@ -334,20 +368,101 @@ impl SequencerAppender { .notify_offset_update(last_offset.next()); resolver.offset(last_offset); } - // drop the permit self.permit.take(); + return State::Done; } } - if checker.check_write_quorum(|attr| attr.committed) { - SequencerAppenderState::Done - } else if checker.check_fmajority(|attr| attr.sealed).passed() { - SequencerAppenderState::Sealed + if self.checker.check_fmajority(|attr| attr.sealed).passed() { + State::Sealed } else { - SequencerAppenderState::Wave { - graylist: pending_servers, + // We couldn't achieve write quorum with this wave. We will try again, as fast as + // possible until the graylist eats up enough nodes such that we won't be able to + // generate node nodesets. Only then we backoff. + State::Wave { graylist } + } + } +} + +#[derive(Default, Debug, PartialEq, Clone, Copy)] +enum PerNodeStatus { + #[default] + NotAttempted, + // todo: the distinction between timeout and failed might not be worth the hassle. + // consider only doing failed if in practice it wasn't as useful to keep both variants. + Failed { + attempts: usize, + }, + Timeout { + attempts: usize, + }, + Committed, + Sealed, +} + +impl Display for PerNodeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PerNodeStatus::NotAttempted => write!(f, ""), + PerNodeStatus::Failed { attempts } => write!(f, "ERROR({})", attempts), + PerNodeStatus::Committed => write!(f, "COMMITTED"), + PerNodeStatus::Timeout { attempts } => write!(f, "TIMEDOUT({})", attempts), + PerNodeStatus::Sealed => write!(f, "SEALED"), + } + } +} + +impl PerNodeStatus { + fn timeout() -> Self { + Self::Timeout { attempts: 1 } + } + fn failed() -> Self { + Self::Failed { attempts: 1 } + } +} + +impl Merge for PerNodeStatus { + fn merge(&mut self, other: Self) -> bool { + use PerNodeStatus::*; + match (*self, other) { + (NotAttempted, NotAttempted) => false, + (Committed, Committed) => false, + (NotAttempted, e) => { + *self = e; + true + } + // we will not transition from committed to seal because + // committed is more important for showing where did we write. Not that this is likely + // to ever happen though. 
+ (Committed, _) => false, + (Failed { attempts: a1 }, Failed { attempts: a2 }) => { + *self = Failed { attempts: a1 + a2 }; + true + } + (Failed { attempts: a1 }, Timeout { attempts: a2 }) => { + *self = Timeout { attempts: a1 + a2 }; + true + } + (Timeout { attempts: a1 }, Failed { attempts: a2 }) => { + *self = Failed { attempts: a1 + a2 }; + true + } + (Timeout { attempts: a1 }, Timeout { attempts: a2 }) => { + *self = Timeout { attempts: a1 + a2 }; + true } + (_, Committed) => { + *self = Committed; + true + } + (Sealed, Sealed) => false, + (_, Sealed) => { + *self = Sealed; + true + } + (Sealed, _) => false, + _ => false, } } } @@ -374,6 +489,18 @@ impl NodeAttributes { } } +impl Display for NodeAttributes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match (self.committed, self.sealed) { + // legend X = committed to be consistent with restatectl digest output + (true, true) => write!(f, "X(S)"), + (true, false) => write!(f, "X"), + (false, true) => write!(f, "-(S)"), + (false, false) => write!(f, ""), + } + } +} + #[derive(Debug)] enum StoreTaskStatus { Sealed, @@ -390,24 +517,19 @@ impl From> for StoreTaskStatus { } } -struct LogServerStoreTaskResult { - pub node_id: PlainNodeId, - pub status: StoreTaskStatus, -} - /// The task will retry to connect to the remote server if connection /// was lost. -struct LogServerStoreTask<'a, T> { +struct LogServerStoreTask { node_id: PlainNodeId, - sequencer_shared_state: &'a Arc, - networking: &'a Networking, + sequencer_shared_state: Arc, + networking: Networking, first_offset: LogletOffset, - records: &'a Arc<[Record]>, - rpc_router: &'a RpcRouter, - timeout_at: MillisSinceEpoch, + records: Arc<[Record]>, + rpc_router: RpcRouter, + store_timeout: Duration, } -impl LogServerStoreTask<'_, T> { +impl LogServerStoreTask { #[instrument( skip_all, fields( @@ -418,7 +540,7 @@ impl LogServerStoreTask<'_, T> { peer=%self.node_id, ) )] - async fn run(mut self) -> LogServerStoreTaskResult { + async fn run(mut self) -> StoreTaskStatus { let result = self.send().await; match &result { Ok(status) => { @@ -435,18 +557,14 @@ impl LogServerStoreTask<'_, T> { } } - LogServerStoreTaskResult { - node_id: self.node_id, - status: result.into(), - } + result.into() } async fn send(&mut self) -> Result { - let mut server = self + let server = self .sequencer_shared_state .log_server_manager - .get(self.node_id, self.networking) - .await?; + .get(self.node_id); let server_local_tail = server .local_tail() .wait_for_offset_or_seal(self.first_offset); @@ -481,7 +599,7 @@ impl LogServerStoreTask<'_, T> { } } - let incoming = match self.try_send(&mut server).await { + let incoming = match self.try_send(server).await { Ok(incoming) => incoming, Err(err) => { return Err(err); @@ -506,10 +624,8 @@ impl LogServerStoreTask<'_, T> { Ok(StoreTaskStatus::Stored(incoming.into_body())) } - async fn try_send( - &mut self, - server: &mut RemoteLogServer, - ) -> Result, NetworkError> { + async fn try_send(&self, server: &RemoteLogServer) -> Result, NetworkError> { + let timeout_at = MillisSinceEpoch::after(self.store_timeout); let store = Store { header: LogServerRequestHeader::new( *self.sequencer_shared_state.loglet_id(), @@ -520,53 +636,28 @@ impl LogServerStoreTask<'_, T> { first_offset: self.first_offset, flags: StoreFlags::empty(), known_archived: LogletOffset::INVALID, - payloads: Arc::clone(self.records), + payloads: Arc::clone(&self.records), sequencer: *self.sequencer_shared_state.sequencer(), - timeout_at: Some(self.timeout_at), + timeout_at: 
Some(timeout_at), }; - let mut msg = Outgoing::new(self.node_id, store); - - let mut attempt = 0; - loop { - attempt += 1; - - let with_connection = msg.assign_connection(server.connection().clone()); - let store_start_time = Instant::now(); - - let result = match self.rpc_router.call_on_connection(with_connection).await { - Ok(incoming) => Ok(incoming), - Err(RpcError::Shutdown(shutdown)) => Err(NetworkError::Shutdown(shutdown)), - Err(RpcError::SendError(err)) => { - msg = err.original.forget_connection(); - - match err.source { - NetworkError::ConnectionClosed(_) - | NetworkError::ConnectError(_) - | NetworkError::Timeout(_) => { - trace!( - %attempt, - "Failed to send store to log server, trying to create a new connection" - ); - self.sequencer_shared_state - .log_server_manager - .renew(server, self.networking) - .await?; - trace!( - %attempt, - "Reconnected to log-server, retrying the store" - ); - // try again - continue; - } - _ => Err(err.source), - } - } - }; - - server.store_latency().record(store_start_time.elapsed()); - - return result; + let store_start_time = Instant::now(); + + // note: we are over-indexing on the fact that currently the sequencer will send one + // message at a time per log-server. My argument to make us not sticking to a single + // connection is that the complexity with the previous design didn't add any value. When we + // support pipelined writes, it's unlikely that we'll also be doing the coordination through + // the offset watch as we are currently doing (due to its lock-contention downside). It'll be a different design altogether. + match self + .rpc_router + .call_timeout(&self.networking, self.node_id, store, self.store_timeout) + .await + { + Ok(incoming) => { + server.store_latency().record(store_start_time.elapsed()); + Ok(incoming) + } + Err(err) => Err(err), } } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index e0fdfed834..0292371f8e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -92,11 +92,7 @@ impl CheckSealTask { let local_tails: BTreeMap = effective_nodeset .iter() - .filter_map(|node_id| { - log_servers - .try_get_tail_offset(*node_id) - .map(|w| (*node_id, w)) - }) + .map(|node_id| (*node_id, log_servers.get_tail_offset(*node_id).clone())) .collect(); // If some of the nodes are already sealed, we know our answer and we don't need to go through diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index 3686745a1b..1dfda1ae22 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -10,7 +10,7 @@ use tokio::task::JoinSet; use tokio::time::Instant; -use tracing::{debug, info, instrument, trace, warn, Instrument}; +use tracing::{debug, instrument, trace, warn, Instrument}; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::{Incoming, Networking, TransportConnect}; @@ -73,7 +73,7 @@ impl SealTask { .nodeset .to_effective(&networking.metadata().nodes_config_ref()); - let mut nodeset_checker = NodeSetChecker::::new( + let mut nodeset_checker = NodeSetChecker::::new( &effective_nodeset, &networking.metadata().nodes_config_ref(), &my_params.replication, @@ -116,8 +116,6 @@ impl SealTask { }); } - let mut nodeset_status = 
DecoratedNodeSet::from_iter(effective_nodeset.clone()); - // Max observed local-tail from sealed nodes let mut max_local_tail = LogletOffset::OLDEST; while let Some(response) = inflight_requests.join_next().await { @@ -127,20 +125,23 @@ impl SealTask { }; let Ok(response) = response else { // Seal failed/aborted on this node. - nodeset_status.insert(node_id, NodeSealStatus::Error); + nodeset_checker.set_attribute(node_id, NodeSealStatus::Error); continue; }; max_local_tail = max_local_tail.max(response.header.local_tail); known_global_tail.notify_offset_update(response.header.known_global_tail); - nodeset_checker.set_attribute(node_id, true); - nodeset_status.insert(node_id, NodeSealStatus::Sealed); - - if nodeset_checker.check_fmajority(|attr| *attr).passed() { - info!( + nodeset_checker.set_attribute(node_id, NodeSealStatus::Sealed); + + if nodeset_checker + .check_fmajority(|attr| *attr == NodeSealStatus::Sealed) + .passed() + { + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); + tracing::info!( loglet_id = %my_params.loglet_id, replication = %my_params.replication, - %effective_nodeset, %max_local_tail, global_tail = %known_global_tail.latest_offset(), "Seal task completed on f-majority of nodes in {:?}. Nodeset status {}", @@ -152,6 +153,8 @@ impl SealTask { } } + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); // no more tasks left. This means that we failed to seal Err(ReplicatedLogletError::SealFailed(nodeset_status)) } diff --git a/crates/log-server/src/service.rs b/crates/log-server/src/service.rs index da18f2d933..1ef1a2ae37 100644 --- a/crates/log-server/src/service.rs +++ b/crates/log-server/src/service.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use anyhow::Context; use tonic::codec::CompressionEncoding; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument}; use restate_core::metadata_store::{retry_on_retryable_error, ReadWriteError, RetryError}; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; @@ -255,7 +255,7 @@ impl LogServerService { }; metadata_writer.update(Arc::new(nodes_config)).await?; - info!("Log-store self-provisioning is complete, the node's log-store is now in read-write state"); + debug!("Log-store self-provisioning is complete, the node's log-store is now in read-write state"); Ok(target_storage_state) } } diff --git a/crates/metadata-server/src/raft/server.rs b/crates/metadata-server/src/raft/server.rs index 89122c4706..6be075a037 100644 --- a/crates/metadata-server/src/raft/server.rs +++ b/crates/metadata-server/src/raft/server.rs @@ -282,7 +282,7 @@ impl RaftMetadataServer { let mut provision_rx = self.provision_rx.take().expect("must be present"); let result = if let Some(configuration) = self.storage.get_raft_configuration()? { - debug!(member_id = %configuration.my_member_id, "Found existing metadata store configuration."); + debug!(member_id = %configuration.my_member_id, "Found existing metadata store configuration"); Provisioned::Member(self.become_member(configuration)?) } else { let mut nodes_config_watcher = @@ -291,16 +291,18 @@ impl RaftMetadataServer { if *nodes_config_watcher.borrow_and_update() > Version::INVALID { // The metadata store must have been provisioned if there exists a // NodesConfiguration. So let's move on. - debug!("Detected a valid NodesConfiguration. 
This indicates that the metadata store cluster has been provisioned."); + debug!("Detected a valid nodes configuration. This indicates that the metadata store cluster has been provisioned"); Provisioned::Standby(self.become_standby()) } else { - info!("Cluster has not been provisioned, yet. Awaiting the provision signal."); + if !Configuration::pinned().common.auto_provision { + info!("Cluster has not been provisioned, yet. Awaiting provisioning via `restatectl provision`"); + } loop { tokio::select! { Some(request) = self.request_rx.recv() => { // fail incoming requests while we are waiting for the provision signal let request = request.into_request(); - request.fail(RequestError::Unavailable("Metadata store has not been provisioned yet.".into(), None)) + request.fail(RequestError::Unavailable("Metadata store has not been provisioned yet".into(), None)) }, Some(request) = self.join_cluster_rx.recv() => { let _ = request.response_tx.send(Err(JoinClusterError::NotMember(None))); @@ -309,7 +311,7 @@ impl RaftMetadataServer { match self.initialize_storage(request.nodes_configuration).await { Ok(raft_configuration) => { let _ = request.result_tx.send(Ok(true)); - debug!(member_id = %raft_configuration.my_member_id, "Successfully provisioned the metadata store."); + debug!(member_id = %raft_configuration.my_member_id, "Successfully provisioned the metadata store"); let mut member = self.become_member(raft_configuration)?; member.campaign_immediately()?; break Provisioned::Member(member); @@ -324,7 +326,7 @@ impl RaftMetadataServer { if *nodes_config_watcher.borrow_and_update() > Version::INVALID { // The metadata store must have been provisioned if there exists a // NodesConfiguration. So let's move on. - debug!("Detected a valid NodesConfiguration. This indicates that the metadata store cluster has been provisioned."); + debug!("Detected a valid nodes configuration. This indicates that the metadata store cluster has been provisioned"); break Provisioned::Standby(self.become_standby()) } } @@ -353,7 +355,7 @@ impl RaftMetadataServer { let raft_configuration = self.derive_initial_configuration(&mut nodes_configuration)?; - debug!("Initialize storage with nodes configuration: {nodes_configuration:?}"); + debug!("Initialize storage with nodes configuration"); let initial_conf_state = ConfState::from(( vec![to_raft_id(raft_configuration.my_member_id.node_id)], @@ -656,8 +658,11 @@ impl Member { self.is_leader = self.raw_node.raft.leader_id == self.raw_node.raft.id; if previous_is_leader && !self.is_leader { - debug!("Lost metadata store leadership"); let known_leader = self.known_leader(); + info!( + possible_leader = ?known_leader, + "Lost metadata cluster leadership" + ); // todo we might fail some of the request too eagerly here because the answer might be // stored in the unapplied log entries. Better to fail the callbacks based on @@ -670,7 +675,7 @@ impl Member { self.fail_join_callbacks(|| JoinClusterError::NotLeader(known_leader.clone())); self.read_index_to_request_id.clear(); } else if !previous_is_leader && self.is_leader { - debug!("Won metadata store leadership"); + info!("Won metadata cluster leadership"); } } diff --git a/crates/metadata-server/src/raft/storage.rs b/crates/metadata-server/src/raft/storage.rs index 2e8770380c..9d75791398 100644 --- a/crates/metadata-server/src/raft/storage.rs +++ b/crates/metadata-server/src/raft/storage.rs @@ -8,14 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use crate::raft::RaftConfiguration; -use crate::{util, StorageId}; +use std::array::TryFromSliceError; +use std::cell::RefCell; +use std::mem::size_of; +use std::sync::Arc; +use std::{error, mem}; + use bytes::{BufMut, BytesMut}; use flexbuffers::{DeserializationError, SerializationError}; use protobuf::{Message, ProtobufError}; use raft::eraftpb::{ConfState, Entry, Snapshot}; use raft::prelude::HardState; use raft::{GetEntriesContext, RaftState, Storage, StorageError}; +use rocksdb::{BoundColumnFamily, DBPinnableSlice, ReadOptions, WriteBatch, WriteOptions, DB}; +use tracing::debug; + use restate_rocksdb::{ CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, RocksError, @@ -24,13 +31,9 @@ use restate_types::config::{data_dir, MetadataServerOptions, RocksDbOptions}; use restate_types::errors::GenericError; use restate_types::live::BoxedLiveLoad; use restate_types::nodes_config::NodesConfiguration; -use rocksdb::{BoundColumnFamily, DBPinnableSlice, ReadOptions, WriteBatch, WriteOptions, DB}; -use std::array::TryFromSliceError; -use std::cell::RefCell; -use std::mem::size_of; -use std::sync::Arc; -use std::{error, mem}; -use tracing::debug; + +use crate::raft::RaftConfiguration; +use crate::{util, StorageId}; const DB_NAME: &str = "raft-metadata-store"; const RAFT_CF: &str = "raft"; @@ -117,6 +120,7 @@ impl RocksDbStorage { util::cf_options(options.rocksdb_memory_budget()), ) .ensure_column_families(cfs) + .add_to_flush_on_shutdown(CfPrefixPattern::ANY) .build() .expect("valid spec"); diff --git a/crates/node/src/cluster_marker.rs b/crates/node/src/cluster_marker.rs index 94ddc54fb6..7b179a6d14 100644 --- a/crates/node/src/cluster_marker.rs +++ b/crates/node/src/cluster_marker.rs @@ -13,7 +13,7 @@ use semver::Version; use std::cmp::Ordering; use std::fs::OpenOptions; use std::path::Path; -use tracing::info; +use tracing::debug; const CLUSTER_MARKER_FILE_NAME: &str = ".cluster-marker"; const TMP_CLUSTER_MARKER_FILE_NAME: &str = ".tmp-cluster-marker"; @@ -141,7 +141,7 @@ fn validate_and_update_cluster_marker_inner( .map_err(ClusterValidationError::CreateFile)?; serde_json::from_reader(&cluster_marker_file).map_err(ClusterValidationError::Decode)? } else { - info!( + debug!( "Did not find existing cluster marker. Creating a new one under '{}'.", cluster_marker_filepath.display() ); diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs index 7a70c97450..0982500a74 100644 --- a/crates/node/src/init.rs +++ b/crates/node/src/init.rs @@ -97,7 +97,7 @@ impl<'a> NodeInit<'a> { self.sync_metadata().await; - info!("Node initialization complete"); + trace!("Node initialization complete"); Ok(()) } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2128bc795a..60fa278ba6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -374,13 +374,19 @@ impl Node { match response { Ok(provisioned) => { if provisioned { - info!("Auto provisioned cluster '{}'.", common_opts.cluster_name()); + info!( + "Cluster '{}' has been automatically provisioned", + common_opts.cluster_name() + ); } else { - debug!("The cluster is already provisioned."); + debug!("The cluster is already provisioned"); } } Err(err) => { - warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + warn!( + %err, + "Failed to auto provision the cluster. 
In order to continue you have to provision the cluster manually" + ); } } @@ -400,8 +406,8 @@ impl Node { .init(), ) .await - .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start.")? - .context("Failed initializing the node.")?; + .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start")? + .context("Failed initializing the node")?; let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); let my_node_id = Metadata::with_current(|m| m.my_node_id()); @@ -428,7 +434,7 @@ impl Node { "announce-node-at-admin-node", async move { if let Err(err) = networking.node_connection(admin_node_id).await { - info!("Failed connecting to admin node '{admin_node_id}' and announcing myself. This can indicate network problems: {err}"); + debug!("Failed connecting to admin node '{admin_node_id}' and announcing myself. This can indicate network problems: {err}"); } }, )?; @@ -507,7 +513,7 @@ impl Node { } } } - info!("Restate server is ready"); + info!("Restate node roles [{}] were started", my_roles); Ok(()) }); diff --git a/crates/rocksdb/src/db_manager.rs b/crates/rocksdb/src/db_manager.rs index 34c9467d58..25464a44e9 100644 --- a/crates/rocksdb/src/db_manager.rs +++ b/crates/rocksdb/src/db_manager.rs @@ -248,7 +248,7 @@ impl RocksDbManager { while let Some(res) = tasks.join_next().await { match res { Ok(name) => { - info!( + debug!( db = %name, "Rocksdb database shutdown completed, {} remaining", tasks.len()); } diff --git a/crates/types/src/config_loader.rs b/crates/types/src/config_loader.rs index 6011596b49..fbffc9c6ce 100644 --- a/crates/types/src/config_loader.rs +++ b/crates/types/src/config_loader.rs @@ -17,7 +17,7 @@ use notify::{EventKind, RecommendedWatcher, RecursiveMode}; use notify_debouncer_full::{ new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, RecommendedCache, }; -use tracing::{error, info, warn}; +use tracing::{debug, error, warn}; use crate::config::Configuration; @@ -133,7 +133,7 @@ impl ConfigLoader { return; }; - info!("Installing watcher for config changes: {}", path.display()); + debug!("Installing watcher for config changes: {}", path.display()); if let Err(e) = debouncer.watch(&path, notify::RecursiveMode::NonRecursive) { warn!("Couldn't install configuration watcher: {}", e); return; @@ -144,7 +144,7 @@ impl ConfigLoader { .spawn(move || { // It's important that we capture the watcher in the thread, // otherwise it'll be dropped and we won't be watching anything! - info!("Configuration watcher thread has started"); + debug!("Configuration watcher thread has started"); let mut should_run = true; while should_run { match rx.recv() { @@ -157,7 +157,7 @@ impl ConfigLoader { } } } - info!("Config watcher thread has terminated"); + debug!("Config watcher thread has terminated"); }) .expect("start config watcher thread"); } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 01c21e846c..009619b773 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -248,9 +248,18 @@ pub struct ReplicatedLogletConfig { /// New type that enforces that the nodeset size is never larger than 128. 
#[derive( - Debug, Clone, Copy, derive_more::Into, serde::Serialize, serde::Deserialize, Eq, PartialEq, + Debug, + Clone, + Copy, + derive_more::Into, + serde::Serialize, + serde::Deserialize, + Eq, + PartialEq, + derive_more::Display, )] #[serde(try_from = "u16", into = "u16")] +#[display("{_0}")] pub struct NodeSetSize(u16); impl NodeSetSize { diff --git a/crates/types/src/replication/nodeset.rs b/crates/types/src/replication/nodeset.rs index 9446dd2127..13c71c0d87 100644 --- a/crates/types/src/replication/nodeset.rs +++ b/crates/types/src/replication/nodeset.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::Display; use std::hash::{BuildHasherDefault, Hash, Hasher}; @@ -18,7 +19,7 @@ use itertools::Itertools; use rand::distr::Uniform; use rand::prelude::*; -use crate::PlainNodeId; +use crate::{Merge, PlainNodeId}; // Why? Over 50% faster in iteration than HashSet and ~40% faster than default RandomState for // contains() and set intersection operations. Additionally, it's 300% faster when created from @@ -276,8 +277,8 @@ impl std::fmt::Display for NodeSet { /// The alternate format displays a *sorted* list of short-form plain node ids, suitable for human-friendly output. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match f.alternate() { - false => write_nodes(self, f), - true => write_nodes_sorted(self, f), + false => write_nodes(self.0.iter(), f), + true => write_nodes(self.0.iter().sorted(), f), } } } @@ -302,55 +303,103 @@ impl From for DecoratedNodeSet { } } +impl DecoratedNodeSet { + pub fn merge(&mut self, node_id: impl Into, other: V) -> bool + where + V: Merge, + { + let node_id = node_id.into(); + match self.0.entry(node_id) { + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(other); + true + } + Entry::Occupied(occupied_entry) => occupied_entry.into_mut().merge(other), + } + } + + pub fn merge_with_iter<'a>( + &mut self, + values: impl IntoIterator, + ) where + V: Merge + Clone, + V: 'a, + { + values.into_iter().for_each(|(node_id, v)| { + self.0 + .entry(*node_id) + .and_modify(|existing| { + existing.merge(v.clone()); + }) + .or_insert(v.clone()); + }); + } +} + impl FromIterator for DecoratedNodeSet { fn from_iter>(iter: T) -> Self { Self(iter.into_iter().map(|n| (n, Default::default())).collect()) } } +impl FromIterator<(PlainNodeId, V)> for DecoratedNodeSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + impl Display for DecoratedNodeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - for (pos, (node_id, v)) in self.0.iter().with_position() { - write!(f, "{node_id}({v})")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; - } - } - write!(f, "]") + write_nodes_decorated_display(self.0.iter(), f) } } impl std::fmt::Debug for DecoratedNodeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - for (pos, (node_id, v)) in self.0.iter().with_position() { - write!(f, "{node_id}({v:?})")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; - } + write_nodes_decorated_debug(self.0.iter(), f) + } +} + +fn write_nodes_decorated_display<'a, V: std::fmt::Display + 'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; + write!(f, "[")?; + for (pos, (node_id, v)) in iter.with_position() { + match 
pos { + Position::Only | Position::Last => write!(f, "{node_id}({v})")?, + Position::First | Position::Middle => write!(f, "{node_id}({v}), ")?, } - write!(f, "]") } + write!(f, "]") } -fn write_nodes(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +fn write_nodes_decorated_debug<'a, V: std::fmt::Debug + 'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; write!(f, "[")?; - for (pos, node_id) in nodeset.0.iter().with_position() { - write!(f, "{node_id}")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; + for (pos, (node_id, v)) in iter.with_position() { + match pos { + Position::Only | Position::Last => write!(f, "{node_id}({v:?})")?, + Position::First | Position::Middle => write!(f, "{node_id}({v:?}), ")?, } } write!(f, "]") } -fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +fn write_nodes<'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; write!(f, "[")?; - for (pos, node_id) in nodeset.0.iter().sorted().with_position() { - write!(f, "{node_id}")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; + for (pos, node_id) in iter.with_position() { + match pos { + Position::Only | Position::Last => write!(f, "{node_id}")?, + Position::First | Position::Middle => write!(f, "{node_id}, ")?, } } write!(f, "]") @@ -358,6 +407,8 @@ fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std #[cfg(test)] mod test { + use ahash::HashMap; + use super::*; #[test] @@ -428,9 +479,21 @@ mod test { #[test] fn nodeset_display() { - let nodeset1 = NodeSet::from_iter([2, 3, 1, 4, 5]); - assert_eq!(nodeset1.to_string(), "[N2, N3, N1, N4, N5]"); - assert_eq!(format!("{nodeset1:#}"), "[N1, N2, N3, N4, N5]"); + let nodeset = NodeSet::from_iter([2, 3, 1, 4, 5]); + assert_eq!(nodeset.to_string(), "[N2, N3, N1, N4, N5]"); + assert_eq!(format!("{nodeset:#}"), "[N1, N2, N3, N4, N5]"); + + let nodeset = NodeSet::from_iter([2]); + assert_eq!(nodeset.to_string(), "[N2]"); + assert_eq!(format!("{nodeset:#}"), "[N2]"); + + let nodeset = NodeSet::from_iter([2]); + assert_eq!(nodeset.to_string(), "[N2]"); + assert_eq!(format!("{nodeset:#}"), "[N2]"); + + let nodeset = NodeSet::new(); + assert_eq!(nodeset.to_string(), "[]"); + assert_eq!(format!("{nodeset:#}"), "[]"); } #[test] @@ -443,11 +506,38 @@ mod test { #[display("E")] Error, } - let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + let mut nodeset = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + + assert_eq!(format!("{nodeset}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]"); + nodeset.insert(PlainNodeId::from(5), Status::Sealed); + nodeset.insert(PlainNodeId::from(2), Status::Sealed); + assert_eq!(format!("{nodeset}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]"); + + let nodeset = DecoratedNodeSet::::from(NodeSet::from_iter([3])); + assert_eq!(format!("{nodeset}"), "[N3(E)]"); + + let nodeset = DecoratedNodeSet::::from(NodeSet::new()); + assert_eq!(format!("{nodeset}"), "[]"); + } - assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]"); + #[test] + fn decorated_nodeset_extend() { + #[derive(derive_more::Display, Default, Clone, Copy)] + enum Status { + #[display("S")] + Sealed, + #[default] + #[display("E")] + Error, + } + let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); nodeset1.insert(PlainNodeId::from(5), Status::Sealed); - 
-        nodeset1.insert(PlainNodeId::from(2), Status::Sealed);
-        assert_eq!(format!("{nodeset1}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]");
+        assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(S)]");
+        let mut status_map = HashMap::default();
+        status_map.insert(PlainNodeId::from(1), Status::Sealed);
+        status_map.insert(PlainNodeId::from(4), Status::Sealed);
+        nodeset1.extend(&status_map);
+
+        assert_eq!(format!("{nodeset1}"), "[N1(S), N2(E), N3(E), N4(S), N5(S)]");
     }
 }
diff --git a/crates/types/src/retries.rs b/crates/types/src/retries.rs
index 4704794f27..b97de42bac 100644
--- a/crates/types/src/retries.rs
+++ b/crates/types/src/retries.rs
@@ -235,6 +235,26 @@ pub struct RetryIter<'a> {
     last_retry: Option<Duration>,
 }
 
+impl RetryIter<'_> {
+    /// The number of attempts on this retry iterator so far
+    pub fn attempts(&self) -> usize {
+        self.attempts
+    }
+
+    pub fn max_attempts(&self) -> usize {
+        let max_attempts = match self.policy.as_ref() {
+            RetryPolicy::None => return 0,
+            RetryPolicy::FixedDelay { max_attempts, .. } => max_attempts,
+            RetryPolicy::Exponential { max_attempts, .. } => max_attempts,
+        };
+        max_attempts.unwrap_or(NonZeroUsize::MAX).into()
+    }
+
+    pub fn remaining_attempts(&self) -> usize {
+        self.max_attempts() - self.attempts()
+    }
+}
+
 impl Iterator for RetryIter<'_> {
     type Item = Duration;
 
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 2a74763621..ed9a55e27a 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -270,10 +270,10 @@ where
 {
     #[instrument(
         level = "error", skip_all,
-        fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty)
+        fields(partition_id = %self.partition_id)
     )]
     pub async fn run(mut self) -> Result<(), ProcessorError> {
-        info!("Starting the partition processor.");
+        debug!("Starting the partition processor.");
 
         let res = tokio::select! {
             res = self.run_inner() => {
@@ -416,7 +416,7 @@ where
         let mut action_collector = ActionCollector::default();
         let mut command_buffer = Vec::with_capacity(self.max_command_batch_size);
 
-        info!("PartitionProcessor starting event loop.");
+        info!("Partition {} started", self.partition_id);
 
         loop {
             tokio::select! {
diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
index 254ffc32ff..f309de3a5e 100644
--- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
+++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
@@ -70,6 +70,7 @@ impl SpawnPartitionProcessorTask {
     }
 
     #[instrument(
+        level = "error",
         skip_all,
         fields(
             partition_id=%self.partition_id,
@@ -230,19 +231,22 @@ async fn create_or_recreate_store(
     // Attempt to get the latest available snapshot from the snapshot repository:
     let snapshot = match snapshot_repository {
         Some(repository) => {
-            debug!("Looking for partition snapshot from which to bootstrap partition store");
+            debug!(
+                %partition_id,
+                "Looking for partition snapshot from which to bootstrap partition store"
+            );
             // todo(pavel): pass target LSN to repository
             repository.get_latest(partition_id).await?
         }
         None => {
-            debug!("No snapshot repository configured");
+            debug!(%partition_id, "No snapshot repository configured");
             None
         }
     };
 
     Ok(match (snapshot, fast_forward_lsn) {
         (None, None) => {
-            info!("No snapshot found to bootstrap partition, creating new store");
+            debug!(%partition_id, "No snapshot found to bootstrap partition, creating new store");
             partition_store_manager
                 .open_partition_store(
                     partition_id,
@@ -255,7 +259,7 @@ async fn create_or_recreate_store(
         (Some(snapshot), None) => {
             // Based on the assumptions for calling this method, we should only reach this point if
             // there is no existing store - we can import without first dropping the column family.
-            info!(partition_id = %partition_id, "Found partition snapshot, restoring it");
+            info!(%partition_id, "Found partition snapshot, restoring it");
             import_snapshot(
                 partition_id,
                 key_range,
@@ -270,8 +274,9 @@ async fn create_or_recreate_store(
         {
             // We trust that the fast_forward_lsn is greater than the locally applied LSN.
             info!(
-                latest_snapshot_lsn = ?snapshot.min_applied_lsn,
-                ?fast_forward_lsn,
+                %partition_id,
+                latest_snapshot_lsn = %snapshot.min_applied_lsn,
+                %fast_forward_lsn,
                 "Found snapshot with LSN >= target LSN, dropping local partition store state",
             );
             partition_store_manager.drop_partition(partition_id).await;
@@ -289,13 +294,15 @@ async fn create_or_recreate_store(
             // point. We'll likely halt again as soon as the processor starts up.
             if let Some(snapshot) = maybe_snapshot {
                 warn!(
-                    ?snapshot.min_applied_lsn,
-                    ?fast_forward_lsn,
+                    %partition_id,
+                    %snapshot.min_applied_lsn,
+                    %fast_forward_lsn,
                     "Latest available snapshot is older than the fast-forward target LSN!",
                 );
             } else {
                 warn!(
-                    ?fast_forward_lsn,
+                    %partition_id,
+                    %fast_forward_lsn,
                     "A fast-forward target LSN is set, but no snapshot available for partition!",
                 );
             }
@@ -338,23 +345,26 @@ async fn import_snapshot(
     {
         Ok(partition_store) => {
            let res = tokio::fs::remove_dir_all(&snapshot_path).await;
-            if let Err(e) = res {
+            if let Err(err) = res {
                 // This is not critical; since we move the SST files into RocksDB on import,
                 // at worst only the snapshot metadata file will remain in the staging dir
                 warn!(
+                    %partition_id,
                     snapshot_path = %snapshot_path.display(),
-                    "Failed to remove local snapshot directory, continuing with startup: {}",
-                    e,
+                    %err,
+                    "Failed to remove local snapshot directory, continuing with startup",
                 );
             }
             Ok(partition_store)
         }
-        Err(e) => {
+        Err(err) => {
             warn!(
+                %partition_id,
                 snapshot_path = %snapshot_path.display(),
+                %err,
                 "Failed to import snapshot, local snapshot data retained"
             );
-            Err(anyhow::anyhow!(e))
+            Err(err.into())
         }
     }
 }
diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs
index 0612f42630..5f07bf8cff 100644
--- a/server/tests/cluster.rs
+++ b/server/tests/cluster.rs
@@ -42,7 +42,7 @@ async fn replicated_loglet() -> googletest::Result<()> {
         false,
     );
 
-    let regex: Regex = "Starting the partition processor".parse()?;
+    let regex: Regex = "Partition [0-9]+ started".parse()?;
     let mut partition_processors_starting_up: Vec<_> =
         nodes.iter().map(|node| node.lines(regex.clone())).collect();
 
diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs
index c026246281..3c7dd85574 100644
--- a/server/tests/trim_gap_handling.rs
+++ b/server/tests/trim_gap_handling.rs
@@ -68,8 +68,8 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
     let worker_1 = &nodes[0];
     let worker_2 = &nodes[1];
 
-    let mut worker_1_ready = worker_1.lines("PartitionProcessor starting event loop".parse()?);
-    let mut worker_2_ready = worker_2.lines("PartitionProcessor starting event loop".parse()?);
+    let mut worker_1_ready = worker_1.lines("Partition [0-9]+ started".parse()?);
+    let mut worker_2_ready = worker_2.lines("Partition [0-9]+ started".parse()?);
 
     let mut cluster = Cluster::builder()
         .temp_base_dir()
diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs
index dcf0212314..08533914ea 100644
--- a/tools/restatectl/src/commands/cluster/config.rs
+++ b/tools/restatectl/src/commands/cluster/config.rs
@@ -80,6 +80,13 @@ fn write_default_provider(
                 "Replication property",
                 config.replication_property.to_string(),
             )?;
+            write_leaf(
+                w,
+                depth,
+                true,
+                "Nodeset size",
+                config.target_nodeset_size.to_string(),
+            )?;
         }
     }
     Ok(())