Skip to content

Commit

Permalink
[Bifrost] Improve logging of seal failures
Browse files Browse the repository at this point in the history
Additionally, loglet now has `id()` and `provider()`, and `LogletWrapper` will print a nice Debug value when used in tracing.
  • Loading branch information
AhmedSoliman committed Feb 4, 2025
1 parent ebc159e commit fd56ace
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<'a> BifrostAdmin<'a> {
/// Trim the log prefix up to and including the `trim_point`.
/// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to
/// trim all records of the log.
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
pub async fn trim(&self, log_id: LogId, trim_point: Lsn) -> Result<()> {
self.inner.fail_if_shutting_down()?;
self.inner.trim(log_id, trim_point).await
Expand All @@ -59,7 +59,7 @@ impl<'a> BifrostAdmin<'a> {
/// If the intention is to create the log, then `segment_index` must be set to `None`.
///
/// This will continue to retry sealing for seal retryable errors automatically.
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
pub async fn seal_and_auto_extend_chain(
&self,
log_id: LogId,
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<'a> BifrostAdmin<'a> {
/// - if segment_index is set, the tail loglet must match segment_index.
///
/// This will continue to retry sealing for seal retryable errors automatically.
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self, params))]
pub async fn seal_and_extend_chain(
&self,
log_id: LogId,
Expand Down Expand Up @@ -201,7 +201,7 @@ impl<'a> BifrostAdmin<'a> {
///
/// The loglet must be sealed first. This operation assumes that the loglet with
/// `last_segment_index` has been sealed prior to this call.
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
async fn add_segment_with_params(
&self,
log_id: LogId,
Expand Down Expand Up @@ -248,7 +248,7 @@ impl<'a> BifrostAdmin<'a> {
}

/// Adds a new log if it doesn't exist.
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self, params))]
async fn add_log(
&self,
log_id: LogId,
Expand Down
11 changes: 9 additions & 2 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod util;
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};
use restate_types::logs::metadata::ProviderKind;
use tokio::sync::oneshot;

use std::pin::Pin;
Expand All @@ -28,7 +29,7 @@ use async_trait::async_trait;
use futures::{FutureExt, Stream};

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState};
use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState};

use crate::LogEntry;
use crate::Result;
Expand Down Expand Up @@ -68,7 +69,7 @@ use crate::Result;
/// ```
#[async_trait]
pub trait Loglet: Send + Sync + std::fmt::Debug {
pub trait Loglet: Send + Sync {
/// Create a read stream that streams record from a single loglet instance.
///
/// `to`: The offset of the last record to be read (inclusive). If `None`, the
Expand All @@ -80,6 +81,12 @@ pub trait Loglet: Send + Sync + std::fmt::Debug {
to: Option<LogletOffset>,
) -> Result<SendableLogletReadStream, OperationError>;

/// The id of this loglet
fn id(&self) -> LogletId;

/// What is the provider of this loglet
fn provider(&self) -> ProviderKind;

/// Create a stream watching the state of tail for this loglet
///
/// The stream will return the last known TailState with seal notification semantics
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct LogletWrapper {
pub(crate) tail_lsn: Option<Lsn>,
#[debug(skip)]
pub(crate) config: LogletConfig,
#[debug("{}/{}", loglet.provider(), loglet.id())]
loglet: Arc<dyn Loglet>,
}

Expand Down
11 changes: 10 additions & 1 deletion crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::BoxStream;
use metrics::{counter, histogram, Histogram};
use restate_types::logs::metadata::ProviderKind;
use tokio::sync::Mutex;
use tracing::{debug, warn};

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState};
use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, SequenceNumber, TailState};

