Skip to content

Commit

Permalink
Compact cluster status
Browse files Browse the repository at this point in the history
Summary:
Print out a compact cluster status by default

Extended view of the cluster status can still be accessed
via `--extra` flag
  • Loading branch information
muhamadazmy committed Feb 12, 2025
1 parent 85d331f commit 3fe3dd3
Show file tree
Hide file tree
Showing 18 changed files with 383 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.partition_processor_state.unwrap_or_default(),
age: msg.age,
}),
);
}
Expand Down
2 changes: 2 additions & 0 deletions crates/admin/src/cluster_controller/observed_cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ mod tests {
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use std::collections::{BTreeMap, HashMap};
use std::time::Duration;

impl ObservedClusterState {
pub fn remove_node_from_partition(
Expand Down Expand Up @@ -203,6 +204,7 @@ mod tests {
generational_node_id,
last_heartbeat_at: MillisSinceEpoch::now(),
partitions,
age: Duration::default(),
})
}

Expand Down
1 change: 1 addition & 0 deletions crates/admin/src/cluster_controller/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ mod tests {
generational_node_id: node_id,
last_heartbeat_at: MillisSinceEpoch::now(),
partitions,
age: Duration::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ mod tests {
let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
partition_processor_state: Some(state),
age: Duration::from_secs(100),
});

// We are not really sending something back to target, we just need to provide a known
Expand Down
2 changes: 2 additions & 0 deletions crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ impl TrimMode {
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use crate::cluster_controller::service::state::{TrimMode, TrimPointsUnavailable};
use restate_types::cluster::cluster_state::{
Expand Down Expand Up @@ -901,6 +902,7 @@ mod tests {
generational_node_id,
last_heartbeat_at: MillisSinceEpoch::now(),
partitions,
age: Duration::default(),
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::StreamExt;

use restate_core::{
network::{Incoming, MessageRouterBuilder, MessageStream},
task_center::TaskCenterMonitoring,
worker_api::ProcessorsManagerHandle,
ShutdownError, TaskCenter, TaskKind,
};
Expand Down Expand Up @@ -64,6 +65,7 @@ impl BaseRole {
let _ = msg
.to_rpc_response(NodeStateResponse {
partition_processor_state: partition_state,
age: TaskCenter::with_current(|t| t.age()),
})
.try_send();
}
Expand Down
9 changes: 6 additions & 3 deletions crates/types/protobuf/restate/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ message SuspectNode {
}

message AliveNode {
restate.common.NodeId generational_node_id = 1;
restate.common.GenerationalNodeId generational_node_id = 1;
google.protobuf.Timestamp last_heartbeat_at = 2;
// partition id is u16 but protobuf doesn't support u16. This must be a value
// that's safe to convert to u16
map<uint32, PartitionProcessorStatus> partitions = 3;
// age of node since the daemon started in seconds
uint64 age_s = 4;
}

message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; }
Expand Down Expand Up @@ -84,8 +86,9 @@ message BifrostProvider {
// only required if provider = "replicated"
optional ReplicationProperty replication_property = 2;
// only required if provider = "replicated"
// The default target for new nodesets. 0 (default) auto-chooses a nodeset-size that
// balances read and write availability. It's a reasonable default for most cases.
// The default target for new nodesets. 0 (default) auto-chooses a
// nodeset-size that balances read and write availability. It's a reasonable
// default for most cases.
uint32 target_nodeset_size = 3;
}

Expand Down
6 changes: 5 additions & 1 deletion crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::time::Instant;
use std::time::{Duration, Instant};

use prost_dto::IntoProst;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -97,6 +97,10 @@ pub struct AliveNode {
#[prost(required)]
pub generational_node_id: GenerationalNodeId,
pub partitions: BTreeMap<PartitionId, PartitionProcessorStatus>,
// age of daemon in seconds
#[prost(name=age_s)]
#[into_prost(map=Duration::as_secs, map_by_ref)]
pub age: Duration,
}

#[derive(Debug, Clone, IntoProst)]
Expand Down
5 changes: 4 additions & 1 deletion crates/types/src/net/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::{collections::BTreeMap, time::Duration};

use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand All @@ -32,4 +32,7 @@ pub struct NodeStateResponse {
/// Partition processor status per partition. Is set to None if this node is not a `Worker` node
#[serde_as(as = "Option<serde_with::Seq<(_, _)>>")]
pub partition_processor_state: Option<BTreeMap<PartitionId, PartitionProcessorStatus>>,

/// age of node
pub age: Duration,
}
2 changes: 2 additions & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ cling = { workspace = true }
crossterm = { version = "0.27.0" }
ctrlc = { version = "3.4" }
diff = "0.1.13"
enumset = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
json-patch = "2.0.0"
humantime = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
rlimit = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/log/gen_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct GenerateLogMetadataOpts {
#[clap(long, required = true, value_delimiter=',', num_args = 1..)]
nodeset: Vec<PlainNodeId>,
/// The generational node id of the sequencer node, e.g. N1:1
#[clap(long, short)]
#[clap(long)]
sequencer: GenerationalNodeId,
/// The number of logs
#[clap(long, short)]
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/log/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ReconfigureOpts {
#[clap(long, short)]
provider: Option<ProviderKind>,
/// Option segment index to seal. The tail segment is chosen automatically if not provided.
#[clap(long, short)]
#[clap(long)]
segment_index: Option<u32>,
/// The [minimum] expected metadata version
#[clap(long, short, default_value = "1")]
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/metadata/patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct PatchValueOpts {
pub patch: String,

/// Expected version for conditional update
#[arg(short = 'e', long)]
#[arg(long)]
pub version: Option<u32>,

/// Preview the change without applying it
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/metadata/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct PutValueOpts {
doc: FileOrStdin,

/// Expected version for conditional update
#[arg(short = 'e', long)]
#[arg(long)]
version: Option<u32>,

/// Preview the change without applying it
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/metadata_server/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul
Ok(())
}

fn render_metadata_server_status(metadata_server_status: MetadataServerStatus) -> Cell {
pub fn render_metadata_server_status(metadata_server_status: MetadataServerStatus) -> Cell {
match metadata_server_status {
MetadataServerStatus::Unknown => Cell::new("UNKNOWN").fg(Color::Red),
MetadataServerStatus::StartingUp => Cell::new("Starting").fg(Color::Yellow),
Expand Down
3 changes: 1 addition & 2 deletions tools/restatectl/src/commands/partition/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ pub async fn list_partitions(
.generational_node_id
.as_ref()
.expect("alive partition has a node id");
let host_node =
GenerationalNodeId::new(host.id, host.generation.expect("generation"));
let host_node = GenerationalNodeId::from(*host);
let details = PartitionListEntry { host_node, status };
partitions.push((partition_id, details));

Expand Down
Loading

0 comments on commit 3fe3dd3

Please sign in to comment.