Skip to content

Commit

Permalink
refacto(zfctl): improve display of instance status command
Browse files Browse the repository at this point in the history
The previous version only showed the state of the entire data flow.

This commit introduces the following changes:
- separated the state of an instance from its status in two different
  structures, `InstanceState` and `InstanceStatus`
- renamed the Runtime method `get_status` into `get_instance_status`
- made `zfctl` display, for each Runtime managing a data flow, all the nodes
  that this Runtime manages in addition to the state of the instance

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
Julien Loudet committed Feb 1, 2024
1 parent 6e5bcfe commit c77f1c8
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 52 deletions.
11 changes: 7 additions & 4 deletions zenoh-flow-daemon-2/src/instances/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ impl InstancesQuery {
InstancesQuery::Status(instance_id) => {
if let Err(e) = reply(
query,
runtime.get_status(&instance_id).await.ok_or_else(|| {
anyhow!("Found no data flow with instance id < {} >", instance_id)
}),
runtime
.get_instance_status(&instance_id)
.await
.ok_or_else(|| {
anyhow!("Found no data flow with instance id < {} >", instance_id)
}),
)
.await
{
Expand All @@ -129,7 +132,7 @@ impl InstancesQuery {
}

InstancesQuery::List => {
if let Err(e) = reply(query, Ok(runtime.instances_status().await)).await {
if let Err(e) = reply(query, Ok(runtime.instances_state().await)).await {
tracing::error!("Failed to reply to 'List' query: {:?}", e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh-flow-daemon-2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

mod instances;
pub use instances::{InstancesQuery, Origin};
pub use zenoh_flow_runtime::InstanceStatus;
pub use zenoh_flow_runtime::{InstanceState, InstanceStatus};

pub mod selectors;

Expand Down
6 changes: 3 additions & 3 deletions zenoh-flow-daemon-2/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind};
use zenoh::prelude::r#async::*;
use zenoh::queryable::Query;
use zenoh_flow_commons::{InstanceId, Result, RuntimeId};
use zenoh_flow_runtime::{InstanceStatus, Runtime};
use zenoh_flow_runtime::{InstanceState, Runtime};

use crate::{selectors::selector_runtimes, validate_query};

Expand All @@ -46,7 +46,7 @@ pub struct RuntimeStatus {
pub operating_system: Option<String>,
pub cpus: usize,
pub ram_total: u64,
pub data_flows_status: HashMap<InstanceId, (Arc<str>, InstanceStatus)>,
pub data_flows_status: HashMap<InstanceId, (Arc<str>, InstanceState)>,
}

impl RuntimesQuery {
Expand All @@ -64,7 +64,7 @@ impl RuntimesQuery {
}

RuntimesQuery::Status => {
let data_flows_status = runtime.instances_status().await;
let data_flows_status = runtime.instances_state().await;

// TODO For better performance, we should initialise this structure once and simply refresh it whenever
// we want to access some data about the machine.
Expand Down
52 changes: 40 additions & 12 deletions zenoh-flow-runtime/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,41 @@
use crate::runners::Runner;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display, ops::Deref};
use zenoh_flow_commons::{NodeId, Result};
use zenoh_flow_commons::{NodeId, Result, RuntimeId};
use zenoh_flow_records::DataFlowRecord;

pub struct DataFlowInstance {
status: InstanceStatus,
state: InstanceState,
pub(crate) record: DataFlowRecord,
pub(crate) runners: HashMap<NodeId, Runner>,
}

#[derive(Clone, Copy, Deserialize, Serialize, Debug)]
pub enum InstanceStatus {
pub enum InstanceState {
Loaded,
Running,
Aborted,
}

impl Display for InstanceStatus {
impl Display for InstanceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
InstanceStatus::Loaded => "Loaded",
InstanceStatus::Running => "Running",
InstanceStatus::Aborted => "Aborted",
InstanceState::Loaded => "Loaded",
InstanceState::Running => "Running",
InstanceState::Aborted => "Aborted",
};

write!(f, "{}", repr)
}
}

#[derive(Deserialize, Serialize, Debug)]
pub struct InstanceStatus {
pub runtime_id: RuntimeId,
pub state: InstanceState,
pub nodes: Vec<NodeId>,
}

impl Deref for DataFlowInstance {
type Target = DataFlowRecord;

Expand All @@ -54,7 +61,7 @@ impl Deref for DataFlowInstance {
impl DataFlowInstance {
pub fn new(record: DataFlowRecord) -> Self {
Self {
status: InstanceStatus::Loaded,
state: InstanceState::Loaded,
record,
runners: HashMap::default(),
}
Expand All @@ -66,7 +73,7 @@ impl DataFlowInstance {
tracing::trace!("Started node < {} >", node_id);
}

self.status = InstanceStatus::Running;
self.state = InstanceState::Running;
Ok(())
}

Expand All @@ -76,11 +83,32 @@ impl DataFlowInstance {
tracing::trace!("Aborted node < {} >", node_id);
}

self.status = InstanceStatus::Aborted;
self.state = InstanceState::Aborted;
Ok(())
}

pub fn status(&self) -> &InstanceStatus {
&self.status
pub fn state(&self) -> &InstanceState {
&self.state
}

pub fn status(&self, runtime_id: &RuntimeId) -> InstanceStatus {
InstanceStatus {
runtime_id: runtime_id.clone(),
state: self.state,
nodes: self
.mapping
.get(runtime_id)
.map(|node_ids| {
node_ids
.iter()
.filter(|&node_id| {
!(self.senders.contains_key(node_id)
|| self.receivers.contains_key(node_id))
})
.cloned()
.collect()
})
.unwrap_or_default(),
}
}
}
2 changes: 1 addition & 1 deletion zenoh-flow-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

mod instance;
pub use instance::{DataFlowInstance, InstanceStatus};
pub use instance::{DataFlowInstance, InstanceState, InstanceStatus};

mod loader;
pub use loader::{Extensions, Loader};
Expand Down
24 changes: 14 additions & 10 deletions zenoh-flow-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

mod load;

use crate::{instance::DataFlowInstance, loader::Loader, InstanceStatus};
use crate::{
instance::{DataFlowInstance, InstanceStatus},
loader::Loader,
InstanceState,
};

use std::{
collections::HashMap,
Expand Down Expand Up @@ -75,19 +79,19 @@ impl Runtime {

/// Returns information regarding the data flows that are running on this Zenoh-Flow runtime.
///
/// For each instance of a data flow, returns its unique identifier, its name and its status.
pub async fn instances_status(&self) -> HashMap<InstanceId, (Arc<str>, InstanceStatus)> {
/// For each instance of a data flow, returns its unique identifier, its name and its state.
pub async fn instances_state(&self) -> HashMap<InstanceId, (Arc<str>, InstanceState)> {
let flows = self.flows.read().await;
let mut status = HashMap::with_capacity(flows.len());
let mut states = HashMap::with_capacity(flows.len());
for (instance_id, instance_lck) in flows.iter() {
let instance = instance_lck.read().await;
status.insert(
states.insert(
instance_id.clone(),
(instance.name().clone(), *instance.status()),
(instance.name().clone(), *instance.state()),
);
}

status
states
}

/// Returns an atomically counted reference over the Zenoh session this Zenoh-Flow runtime leverages.
Expand Down Expand Up @@ -147,9 +151,9 @@ impl Runtime {
/// loaded. In particular, this means that each node has successfully called its constructor.
/// - [Running](InstanceStatus::Running) when the nodes that this runtime manages are running.
/// - [Aborted](InstanceStatus::Aborted) when the nodes were previously running and their execution was aborted.
pub async fn get_status(&self, id: &InstanceId) -> Option<InstanceStatus> {
pub async fn get_instance_status(&self, id: &InstanceId) -> Option<InstanceStatus> {
if let Some(instance) = self.flows.read().await.get(id) {
return Some(*instance.read().await.status());
return Some(instance.read().await.status(&self.runtime_id));
}

None
Expand Down Expand Up @@ -182,7 +186,7 @@ impl Runtime {
pub async fn try_abort_instance(&self, id: &InstanceId) -> Result<()> {
let instance = self.try_get_instance(id).await?;

if !matches!(instance.read().await.status(), &InstanceStatus::Running) {
if !matches!(instance.read().await.state(), &InstanceState::Running) {
return Ok(());
}

Expand Down
43 changes: 31 additions & 12 deletions zfctl/src/instance_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use crate::row;

use super::{parse_key_val, ZENOH_FLOW_INTERNAL_ERROR};

use std::path::PathBuf;

use anyhow::{anyhow, bail};
use clap::Subcommand;
use comfy_table::Table;
use uuid::Uuid;
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{Result, RuntimeId, Vars};
use zenoh_flow_daemon_2::{selectors, InstancesQuery, Origin};
use zenoh_flow_daemon_2::{selectors, InstanceStatus, InstancesQuery, Origin};
use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor};

#[derive(Subcommand)]
Expand Down Expand Up @@ -169,24 +172,40 @@ Caused by:
}
}
InstancesQuery::Status(_) => {
tracing::info!("Runtime | Status");
let mut table = Table::new();
table.set_width(80);
table.set_header(row!("Runtime", "Instance State", "Node"));

while let Ok(response) = reply.recv_async().await {
match response.sample {
Ok(sample) => {
// TODO: Magic number "1"
//
// In the daemon, when processing a status query, the key expression returned has the id
// of the runtime in 2nd position (counts start at 0… hence `nth(1)` to get the 2nd)
let runtime_id = sample.key_expr.chunks().nth(1).unwrap();
tracing::info!(
"{} {}",
runtime_id,
String::from_utf8_lossy(&sample.payload.contiguous())
);
match serde_json::from_slice::<InstanceStatus>(
&sample.value.payload.contiguous(),
) {
Ok(mut status) => {
if let Some(node_id) = status.nodes.pop() {
table.add_row(row!(
status.runtime_id,
status.state,
node_id
));
status.nodes.iter().for_each(|node_id| {
table.add_row(row!("", "", node_id));
});
}
}
Err(e) => tracing::error!(
"Failed to parse 'status' reply from < {} >: {:?}",
response.replier_id,
e
),
}
}
Err(err) => tracing::error!("{:?}", err),
}
}

println!("{table}");
}
_ => {}
}
Expand Down
10 changes: 10 additions & 0 deletions zfctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ If the above error log does not help you troubleshoot the reason, you can contac
- GitHub: https://github.com/eclipse-zenoh/zenoh-flow
"#;

/// Macro to facilitate the creation of a [Row](comfy_table::Row) where its contents are not of the same type.
#[macro_export]
macro_rules! row {
(
$( $cell: expr ),*
) => {
comfy_table::Row::from(vec![ $( &$cell as &dyn std::fmt::Display ),*])
};
}

#[derive(Parser)]
struct Zfctl {
#[command(subcommand)]
Expand Down
11 changes: 2 additions & 9 deletions zfctl/src/runtime_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use super::row;

use std::time::Duration;

use anyhow::{anyhow, bail};
Expand All @@ -24,15 +26,6 @@ use zenoh_flow_daemon_2::{
selectors::selector_runtimes,
};

/// Macro to facilitate the creation of a [Row](comfy_table::Row) where its contents are not of the same type.
macro_rules! row {
(
$( $cell: expr ),*
) => {
comfy_table::Row::from(vec![ $( &$cell as &dyn std::fmt::Display ),*])
};
}

use crate::ZENOH_FLOW_INTERNAL_ERROR;

pub(crate) async fn get_all_runtimes(session: &Session) -> Result<Vec<RuntimeInfo>> {
Expand Down

0 comments on commit c77f1c8

Please sign in to comment.