use self::log_store::LogStoreError;
use self::log_store::RocksDbLogStore;
Expand Down Expand Up @@ -121,6 +122,14 @@ impl LocalLoglet {

#[async_trait]
impl Loglet for LocalLoglet {
/// Returns the id of this loglet, converted from the stored `loglet_id` field
/// into a `LogletId`.
fn id(&self) -> LogletId {
LogletId::from(self.loglet_id)
}

/// Reports the provider kind of this loglet; always `ProviderKind::Local`.
fn provider(&self) -> ProviderKind {
ProviderKind::Local
}

async fn create_read_stream(
self: Arc<Self>,
filter: KeyFilter,
Expand Down
30 changes: 19 additions & 11 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ impl LogletProvider for MemoryLogletProvider {
}

// Create loglet
let loglet = entry.insert(MemoryLoglet::new(params.clone()));
let raw: u64 = params
.parse()
.expect("memory loglets are configured just with u64 loglet ids");
let loglet = entry.insert(MemoryLoglet::new(LogletId::new_unchecked(raw)));
Arc::clone(loglet)
}
hash_map::Entry::Occupied(entry) => entry.get().clone(),
Expand All @@ -111,9 +114,8 @@ impl LogletProvider for MemoryLogletProvider {
let new_segment_index = chain
.map(|c| c.tail_index().next())
.unwrap_or(SegmentIndex::OLDEST);
Ok(LogletParams::from(
LogletId::new(log_id, new_segment_index).to_string(),
))
let id = LogletId::new(log_id, new_segment_index);
Ok(LogletParams::from(u64::from(id).to_string()))
}

async fn shutdown(&self) -> Result<(), OperationError> {
Expand All @@ -124,8 +126,7 @@ impl LogletProvider for MemoryLogletProvider {

#[derive(derive_more::Debug)]
pub struct MemoryLoglet {
// We treat params as an opaque identifier for the underlying loglet.
params: LogletParams,
loglet_id: LogletId,
#[debug(skip)]
log: Mutex<Vec<Record>>,
// internal offset _before_ the loglet head. Loglet head is trim_point_offset.next()
Expand All @@ -138,9 +139,9 @@ pub struct MemoryLoglet {
}

impl MemoryLoglet {
pub fn new(params: LogletParams) -> Arc<Self> {
pub fn new(loglet_id: LogletId) -> Arc<Self> {
Arc::new(Self {
params,
loglet_id,
log: Mutex::new(Vec::new()),
// Trim point is 0 initially
trim_point_offset: AtomicU32::new(0),
Expand Down Expand Up @@ -326,6 +327,13 @@ impl Stream for MemoryReadStream {

#[async_trait]
impl Loglet for MemoryLoglet {
/// Returns the `LogletId` stored in this loglet's `loglet_id` field.
fn id(&self) -> LogletId {
self.loglet_id
}

/// Reports the provider kind of this loglet; always `ProviderKind::InMemory`.
fn provider(&self) -> ProviderKind {
ProviderKind::InMemory
}
async fn create_read_stream(
self: Arc<Self>,
filter: KeyFilter,
Expand All @@ -352,8 +360,8 @@ impl Loglet for MemoryLoglet {
for payload in payloads.iter() {
last_committed_offset = last_committed_offset.next();
debug!(
"Appending record to in-memory loglet {:?} at offset {}",
self.params, last_committed_offset
"Appending record to in-memory loglet {} at offset {}",
self.loglet_id, last_committed_offset
);
log.push(payload.clone());
}
Expand Down Expand Up @@ -430,7 +438,7 @@ mod tests {
.set_provider_kind(ProviderKind::InMemory)
.build()
.await;
let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
let loglet = MemoryLoglet::new(LogletId::new_unchecked(112));
crate::loglet::loglet_tests::$test(loglet).await
}
}
Expand Down
20 changes: 17 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,32 @@ use std::sync::Arc;
use restate_core::ShutdownError;
use restate_types::errors::MaybeRetryableError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, LogletId};
use restate_types::logs::LogId;
use restate_types::replication::DecoratedNodeSet;

use crate::loglet::OperationError;

/// Per-node outcome of a seal attempt.
///
/// Used to decorate a nodeset (`DecoratedNodeSet<NodeSealStatus>`) so that seal
/// log lines and `ReplicatedLogletError::SealFailed` can show what happened on
/// each node. The `Display` form is a single-character code per variant.
#[derive(Default, derive_more::Display, derive_more::Debug)]
pub enum NodeSealStatus {
/// The seal request to this node failed or was aborted. Displayed as `E`.
#[display("E")]
Error,
/// The node responded to the seal request successfully. Displayed as `S`.
#[display("S")]
Sealed,
/// No outcome has been recorded for this node; this is the default value.
/// Displayed as `?`.
#[display("?")]
#[default]
Unknown,
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum ReplicatedLogletError {
#[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")]
LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error),
#[error("cannot find the tail of the loglet: {0}")]
FindTailFailed(String),
#[error("could not seal loglet_id={0}, insufficient nodes available for seal")]
SealFailed(LogletId),
#[error(
"could not seal loglet because insufficient nodes confirmed the seal. The nodeset status is {0}"
)]
SealFailed(DecoratedNodeSet<NodeSealStatus>),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}
Expand Down
12 changes: 10 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use tokio::time::Instant;
use tracing::{debug, info, instrument, trace};

use restate_core::network::{Networking, TransportConnect};
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::metadata::{ProviderKind, SegmentIndex};
use restate_types::logs::{
KeyFilter, LogId, LogletOffset, Record, RecordCache, SequenceNumber, TailState,
KeyFilter, LogId, LogletId, LogletOffset, Record, RecordCache, SequenceNumber, TailState,
};
use restate_types::replicated_loglet::ReplicatedLogletParams;

Expand Down Expand Up @@ -263,6 +263,14 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {

#[async_trait]
impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
/// Returns the `loglet_id` recorded in this loglet's parameters (`my_params`).
fn id(&self) -> LogletId {
self.my_params.loglet_id
}

/// Reports the provider kind of this loglet; always `ProviderKind::Replicated`.
fn provider(&self) -> ProviderKind {
ProviderKind::Replicated
}

async fn create_read_stream(
self: Arc<Self>,
filter: KeyFilter,
Expand Down
21 changes: 10 additions & 11 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use restate_types::config::Configuration;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status};
use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams};
use restate_types::replication::NodeSet;
use restate_types::replication::DecoratedNodeSet;

use crate::loglet::util::TailOffsetWatch;
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::providers::replicated_loglet::error::{NodeSealStatus, ReplicatedLogletError};
use crate::providers::replicated_loglet::replication::NodeSetChecker;
use crate::providers::replicated_loglet::tasks::util::RunOnSingleNode;

Expand Down Expand Up @@ -66,7 +66,7 @@ impl SealTask {
is_sealed = known_global_tail.is_sealed(),
"Cannot seal the loglet as all nodeset members are in `Provisioning` storage state"
);
return Err(ReplicatedLogletError::SealFailed(my_params.loglet_id));
return Err(ReplicatedLogletError::SealFailed(Default::default()));
}
// Use the entire nodeset except for StorageState::Disabled.
let effective_nodeset = my_params
Expand Down Expand Up @@ -116,6 +116,8 @@ impl SealTask {
});
}

let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset.clone());

// Max observed local-tail from sealed nodes
let mut max_local_tail = LogletOffset::OLDEST;
while let Some(response) = inflight_requests.join_next().await {
Expand All @@ -125,37 +127,34 @@ impl SealTask {
};
let Ok(response) = response else {
// Seal failed/aborted on this node.
nodeset_status.insert(node_id, NodeSealStatus::Error);
continue;
};

max_local_tail = max_local_tail.max(response.header.local_tail);
known_global_tail.notify_offset_update(response.header.known_global_tail);
nodeset_checker.set_attribute(node_id, true);
nodeset_status.insert(node_id, NodeSealStatus::Sealed);

// todo: consider allowing the seal to pass at best-effort f-majority.
if nodeset_checker.check_fmajority(|attr| *attr).passed() {
let sealed_nodes: NodeSet = nodeset_checker
.filter(|sealed| *sealed)
.map(|(n, _)| *n)
.collect();

info!(
loglet_id = %my_params.loglet_id,
replication = %my_params.replication,
%effective_nodeset,
%max_local_tail,
global_tail = %known_global_tail.latest_offset(),
"Seal task completed on f-majority of nodes in {:?}. Sealed log-servers {}",
"Seal task completed on f-majority of nodes in {:?}. Nodeset status {}",
start.elapsed(),
sealed_nodes,
nodeset_status,
);
// note that we drop the rest of the seal requests after return
return Ok(max_local_tail);
}
}

// no more tasks left. This means that we failed to seal
Err(ReplicatedLogletError::SealFailed(my_params.loglet_id))
Err(ReplicatedLogletError::SealFailed(nodeset_status))
}
}

Expand Down
Loading

0 comments on commit fd56ace

Please sign in to comment.