Skip to content

Commit

Permalink
DecoratedNodeSet Utility to display decorated nodesets
Browse files Browse the repository at this point in the history
The PR also uses it in CheckSeal. This utility `DecoratedNodeSet` can be used when you want to add a nugget of information next to every node. It prints the nodeset sorted as well.
```
// intentionally empty
```
  • Loading branch information
AhmedSoliman committed Feb 6, 2025
1 parent 00fdd95 commit adad40e
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ restate-rocksdb = { workspace = true }
restate-test-util = { workspace = true, optional = true }
restate-types = { workspace = true }

ahash = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::sync::Arc;

use tracing::{debug, info, instrument};
use tracing::{debug, instrument, warn};

use restate_core::metadata_store::retry_on_retryable_error;
use restate_core::{Metadata, MetadataKind};
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<'a> BifrostAdmin<'a> {
self.inner.writeable_loglet(log_id).await
}

#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
pub async fn seal(&self, log_id: LogId, segment_index: SegmentIndex) -> Result<SealedSegment> {
self.inner.fail_if_shutting_down()?;
// first find the tail segment for this log.
Expand All @@ -177,7 +177,7 @@ impl<'a> BifrostAdmin<'a> {
match err {
crate::loglet::OperationError::Shutdown(err) => return Err(err.into()),
crate::loglet::OperationError::Other(err) => {
info!(
warn!(
log_id = %log_id,
segment = %segment_index,
%err,
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use restate_types::replication::DecoratedNodeSet;

use crate::loglet::OperationError;

#[derive(Default, derive_more::Display, derive_more::Debug)]
#[derive(Default, derive_more::Display, derive_more::Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum NodeSealStatus {
#[display("E")]
Error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::hash::{Hash, Hasher};

use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use restate_types::replication::DecoratedNodeSet;
use tracing::warn;

use restate_types::locality::{LocationScope, NodeLocation};
Expand Down Expand Up @@ -734,6 +736,22 @@ impl<Attr: Eq + Hash> LocationScopeState<Attr> {
}
}

impl<'a, Attr> IntoIterator for &'a NodeSetChecker<Attr> {
type Item = (&'a PlainNodeId, &'a Attr);

type IntoIter = <&'a HashMap<PlainNodeId, Attr> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.node_to_attr.iter()
}
}

impl<Attr> From<NodeSetChecker<Attr>> for DecoratedNodeSet<Attr> {
fn from(val: NodeSetChecker<Attr>) -> DecoratedNodeSet<Attr> {
DecoratedNodeSet::from_iter(val.node_to_attr)
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
21 changes: 12 additions & 9 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl SealTask {
.nodeset
.to_effective(&networking.metadata().nodes_config_ref());

let mut nodeset_checker = NodeSetChecker::<bool>::new(
let mut nodeset_checker = NodeSetChecker::<NodeSealStatus>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&my_params.replication,
Expand Down Expand Up @@ -116,8 +116,6 @@ impl SealTask {
});
}

let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset.clone());

// Max observed local-tail from sealed nodes
let mut max_local_tail = LogletOffset::OLDEST;
while let Some(response) = inflight_requests.join_next().await {
Expand All @@ -127,20 +125,23 @@ impl SealTask {
};
let Ok(response) = response else {
// Seal failed/aborted on this node.
nodeset_status.insert(node_id, NodeSealStatus::Error);
nodeset_checker.set_attribute(node_id, NodeSealStatus::Error);
continue;
};

max_local_tail = max_local_tail.max(response.header.local_tail);
known_global_tail.notify_offset_update(response.header.known_global_tail);
nodeset_checker.set_attribute(node_id, true);
nodeset_status.insert(node_id, NodeSealStatus::Sealed);

if nodeset_checker.check_fmajority(|attr| *attr).passed() {
nodeset_checker.set_attribute(node_id, NodeSealStatus::Sealed);

if nodeset_checker
.check_fmajority(|attr| *attr == NodeSealStatus::Sealed)
.passed()
{
let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset);
nodeset_status.extend(&nodeset_checker);
info!(
loglet_id = %my_params.loglet_id,
replication = %my_params.replication,
%effective_nodeset,
%max_local_tail,
global_tail = %known_global_tail.latest_offset(),
"Seal task completed on f-majority of nodes in {:?}. Nodeset status {}",
Expand All @@ -152,6 +153,8 @@ impl SealTask {
}
}

let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset);
nodeset_status.extend(&nodeset_checker);
// no more tasks left. This means that we failed to seal
Err(ReplicatedLogletError::SealFailed(nodeset_status))
}
Expand Down
160 changes: 125 additions & 35 deletions crates/types/src/replication/nodeset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::Display;
use std::hash::{BuildHasherDefault, Hash, Hasher};
Expand All @@ -18,7 +19,7 @@ use itertools::Itertools;
use rand::distr::Uniform;
use rand::prelude::*;

use crate::PlainNodeId;
use crate::{Merge, PlainNodeId};

// Why? Over 50% faster in iteration than HashSet and ~40% faster than default RandomState for
// contains() and set intersection operations. Additionally, it's 300% faster when created from
Expand Down Expand Up @@ -276,8 +277,8 @@ impl std::fmt::Display for NodeSet {
/// The alternate format displays a *sorted* list of short-form plain node ids, suitable for human-friendly output.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match f.alternate() {
false => write_nodes(self, f),
true => write_nodes_sorted(self, f),
false => write_nodes(self.0.iter(), f),
true => write_nodes(self.0.iter().sorted(), f),
}
}
}
Expand All @@ -302,62 +303,112 @@ impl<V: Default> From<NodeSet> for DecoratedNodeSet<V> {
}
}

impl<V> DecoratedNodeSet<V> {
pub fn merge(&mut self, node_id: impl Into<PlainNodeId>, other: V) -> bool
where
V: Merge,
{
let node_id = node_id.into();
match self.0.entry(node_id) {
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(other);
true
}
Entry::Occupied(occupied_entry) => occupied_entry.into_mut().merge(other),
}
}

pub fn merge_with_iter<'a>(
&mut self,
values: impl IntoIterator<Item = (&'a PlainNodeId, &'a V)>,
) where
V: Merge + Clone,
V: 'a,
{
values.into_iter().for_each(|(node_id, v)| {
self.0
.entry(*node_id)
.and_modify(|existing| {
existing.merge(v.clone());
})
.or_insert(v.clone());
});
}
}

