diff --git a/Cargo.lock b/Cargo.lock index c1037ac20e..18df18f116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7627,7 +7627,9 @@ dependencies = [ "crossterm 0.27.0", "ctrlc", "diff", + "enumset", "futures-util", + "humantime", "itertools 0.14.0", "json-patch", "prost-types", diff --git a/crates/admin/src/cluster_controller/cluster_state_refresher.rs b/crates/admin/src/cluster_controller/cluster_state_refresher.rs index e0e7226c36..ffda502558 100644 --- a/crates/admin/src/cluster_controller/cluster_state_refresher.rs +++ b/crates/admin/src/cluster_controller/cluster_state_refresher.rs @@ -166,6 +166,7 @@ impl ClusterStateRefresher { last_heartbeat_at: MillisSinceEpoch::now(), generational_node_id: peer, partitions: msg.partition_processor_state.unwrap_or_default(), + age: msg.age, }), ); } diff --git a/crates/admin/src/cluster_controller/observed_cluster_state.rs b/crates/admin/src/cluster_controller/observed_cluster_state.rs index 74d18a5929..651d5074d2 100644 --- a/crates/admin/src/cluster_controller/observed_cluster_state.rs +++ b/crates/admin/src/cluster_controller/observed_cluster_state.rs @@ -141,6 +141,7 @@ mod tests { use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; use std::collections::{BTreeMap, HashMap}; + use std::time::Duration; impl ObservedClusterState { pub fn remove_node_from_partition( @@ -203,6 +204,7 @@ mod tests { generational_node_id, last_heartbeat_at: MillisSinceEpoch::now(), partitions, + age: Duration::default(), }) } diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs index fb21508f25..1ce1bd70db 100644 --- a/crates/admin/src/cluster_controller/scheduler.rs +++ b/crates/admin/src/cluster_controller/scheduler.rs @@ -1080,6 +1080,7 @@ mod tests { generational_node_id: node_id, last_heartbeat_at: MillisSinceEpoch::now(), partitions, + age: Duration::default(), } } diff --git a/crates/admin/src/cluster_controller/service.rs 
b/crates/admin/src/cluster_controller/service.rs index 721dc51ef0..8b11e708d4 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -884,6 +884,7 @@ mod tests { let state = [(PartitionId::MIN, partition_processor_status)].into(); let response = msg.to_rpc_response(NodeStateResponse { partition_processor_state: Some(state), + age: Duration::from_secs(100), }); // We are not really sending something back to target, we just need to provide a known diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 981cd33086..77675451af 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -539,6 +539,7 @@ impl TrimMode { mod tests { use std::collections::BTreeMap; use std::sync::Arc; + use std::time::Duration; use crate::cluster_controller::service::state::{TrimMode, TrimPointsUnavailable}; use restate_types::cluster::cluster_state::{ @@ -901,6 +902,7 @@ mod tests { generational_node_id, last_heartbeat_at: MillisSinceEpoch::now(), partitions, + age: Duration::default(), }) } diff --git a/crates/node/src/roles/base.rs b/crates/node/src/roles/base.rs index 0fe74b56d8..b62569429f 100644 --- a/crates/node/src/roles/base.rs +++ b/crates/node/src/roles/base.rs @@ -12,6 +12,7 @@ use futures::StreamExt; use restate_core::{ network::{Incoming, MessageRouterBuilder, MessageStream}, + task_center::TaskCenterMonitoring, worker_api::ProcessorsManagerHandle, ShutdownError, TaskCenter, TaskKind, }; @@ -64,6 +65,7 @@ impl BaseRole { let _ = msg .to_rpc_response(NodeStateResponse { partition_processor_state: partition_state, + age: TaskCenter::with_current(|t| t.age()), }) .try_send(); } diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto index cd72d808d7..fd1237deec 100644 --- a/crates/types/protobuf/restate/cluster.proto +++ 
b/crates/types/protobuf/restate/cluster.proto @@ -39,11 +39,13 @@ message SuspectNode { } message AliveNode { - restate.common.NodeId generational_node_id = 1; + restate.common.GenerationalNodeId generational_node_id = 1; google.protobuf.Timestamp last_heartbeat_at = 2; // partition id is u16 but protobuf doesn't support u16. This must be a value // that's safe to convert to u16 map partitions = 3; + // age of node since the daemon started in seconds + uint64 age_s = 4; } message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } @@ -84,8 +86,9 @@ message BifrostProvider { // only required if provider = "replicated" optional ReplicationProperty replication_property = 2; // only required if provider = "replicated" - // The default target for new nodesets. 0 (default) auto-chooses a nodeset-size that - // balances read and write availability. It's a reasonable default for most cases. + // The default target for new nodesets. 0 (default) auto-chooses a + // nodeset-size that balances read and write availability. It's a reasonable + // default for most cases. uint32 target_nodeset_size = 3; } diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs index b823f0af88..31915e1c5f 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/cluster/cluster_state.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. 
use std::collections::BTreeMap; -use std::time::Instant; +use std::time::{Duration, Instant}; use prost_dto::IntoProst; use serde::{Deserialize, Serialize}; @@ -97,6 +97,10 @@ pub struct AliveNode { #[prost(required)] pub generational_node_id: GenerationalNodeId, pub partitions: BTreeMap, + // age of daemon in seconds + #[prost(name=age_s)] + #[into_prost(map=Duration::as_secs, map_by_ref)] + pub age: Duration, } #[derive(Debug, Clone, IntoProst)] diff --git a/crates/types/src/net/node.rs b/crates/types/src/net/node.rs index 9806adfc65..af99abf209 100644 --- a/crates/types/src/net/node.rs +++ b/crates/types/src/net/node.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::BTreeMap; +use std::{collections::BTreeMap, time::Duration}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -32,4 +32,7 @@ pub struct NodeStateResponse { /// Partition processor status per partition. Is set to None if this node is not a `Worker` node #[serde_as(as = "Option>")] pub partition_processor_state: Option>, + + /// age of node + pub age: Duration, } diff --git a/tools/restatectl/Cargo.toml b/tools/restatectl/Cargo.toml index 0aef6f275a..f88474d5a7 100644 --- a/tools/restatectl/Cargo.toml +++ b/tools/restatectl/Cargo.toml @@ -44,9 +44,11 @@ cling = { workspace = true } crossterm = { version = "0.27.0" } ctrlc = { version = "3.4" } diff = "0.1.13" +enumset = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } json-patch = "2.0.0" +humantime = { workspace = true } prost-types = { workspace = true } rand = { workspace = true } rlimit = { workspace = true } diff --git a/tools/restatectl/src/commands/metadata_server/status.rs b/tools/restatectl/src/commands/metadata_server/status.rs index caa087b872..eee049d6ed 100644 --- a/tools/restatectl/src/commands/metadata_server/status.rs +++ b/tools/restatectl/src/commands/metadata_server/status.rs 
@@ -137,7 +137,7 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul Ok(()) } -fn render_metadata_server_status(metadata_server_status: MetadataServerStatus) -> Cell { +pub fn render_metadata_server_status(metadata_server_status: MetadataServerStatus) -> Cell { match metadata_server_status { MetadataServerStatus::Unknown => Cell::new("UNKNOWN").fg(Color::Red), MetadataServerStatus::StartingUp => Cell::new("Starting").fg(Color::Yellow), diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index adace2682f..010acd3a5a 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -120,8 +120,7 @@ pub async fn list_partitions( .generational_node_id .as_ref() .expect("alive partition has a node id"); - let host_node = - GenerationalNodeId::new(host.id, host.generation.expect("generation")); + let host_node = GenerationalNodeId::from(*host); let details = PartitionListEntry { host_node, status }; partitions.push((partition_id, details)); diff --git a/tools/restatectl/src/commands/status.rs b/tools/restatectl/src/commands/status.rs index 92f02b9a1b..da954ed632 100644 --- a/tools/restatectl/src/commands/status.rs +++ b/tools/restatectl/src/commands/status.rs @@ -8,16 +8,39 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::time::Duration; + +use anyhow::Context; use clap::Parser; use cling::{Collect, Run}; +use enumset::EnumSet; +use itertools::Itertools; +use restate_metadata_server::grpc::metadata_server_svc_client::MetadataServerSvcClient; +use restate_types::health::MetadataServerStatus; +use tonic::codec::CompressionEncoding; +use tonic::{Code, IntoRequest}; +use tracing::error; +use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; +use restate_admin::cluster_controller::protobuf::ClusterStateRequest; +use restate_cli_util::_comfy_table::{Cell, Color, Row, Table}; use restate_cli_util::c_println; +use restate_cli_util::ui::console::StyledTable; +use restate_types::logs::metadata::Logs; +use restate_types::nodes_config::{NodeConfig, Role}; +use restate_types::protobuf::cluster::node_state::State; +use restate_types::protobuf::cluster::{AliveNode, RunMode}; +use restate_types::{GenerationalNodeId, NodeId}; use crate::commands::log::list_logs::{list_logs, ListLogsOpts}; use crate::commands::metadata_server::status::list_metadata_servers; use crate::commands::node::list_nodes::{list_nodes, ListNodesOpts}; use crate::commands::partition::list::{list_partitions, ListPartitionsOpts}; use crate::connection::ConnectionInfo; +use crate::util::grpc_channel; + +use super::log::deserialize_replicated_log_params; +use super::metadata_server::status::render_metadata_server_status; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "cluster_status")] @@ -31,13 +54,13 @@ async fn cluster_status( connection: &ConnectionInfo, status_opts: &ClusterStatusOpts, ) -> anyhow::Result<()> { - list_nodes( - connection, - &ListNodesOpts { - extra: status_opts.extra, - }, - ) - .await?; + if !status_opts.extra { + compact_cluster_status(connection).await?; + + return Ok(()); + } + + list_nodes(connection, &ListNodesOpts { extra: false }).await?; c_println!(); list_logs(connection, &ListLogsOpts {}).await?; @@ -50,3 +73,322 @@ async fn 
cluster_status( Ok(()) } + +async fn compact_cluster_status(connection: &ConnectionInfo) -> anyhow::Result<()> { + let nodes_config = connection.get_nodes_configuration().await?; + let logs = connection.get_logs().await?; + + let cluster_state = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client + .get_cluster_state(ClusterStateRequest::default()) + .await + }) + .await? + .into_inner() + .cluster_state + .context("no cluster state returned")?; + + let mut table = Table::new_styled(); + table.set_styled_header(NodeRow::header()); + for (node_id, node_config) in nodes_config.iter().sorted_by_key(|(id, _)| *id) { + let mut row = NodeRow::default(); + row.with_name(node_config.name.clone()) + .with_roles(&node_config.roles); + + let Some(node_state) = cluster_state.nodes.get(&u32::from(node_id)) else { + continue; + }; + + let node_state = node_state.state.as_ref().context("missing node state")?; + + match node_state { + State::Alive(alive) => { + // alive nodes are shown with their generational id and reported uptime + row.with_id( + GenerationalNodeId::from( + alive.generational_node_id.context("node id is missing")?, + ), + Color::Green, + ) + .with_uptime(Duration::from_secs(alive.age_s)); + + alive_node_status(&mut row, alive, node_config, &logs).await?; + } + State::Dead(_dead) => { + row.with_id(node_id, Color::Red); + } + State::Suspect(suspect) => { + row.with_id( + suspect.generational_node_id.context("node id is missing")?, + Color::Yellow, + ); + } + }; + + table.add_row(row); + } + + c_println!("Node Configuration ({})", nodes_config.version()); + c_println!("{}", table); + Ok(()) +} + +async fn alive_node_status( + row: &mut NodeRow, + alive_node: &AliveNode, + node_config: &NodeConfig, + logs: &Logs, +) -> anyhow::Result<()> { + let node_id: GenerationalNodeId = alive_node + .generational_node_id + .context("node id is missing")? 
+ .into(); + + let counter = alive_node.partitions.values().fold( + PartitionCounter::default(), + |mut counter, partition_status| { + let effective = + RunMode::try_from(partition_status.effective_mode).expect("valid effective mode"); + let planned = + RunMode::try_from(partition_status.planned_mode).expect("valid planned mode"); + + match (effective, planned) { + (RunMode::Leader, RunMode::Leader) => counter.leaders += 1, + (RunMode::Follower, RunMode::Follower) => counter.followers += 1, + (RunMode::Leader, RunMode::Follower) => counter.downgrading += 1, + (RunMode::Follower, RunMode::Leader) => counter.upgrading += 1, + (_, _) => { + // unknown state! + } + }; + + counter + }, + ); + + row.with_partitions(counter); + + let mut counter = LogsCounter::default(); + + for (log_id, chain) in logs.iter() { + let tail = chain.tail(); + let Some(replicated_loglet_params) = deserialize_replicated_log_params(&tail) else { + continue; + }; + + if replicated_loglet_params.nodeset.contains(node_id) { + counter.nodesets += 1; + } + + if replicated_loglet_params.sequencer != node_id { + // sequencer for this log is not running here + continue; + } + + // check if we are also running the partition + counter.sequencers += 1; + let Some(partition_status) = alive_node.partitions.get(&u32::from(*log_id)) else { + // partition is not running here + counter.optimal_placement = false; + continue; + }; + + let effective = + RunMode::try_from(partition_status.effective_mode).expect("valid effective mode"); + let planned = RunMode::try_from(partition_status.planned_mode).expect("valid planned mode"); + + // or partition is not (or not becoming) the leader + if effective != RunMode::Leader && planned != RunMode::Leader { + counter.optimal_placement = false; + } + } + + row.with_logs(counter); + + if node_config.has_role(Role::MetadataServer) { + let metadata_channel = grpc_channel(node_config.address.clone()); + let mut metadata_client = MetadataServerSvcClient::new(metadata_channel) + 
.accept_compressed(CompressionEncoding::Gzip); + + let metadata_store_status = metadata_client.status(().into_request()).await; + + let metadata_cell = match metadata_store_status { + Ok(response) => { + let status = MetadataServerStatus::try_from(response.into_inner().status) + .unwrap_or_default(); + render_metadata_server_status(status) + } + Err(err) if err.code() == Code::Unimplemented => Cell::new("Local").fg(Color::Yellow), + Err(err) => { + error!( + "failed to get metadata status from node {}: {}", + node_config.current_generation, err, + ); + Cell::new("ERR!").fg(Color::Red) + } + }; + + row.with_metadata(metadata_cell); + } + + Ok(()) +} + +#[derive(Default)] +struct PartitionCounter { + leaders: u16, + followers: u16, + upgrading: u16, + downgrading: u16, +} + +impl PartitionCounter { + fn leaders(&self) -> String { + use std::fmt::Write; + + let mut buf = String::with_capacity(5); + write!(buf, "{}", self.leaders).expect("must succeed"); + if self.upgrading > 0 { + write!(buf, "+{}", self.upgrading).expect("must succeed"); + } + + buf + } + + fn followers(&self) -> String { + use std::fmt::Write; + + let mut buf = String::with_capacity(5); + write!(buf, "{}", self.followers).expect("must succeed"); + if self.downgrading > 0 { + write!(buf, "+{}", self.downgrading).expect("must succeed"); + } + + buf + } +} + +struct LogsCounter { + sequencers: usize, + nodesets: usize, + optimal_placement: bool, +} + +impl Default for LogsCounter { + fn default() -> Self { + Self { + sequencers: 0, + nodesets: 0, + optimal_placement: true, + } + } +} + +#[derive(Default)] +struct NodeRow { + id: Option<Cell>, + name: Option<Cell>, + uptime: Option<Cell>, + roles: Option<Cell>, + metadata: Option<Cell>, + leaders: Option<Cell>, + followers: Option<Cell>, + nodesets: Option<Cell>, + sequencers: Option<Cell>, +} + +impl NodeRow { + fn header() -> Vec<&'static str> { + vec![ + "NODE-ID", + "NAME", + "UPTIME", + "METADATA", + "LEADERS", + "FOLLOWERS", + "NODESETS", + "SEQUENCERS", + "ROLES", + ] + } + + fn with_id<I: Into<NodeId>>(&mut self, 
node_id: I, color: Color) -> &mut Self { + self.id = Some(Cell::new(node_id.into()).fg(color)); + self + } + + fn with_uptime(&mut self, uptime: Duration) -> &mut Self { + let uptime = humantime::Duration::from(uptime); + self.uptime = Some(Cell::new(format!("{}", uptime))); + self + } + + fn with_name(&mut self, name: String) -> &mut Self { + self.name = Some(Cell::new(name)); + self + } + + fn with_roles(&mut self, roles: &EnumSet<Role>) -> &mut Self { + self.roles = Some(Cell::new( + roles.iter().map(|r| r.to_string()).sorted().join(" | "), + )); + self + } + + fn with_partitions(&mut self, counter: PartitionCounter) -> &mut Self { + self.leaders = Some(Cell::new(counter.leaders())); + self.followers = Some(Cell::new(counter.followers())); + self + } + + fn with_logs(&mut self, counter: LogsCounter) -> &mut Self { + self.nodesets = Some(Cell::new(counter.nodesets)); + + let mut cell = Cell::new(counter.sequencers).fg(Color::Yellow); + if counter.optimal_placement { + cell = cell.fg(Color::Green); + } + self.sequencers = Some(cell); + self + } + + fn with_metadata(&mut self, metadata: Cell) -> &mut Self { + self.metadata = Some(metadata); + self + } +} + +// helper macro to unwrap cells +macro_rules! unwrap { + ($cell:expr) => { + $cell.unwrap_or_else(|| Cell::new("")) + }; + ($cell:expr, $default:expr) => { + $cell.unwrap_or_else(|| Cell::new($default)) + }; + ($cell:expr, $default:expr => $color:expr) => { + $cell.unwrap_or_else(|| Cell::new($default).fg($color)) + }; +} + +impl From<NodeRow> for Row { + fn from(value: NodeRow) -> Self { + let row = vec![ + value.id.expect("must be set"), + unwrap!(value.name), + unwrap!(value.uptime, "offline" => Color::Red), + unwrap!(value.metadata), + unwrap!(value.leaders), + unwrap!(value.followers), + unwrap!(value.nodesets), + unwrap!(value.sequencers), + unwrap!(value.roles), + ]; + row.into() + } +}