diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 33688da8d5..40979e97b5 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -46,7 +46,7 @@ impl<'a> BifrostAdmin<'a> { /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to /// trim all records of the log. - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> { self.inner.fail_if_shutting_down()?; self.inner.trim(log_id, trim_point).await @@ -59,7 +59,7 @@ impl<'a> BifrostAdmin<'a> { /// If the intention is to create the log, then `segment_index` must be set to `None`. /// /// This will continue to retry sealing for seal retryable errors automatically. - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] pub async fn seal_and_auto_extend_chain( &self, log_id: LogId, @@ -115,7 +115,7 @@ impl<'a> BifrostAdmin<'a> { /// - if segment_index is set, the tail loglet must match segment_index. /// /// This will continue to retry sealing for seal retryable errors automatically. - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self, params))] pub async fn seal_and_extend_chain( &self, log_id: LogId, @@ -201,7 +201,7 @@ impl<'a> BifrostAdmin<'a> { /// /// The loglet must be sealed first. This operations assumes that the loglet with /// `last_segment_index` has been sealed prior to this call. - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] async fn add_segment_with_params( &self, log_id: LogId, @@ -248,7 +248,7 @@ impl<'a> BifrostAdmin<'a> { } /// Adds a new log if it doesn't exist. 
- #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self, params))] async fn add_log( &self, log_id: LogId, diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs index 4ac1e1fddc..0f3f160320 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet.rs @@ -18,6 +18,7 @@ pub mod util; pub use error::*; use futures::stream::BoxStream; pub use provider::{LogletProvider, LogletProviderFactory}; +use restate_types::logs::metadata::ProviderKind; use tokio::sync::oneshot; use std::pin::Pin; @@ -28,7 +29,7 @@ use async_trait::async_trait; use futures::{FutureExt, Stream}; use restate_core::ShutdownError; -use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState}; +use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState}; use crate::LogEntry; use crate::Result; @@ -68,7 +69,7 @@ use crate::Result; /// ``` #[async_trait] -pub trait Loglet: Send + Sync + std::fmt::Debug { +pub trait Loglet: Send + Sync { /// Create a read stream that streams record from a single loglet instance. /// /// `to`: The offset of the last record to be read (inclusive). 
If `None`, the @@ -80,6 +81,12 @@ pub trait Loglet: Send + Sync + std::fmt::Debug { to: Option, ) -> Result; + /// The id of this loglet + fn id(&self) -> LogletId; + + /// The kind of provider backing this loglet + fn provider(&self) -> ProviderKind; + /// Create a stream watching the state of tail for this loglet /// /// The stream will return the last known TailState with seal notification semantics diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index e7255e0406..6fe8dce7c7 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -44,6 +44,7 @@ pub struct LogletWrapper { pub(crate) tail_lsn: Option, #[debug(skip)] pub(crate) config: LogletConfig, + #[debug("{}/{}", loglet.provider(), loglet.id())] loglet: Arc, } diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index e9ab52f938..5b6d036731 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -25,11 +25,12 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; use metrics::{counter, histogram, Histogram}; +use restate_types::logs::metadata::ProviderKind; use tokio::sync::Mutex; use tracing::{debug, warn}; use restate_core::ShutdownError; -use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState}; +use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, SequenceNumber, TailState}; use self::log_store::LogStoreError; use self::log_store::RocksDbLogStore; @@ -121,6 +122,14 @@ impl LocalLoglet { #[async_trait] impl Loglet for LocalLoglet { + fn id(&self) -> LogletId { + LogletId::from(self.loglet_id) + } + + fn provider(&self) -> ProviderKind { + ProviderKind::Local + } + async fn create_read_stream( self: Arc, filter: KeyFilter, diff --git a/crates/bifrost/src/providers/memory_loglet.rs
b/crates/bifrost/src/providers/memory_loglet.rs index dd9bbf6ae1..a91611c686 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -93,7 +93,10 @@ impl LogletProvider for MemoryLogletProvider { } // Create loglet - let loglet = entry.insert(MemoryLoglet::new(params.clone())); + let raw: u64 = params + .parse() + .expect("memory loglets are configured just with u64 loglet ids"); + let loglet = entry.insert(MemoryLoglet::new(LogletId::new_unchecked(raw))); Arc::clone(loglet) } hash_map::Entry::Occupied(entry) => entry.get().clone(), @@ -111,9 +114,8 @@ impl LogletProvider for MemoryLogletProvider { let new_segment_index = chain .map(|c| c.tail_index().next()) .unwrap_or(SegmentIndex::OLDEST); - Ok(LogletParams::from( - LogletId::new(log_id, new_segment_index).to_string(), - )) + let id = LogletId::new(log_id, new_segment_index); + Ok(LogletParams::from(u64::from(id).to_string())) } async fn shutdown(&self) -> Result<(), OperationError> { @@ -124,8 +126,7 @@ impl LogletProvider for MemoryLogletProvider { #[derive(derive_more::Debug)] pub struct MemoryLoglet { - // We treat params as an opaque identifier for the underlying loglet. - params: LogletParams, + loglet_id: LogletId, #[debug(skip)] log: Mutex>, // internal offset _before_ the loglet head. 
Loglet head is trim_point_offset.next() @@ -138,9 +139,9 @@ pub struct MemoryLoglet { } impl MemoryLoglet { - pub fn new(params: LogletParams) -> Arc { + pub fn new(loglet_id: LogletId) -> Arc { Arc::new(Self { - params, + loglet_id, log: Mutex::new(Vec::new()), // Trim point is 0 initially trim_point_offset: AtomicU32::new(0), @@ -326,6 +327,13 @@ impl Stream for MemoryReadStream { #[async_trait] impl Loglet for MemoryLoglet { + fn id(&self) -> LogletId { + self.loglet_id + } + + fn provider(&self) -> ProviderKind { + ProviderKind::InMemory + } async fn create_read_stream( self: Arc, filter: KeyFilter, @@ -352,8 +360,8 @@ impl Loglet for MemoryLoglet { for payload in payloads.iter() { last_committed_offset = last_committed_offset.next(); debug!( - "Appending record to in-memory loglet {:?} at offset {}", - self.params, last_committed_offset + "Appending record to in-memory loglet {} at offset {}", + self.loglet_id, last_committed_offset ); log.push(payload.clone()); } @@ -430,7 +438,7 @@ mod tests { .set_provider_kind(ProviderKind::InMemory) .build() .await; - let loglet = MemoryLoglet::new(LogletParams::from("112".to_string())); + let loglet = MemoryLoglet::new(LogletId::new_unchecked(112)); crate::loglet::loglet_tests::$test(loglet).await } } diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index 775709cbf7..e266f23b8e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -13,18 +13,32 @@ use std::sync::Arc; use restate_core::ShutdownError; use restate_types::errors::MaybeRetryableError; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::{LogId, LogletId}; +use restate_types::logs::LogId; +use restate_types::replication::DecoratedNodeSet; use crate::loglet::OperationError; +#[derive(Default, derive_more::Display, derive_more::Debug)] +pub enum NodeSealStatus { + #[display("E")] + 
Error, + #[display("S")] + Sealed, + #[display("?")] + #[default] + Unknown, +} + #[derive(Debug, thiserror::Error)] pub(crate) enum ReplicatedLogletError { #[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")] LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error), #[error("cannot find the tail of the loglet: {0}")] FindTailFailed(String), - #[error("could not seal loglet_id={0}, insufficient nodes available for seal")] - SealFailed(LogletId), + #[error( + "could not seal loglet because insufficient nodes confirmed the seal. The nodeset status is {0}" + )] + SealFailed(DecoratedNodeSet), #[error(transparent)] Shutdown(#[from] ShutdownError), } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 2625447a9e..f12edb5ae6 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -17,9 +17,9 @@ use tokio::time::Instant; use tracing::{debug, info, instrument, trace}; use restate_core::network::{Networking, TransportConnect}; -use restate_types::logs::metadata::SegmentIndex; +use restate_types::logs::metadata::{ProviderKind, SegmentIndex}; use restate_types::logs::{ - KeyFilter, LogId, LogletOffset, Record, RecordCache, SequenceNumber, TailState, + KeyFilter, LogId, LogletId, LogletOffset, Record, RecordCache, SequenceNumber, TailState, }; use restate_types::replicated_loglet::ReplicatedLogletParams; @@ -263,6 +263,14 @@ impl ReplicatedLoglet { #[async_trait] impl Loglet for ReplicatedLoglet { + fn id(&self) -> LogletId { + self.my_params.loglet_id + } + + fn provider(&self) -> ProviderKind { + ProviderKind::Replicated + } + async fn create_read_stream( self: Arc, filter: KeyFilter, diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index 230f830563..d0690fbdbd 100644 --- 
a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -19,10 +19,10 @@ use restate_types::config::Configuration; use restate_types::logs::{LogletOffset, SequenceNumber}; use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status}; use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams}; -use restate_types::replication::NodeSet; +use restate_types::replication::DecoratedNodeSet; use crate::loglet::util::TailOffsetWatch; -use crate::providers::replicated_loglet::error::ReplicatedLogletError; +use crate::providers::replicated_loglet::error::{NodeSealStatus, ReplicatedLogletError}; use crate::providers::replicated_loglet::replication::NodeSetChecker; use crate::providers::replicated_loglet::tasks::util::RunOnSingleNode; @@ -66,7 +66,7 @@ impl SealTask { is_sealed = known_global_tail.is_sealed(), "Cannot seal the loglet as all nodeset members are in `Provisioning` storage state" ); - return Err(ReplicatedLogletError::SealFailed(my_params.loglet_id)); + return Err(ReplicatedLogletError::SealFailed(Default::default())); } // Use the entire nodeset except for StorageState::Disabled. let effective_nodeset = my_params @@ -116,6 +116,8 @@ impl SealTask { }); } + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset.clone()); + // Max observed local-tail from sealed nodes let mut max_local_tail = LogletOffset::OLDEST; while let Some(response) = inflight_requests.join_next().await { @@ -125,29 +127,26 @@ impl SealTask { }; let Ok(response) = response else { // Seal failed/aborted on this node. 
+ nodeset_status.insert(node_id, NodeSealStatus::Error); continue; }; max_local_tail = max_local_tail.max(response.header.local_tail); known_global_tail.notify_offset_update(response.header.known_global_tail); nodeset_checker.set_attribute(node_id, true); + nodeset_status.insert(node_id, NodeSealStatus::Sealed); // todo: consider allowing the seal to pass at best-effort f-majority. if nodeset_checker.check_fmajority(|attr| *attr).passed() { - let sealed_nodes: NodeSet = nodeset_checker - .filter(|sealed| *sealed) - .map(|(n, _)| *n) - .collect(); - info!( loglet_id = %my_params.loglet_id, replication = %my_params.replication, %effective_nodeset, %max_local_tail, global_tail = %known_global_tail.latest_offset(), - "Seal task completed on f-majority of nodes in {:?}. Sealed log-servers {}", + "Seal task completed on f-majority of nodes in {:?}. Nodeset status {}", start.elapsed(), - sealed_nodes, + nodeset_status, ); // note that we drop the rest of the seal requests after return return Ok(max_local_tail); @@ -155,7 +154,7 @@ impl SealTask { } // no more tasks left. This means that we failed to seal - Err(ReplicatedLogletError::SealFailed(my_params.loglet_id)) + Err(ReplicatedLogletError::SealFailed(nodeset_status)) } } diff --git a/crates/types/src/replication/nodeset.rs b/crates/types/src/replication/nodeset.rs index cb1ecc6d71..9446dd2127 100644 --- a/crates/types/src/replication/nodeset.rs +++ b/crates/types/src/replication/nodeset.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::collections::BTreeMap; +use std::fmt::Display; use std::hash::{BuildHasherDefault, Hash, Hasher}; use std::iter::FusedIterator; @@ -280,25 +282,75 @@ impl std::fmt::Display for NodeSet { } } -fn write_nodes(node_set: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +/// A helper type that attaches a displayable value to every node of a nodeset; +/// useful to construct a view of the impact of an operation on a set of nodes. +/// Nodes are kept in a `BTreeMap`, so they are displayed sorted by node id. +/// +/// For example: `[N1(S), N2(F), N3(?)]` +#[derive(Default, derive_more::DerefMut, derive_more::Deref)] +pub struct DecoratedNodeSet(BTreeMap); + +impl From for DecoratedNodeSet { + fn from(value: NodeSet) -> Self { + Self( + value + .iter() + .copied() + .map(|n| (n, Default::default())) + .collect(), + ) + } +} + +impl FromIterator for DecoratedNodeSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().map(|n| (n, Default::default())).collect()) + } +} + +impl Display for DecoratedNodeSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (pos, (node_id, v)) in self.0.iter().with_position() { + write!(f, "{node_id}({v})")?; + if pos != itertools::Position::Last { + write!(f, ", ")?; + } + } + write!(f, "]") + } +} + +impl std::fmt::Debug for DecoratedNodeSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (pos, (node_id, v)) in self.0.iter().with_position() { + write!(f, "{node_id}({v:?})")?; + if pos != itertools::Position::Last { + write!(f, ", ")?; + } + } + write!(f, "]") + } +} + +fn write_nodes(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "[")?; - let mut nodes = node_set.0.iter(); - if let Some(node) = nodes.next() { - write!(f, "{node}")?; - for node in nodes { - write!(f, ", {node}")?; + for (pos, node_id) in nodeset.0.iter().with_position() { + write!(f, "{node_id}")?; + if pos != itertools::Position::Last { +
write!(f, ", ")?; } } write!(f, "]") } -fn write_nodes_sorted(node_set: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "[")?; - let mut nodes = node_set.0.iter().sorted(); - if let Some(node) = nodes.next() { - write!(f, "{node}")?; - for node in nodes { - write!(f, ", {node}")?; + for (pos, node_id) in nodeset.0.iter().sorted().with_position() { + write!(f, "{node_id}")?; + if pos != itertools::Position::Last { + write!(f, ", ")?; } } write!(f, "]") @@ -373,4 +425,29 @@ mod test { assert_eq!(intersection.len(), 5); assert_eq!(intersection, NodeSet::from(vec![1, 2, 3, 4, 5])); } + + #[test] + fn nodeset_display() { + let nodeset1 = NodeSet::from_iter([2, 3, 1, 4, 5]); + assert_eq!(nodeset1.to_string(), "[N2, N3, N1, N4, N5]"); + assert_eq!(format!("{nodeset1:#}"), "[N1, N2, N3, N4, N5]"); + } + + #[test] + fn decorated_nodeset_display() { + #[derive(derive_more::Display, Default)] + enum Status { + #[display("S")] + Sealed, + #[default] + #[display("E")] + Error, + } + let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + + assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]"); + nodeset1.insert(PlainNodeId::from(5), Status::Sealed); + nodeset1.insert(PlainNodeId::from(2), Status::Sealed); + assert_eq!(format!("{nodeset1}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]"); + } }