impl<V: Default> FromIterator<PlainNodeId> for DecoratedNodeSet<V> {
fn from_iter<T: IntoIterator<Item = PlainNodeId>>(iter: T) -> Self {
Self(iter.into_iter().map(|n| (n, Default::default())).collect())
}
}

impl<V> FromIterator<(PlainNodeId, V)> for DecoratedNodeSet<V> {
fn from_iter<T: IntoIterator<Item = (PlainNodeId, V)>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}

impl<V: Display> Display for DecoratedNodeSet<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (pos, (node_id, v)) in self.0.iter().with_position() {
write!(f, "{node_id}({v})")?;
if pos != itertools::Position::Last {
write!(f, ", ")?;
}
}
write!(f, "]")
write_nodes_decorated_display(self.0.iter(), f)
}
}

impl<V: std::fmt::Debug> std::fmt::Debug for DecoratedNodeSet<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (pos, (node_id, v)) in self.0.iter().with_position() {
write!(f, "{node_id}({v:?})")?;
if pos != itertools::Position::Last {
write!(f, ", ")?;
}
write_nodes_decorated_debug(self.0.iter(), f)
}
}

fn write_nodes_decorated_display<'a, V: std::fmt::Display + 'a>(
iter: impl Iterator<Item = (&'a PlainNodeId, &'a V)>,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
use itertools::Position;
write!(f, "[")?;
for (pos, (node_id, v)) in iter.with_position() {
match pos {
Position::Only | Position::Last => write!(f, "{node_id}({v})")?,
Position::First | Position::Middle => write!(f, "{node_id}({v}), ")?,
}
write!(f, "]")
}
write!(f, "]")
}

