From adad40e949c97aae4b131855e7a52a5b952009b1 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 6 Feb 2025 12:37:07 +0000 Subject: [PATCH] `DecoratedNodeSet` Utility to display decorated nodesets The PR also uses it in CheckSeal. This utility `DecoratedNodeSet` can be used when you want to add a nugget of information next to every node. It prints the nodeset sorted as well. ``` // intentionally empty ``` --- Cargo.lock | 1 + crates/bifrost/Cargo.toml | 1 + crates/bifrost/src/bifrost_admin.rs | 6 +- .../src/providers/replicated_loglet/error.rs | 2 +- .../replicated_loglet/replication/checker.rs | 20 ++- .../providers/replicated_loglet/tasks/seal.rs | 21 ++- crates/types/src/replication/nodeset.rs | 160 ++++++++++++++---- 7 files changed, 162 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e72b94b36..ce672a052 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6425,6 +6425,7 @@ dependencies = [ name = "restate-bifrost" version = "1.2.0-dev" dependencies = [ + "ahash 0.8.11", "anyhow", "async-trait", "bytes", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 237eab71a..b87ab2277 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -25,6 +25,7 @@ restate-rocksdb = { workspace = true } restate-test-util = { workspace = true, optional = true } restate-types = { workspace = true } +ahash = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 40979e97b..fffb3eb49 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -10,7 +10,7 @@ use std::sync::Arc; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument, warn}; use restate_core::metadata_store::retry_on_retryable_error; use restate_core::{Metadata, MetadataKind}; @@ -158,7 +158,7 @@ impl<'a> BifrostAdmin<'a> { self.inner.writeable_loglet(log_id).await } - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] pub async fn seal(&self, log_id: LogId, segment_index: SegmentIndex) -> Result { self.inner.fail_if_shutting_down()?; // first find the tail segment for this log. @@ -177,7 +177,7 @@ impl<'a> BifrostAdmin<'a> { match err { crate::loglet::OperationError::Shutdown(err) => return Err(err.into()), crate::loglet::OperationError::Other(err) => { - info!( + warn!( log_id = %log_id, segment = %segment_index, %err, diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index e266f23b8..8467f3b7c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -18,7 +18,7 @@ use restate_types::replication::DecoratedNodeSet; use crate::loglet::OperationError; -#[derive(Default, derive_more::Display, derive_more::Debug)] +#[derive(Default, derive_more::Display, derive_more::Debug, PartialEq, Eq, Hash, Clone, Copy)] pub enum NodeSealStatus { #[display("E")] Error, diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index 3c389112a..e2170d66d 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -8,12 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; use std::hash::{Hash, Hasher}; +use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; +use restate_types::replication::DecoratedNodeSet; use tracing::warn; use restate_types::locality::{LocationScope, NodeLocation}; @@ -734,6 +736,22 @@ impl LocationScopeState { } } +impl<'a, Attr> IntoIterator for &'a NodeSetChecker { + type Item = (&'a PlainNodeId, &'a Attr); + + type IntoIter = <&'a HashMap as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.node_to_attr.iter() + } +} + +impl From> for DecoratedNodeSet { + fn from(val: NodeSetChecker) -> DecoratedNodeSet { + DecoratedNodeSet::from_iter(val.node_to_attr) + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index 3686745a1..c7e5fe794 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -73,7 +73,7 @@ impl SealTask { .nodeset .to_effective(&networking.metadata().nodes_config_ref()); - let mut nodeset_checker = NodeSetChecker::::new( + let mut nodeset_checker = NodeSetChecker::::new( &effective_nodeset, &networking.metadata().nodes_config_ref(), &my_params.replication, @@ -116,8 +116,6 @@ impl SealTask { }); } - let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset.clone()); - // Max observed local-tail from sealed nodes let mut max_local_tail = LogletOffset::OLDEST; while let Some(response) = inflight_requests.join_next().await { @@ -127,20 +125,23 @@ impl SealTask { }; let Ok(response) = response else { // Seal failed/aborted on this node. - nodeset_status.insert(node_id, NodeSealStatus::Error); + nodeset_checker.set_attribute(node_id, NodeSealStatus::Error); continue; }; max_local_tail = max_local_tail.max(response.header.local_tail); known_global_tail.notify_offset_update(response.header.known_global_tail); - nodeset_checker.set_attribute(node_id, true); - nodeset_status.insert(node_id, NodeSealStatus::Sealed); - - if nodeset_checker.check_fmajority(|attr| *attr).passed() { + nodeset_checker.set_attribute(node_id, NodeSealStatus::Sealed); + + if nodeset_checker + .check_fmajority(|attr| *attr == NodeSealStatus::Sealed) + .passed() + { + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); info!( loglet_id = %my_params.loglet_id, replication = %my_params.replication, - %effective_nodeset, %max_local_tail, global_tail = %known_global_tail.latest_offset(), "Seal task completed on f-majority of nodes in {:?}. Nodeset status {}", @@ -152,6 +153,8 @@ impl SealTask { } } + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); // no more tasks left. This means that we failed to seal Err(ReplicatedLogletError::SealFailed(nodeset_status)) } diff --git a/crates/types/src/replication/nodeset.rs b/crates/types/src/replication/nodeset.rs index 9446dd212..13c71c0d8 100644 --- a/crates/types/src/replication/nodeset.rs +++ b/crates/types/src/replication/nodeset.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::Display; use std::hash::{BuildHasherDefault, Hash, Hasher}; @@ -18,7 +19,7 @@ use itertools::Itertools; use rand::distr::Uniform; use rand::prelude::*; -use crate::PlainNodeId; +use crate::{Merge, PlainNodeId}; // Why? Over 50% faster in iteration than HashSet and ~40% faster than default RandomState for // contains() and set intersection operations. Additionally, it's 300% faster when created from @@ -276,8 +277,8 @@ impl std::fmt::Display for NodeSet { /// The alternate format displays a *sorted* list of short-form plain node ids, suitable for human-friendly output. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match f.alternate() { - false => write_nodes(self, f), - true => write_nodes_sorted(self, f), + false => write_nodes(self.0.iter(), f), + true => write_nodes(self.0.iter().sorted(), f), } } } @@ -302,55 +303,103 @@ impl From for DecoratedNodeSet { } } +impl DecoratedNodeSet { + pub fn merge(&mut self, node_id: impl Into, other: V) -> bool + where + V: Merge, + { + let node_id = node_id.into(); + match self.0.entry(node_id) { + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(other); + true + } + Entry::Occupied(occupied_entry) => occupied_entry.into_mut().merge(other), + } + } + + pub fn merge_with_iter<'a>( + &mut self, + values: impl IntoIterator, + ) where + V: Merge + Clone, + V: 'a, + { + values.into_iter().for_each(|(node_id, v)| { + self.0 + .entry(*node_id) + .and_modify(|existing| { + existing.merge(v.clone()); + }) + .or_insert(v.clone()); + }); + } +} + impl FromIterator for DecoratedNodeSet { fn from_iter>(iter: T) -> Self { Self(iter.into_iter().map(|n| (n, Default::default())).collect()) } } +impl FromIterator<(PlainNodeId, V)> for DecoratedNodeSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + impl Display for DecoratedNodeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - for (pos, (node_id, v)) in self.0.iter().with_position() { - write!(f, "{node_id}({v})")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; - } - } - write!(f, "]") + write_nodes_decorated_display(self.0.iter(), f) } } impl std::fmt::Debug for DecoratedNodeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - for (pos, (node_id, v)) in self.0.iter().with_position() { - write!(f, "{node_id}({v:?})")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; - } + write_nodes_decorated_debug(self.0.iter(), f) + } +} + +fn write_nodes_decorated_display<'a, V: std::fmt::Display + 'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; + write!(f, "[")?; + for (pos, (node_id, v)) in iter.with_position() { + match pos { + Position::Only | Position::Last => write!(f, "{node_id}({v})")?, + Position::First | Position::Middle => write!(f, "{node_id}({v}), ")?, } - write!(f, "]") } + write!(f, "]") } -fn write_nodes(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +fn write_nodes_decorated_debug<'a, V: std::fmt::Debug + 'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; write!(f, "[")?; - for (pos, node_id) in nodeset.0.iter().with_position() { - write!(f, "{node_id}")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; + for (pos, (node_id, v)) in iter.with_position() { + match pos { + Position::Only | Position::Last => write!(f, "{node_id}({v:?})")?, + Position::First | Position::Middle => write!(f, "{node_id}({v:?}), ")?, } } write!(f, "]") } -fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +fn write_nodes<'a>( + iter: impl Iterator, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + use itertools::Position; write!(f, "[")?; - for (pos, node_id) in nodeset.0.iter().sorted().with_position() { - write!(f, "{node_id}")?; - if pos != itertools::Position::Last { - write!(f, ", ")?; + for (pos, node_id) in iter.with_position() { + match pos { + Position::Only | Position::Last => write!(f, "{node_id}")?, + Position::First | Position::Middle => write!(f, "{node_id}, ")?, } } write!(f, "]") @@ -358,6 +407,8 @@ fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std #[cfg(test)] mod test { + use ahash::HashMap; + use super::*; #[test] @@ -428,9 +479,21 @@ mod test { #[test] fn nodeset_display() { - let nodeset1 = NodeSet::from_iter([2, 3, 1, 4, 5]); - assert_eq!(nodeset1.to_string(), "[N2, N3, N1, N4, N5]"); - assert_eq!(format!("{nodeset1:#}"), "[N1, N2, N3, N4, N5]"); + let nodeset = NodeSet::from_iter([2, 3, 1, 4, 5]); + assert_eq!(nodeset.to_string(), "[N2, N3, N1, N4, N5]"); + assert_eq!(format!("{nodeset:#}"), "[N1, N2, N3, N4, N5]"); + + let nodeset = NodeSet::from_iter([2]); + assert_eq!(nodeset.to_string(), "[N2]"); + assert_eq!(format!("{nodeset:#}"), "[N2]"); + + let nodeset = NodeSet::from_iter([2]); + assert_eq!(nodeset.to_string(), "[N2]"); + assert_eq!(format!("{nodeset:#}"), "[N2]"); + + let nodeset = NodeSet::new(); + assert_eq!(nodeset.to_string(), "[]"); + assert_eq!(format!("{nodeset:#}"), "[]"); } #[test] @@ -443,11 +506,38 @@ mod test { #[display("E")] Error, } - let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + let mut nodeset = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + + assert_eq!(format!("{nodeset}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]"); + nodeset.insert(PlainNodeId::from(5), Status::Sealed); + nodeset.insert(PlainNodeId::from(2), Status::Sealed); + assert_eq!(format!("{nodeset}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]"); + + let nodeset = DecoratedNodeSet::::from(NodeSet::from_iter([3])); + assert_eq!(format!("{nodeset}"), "[N3(E)]"); + + let nodeset = DecoratedNodeSet::::from(NodeSet::new()); + assert_eq!(format!("{nodeset}"), "[]"); + } - assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]"); + #[test] + fn decorated_nodeset_extend() { + #[derive(derive_more::Display, Default, Clone, Copy)] + enum Status { + #[display("S")] + Sealed, + #[default] + #[display("E")] + Error, + } + let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); nodeset1.insert(PlainNodeId::from(5), Status::Sealed); - nodeset1.insert(PlainNodeId::from(2), Status::Sealed); - assert_eq!(format!("{nodeset1}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]"); + assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(S)]"); + let mut status_map = HashMap::default(); + status_map.insert(PlainNodeId::from(1), Status::Sealed); + status_map.insert(PlainNodeId::from(4), Status::Sealed); + nodeset1.extend(&status_map); + + assert_eq!(format!("{nodeset1}"), "[N1(S), N2(E), N3(E), N4(S), N5(S)]"); } }