fn write_nodes(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn write_nodes_decorated_debug<'a, V: std::fmt::Debug + 'a>(
iter: impl Iterator<Item = (&'a PlainNodeId, &'a V)>,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
use itertools::Position;
write!(f, "[")?;
for (pos, node_id) in nodeset.0.iter().with_position() {
write!(f, "{node_id}")?;
if pos != itertools::Position::Last {
write!(f, ", ")?;
for (pos, (node_id, v)) in iter.with_position() {
match pos {
Position::Only | Position::Last => write!(f, "{node_id}({v:?})")?,
Position::First | Position::Middle => write!(f, "{node_id}({v:?}), ")?,
}
}
write!(f, "]")
}

fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn write_nodes<'a>(
iter: impl Iterator<Item = &'a PlainNodeId>,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
use itertools::Position;
write!(f, "[")?;
for (pos, node_id) in nodeset.0.iter().sorted().with_position() {
write!(f, "{node_id}")?;
if pos != itertools::Position::Last {
write!(f, ", ")?;
for (pos, node_id) in iter.with_position() {
match pos {
Position::Only | Position::Last => write!(f, "{node_id}")?,
Position::First | Position::Middle => write!(f, "{node_id}, ")?,
}
}
write!(f, "]")
}

#[cfg(test)]
mod test {
use ahash::HashMap;

use super::*;

#[test]
Expand Down Expand Up @@ -428,9 +479,21 @@ mod test {

#[test]
fn nodeset_display() {
let nodeset1 = NodeSet::from_iter([2, 3, 1, 4, 5]);
assert_eq!(nodeset1.to_string(), "[N2, N3, N1, N4, N5]");
assert_eq!(format!("{nodeset1:#}"), "[N1, N2, N3, N4, N5]");
let nodeset = NodeSet::from_iter([2, 3, 1, 4, 5]);
assert_eq!(nodeset.to_string(), "[N2, N3, N1, N4, N5]");
assert_eq!(format!("{nodeset:#}"), "[N1, N2, N3, N4, N5]");

let nodeset = NodeSet::from_iter([2]);
assert_eq!(nodeset.to_string(), "[N2]");
assert_eq!(format!("{nodeset:#}"), "[N2]");

let nodeset = NodeSet::from_iter([2]);
assert_eq!(nodeset.to_string(), "[N2]");
assert_eq!(format!("{nodeset:#}"), "[N2]");

let nodeset = NodeSet::new();
assert_eq!(nodeset.to_string(), "[]");
assert_eq!(format!("{nodeset:#}"), "[]");
}

#[test]
Expand All @@ -443,11 +506,38 @@ mod test {
#[display("E")]
Error,
}
let mut nodeset1 = DecoratedNodeSet::<Status>::from(NodeSet::from_iter([2, 3, 1, 4, 5]));
let mut nodeset = DecoratedNodeSet::<Status>::from(NodeSet::from_iter([2, 3, 1, 4, 5]));

assert_eq!(format!("{nodeset}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]");
nodeset.insert(PlainNodeId::from(5), Status::Sealed);
nodeset.insert(PlainNodeId::from(2), Status::Sealed);
assert_eq!(format!("{nodeset}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]");

let nodeset = DecoratedNodeSet::<Status>::from(NodeSet::from_iter([3]));
assert_eq!(format!("{nodeset}"), "[N3(E)]");

let nodeset = DecoratedNodeSet::<Status>::from(NodeSet::new());
assert_eq!(format!("{nodeset}"), "[]");
}

assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(E)]");
#[test]
fn decorated_nodeset_extend() {
#[derive(derive_more::Display, Default, Clone, Copy)]
enum Status {
#[display("S")]
Sealed,
#[default]
#[display("E")]
Error,
}
let mut nodeset1 = DecoratedNodeSet::<Status>::from(NodeSet::from_iter([2, 3, 1, 4, 5]));
nodeset1.insert(PlainNodeId::from(5), Status::Sealed);
nodeset1.insert(PlainNodeId::from(2), Status::Sealed);
assert_eq!(format!("{nodeset1}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]");
assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(S)]");
let mut status_map = HashMap::default();
status_map.insert(PlainNodeId::from(1), Status::Sealed);
status_map.insert(PlainNodeId::from(4), Status::Sealed);
nodeset1.extend(&status_map);

assert_eq!(format!("{nodeset1}"), "[N1(S), N2(E), N3(E), N4(S), N5(S)]");
}
}

0 comments on commit adad40e

Please sign in to comment.