Skip to content

Commit

Permalink
feat: simply fuse daemon and runtime commands without any modification
Browse files Browse the repository at this point in the history
  • Loading branch information
Hennzau authored and J-Loudet committed Jan 6, 2025
1 parent bbf8eac commit 05c8fe7
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 192 deletions.
160 changes: 156 additions & 4 deletions zfctl/src/daemon_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,28 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};

use anyhow::anyhow;
use async_std::stream::StreamExt;
use clap::{ArgGroup, Subcommand};
use comfy_table::{Row, Table};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook_async_std::Signals;
use zenoh::Session;
use zenoh_flow_commons::{try_parse_from_file, Result, Vars};
use zenoh_flow_daemon::daemon::{Daemon, ZenohFlowConfiguration};
use zenoh_flow_commons::{try_parse_from_file, Result, RuntimeId, Vars};
use zenoh_flow_daemon::{
daemon::{Daemon, ZenohFlowConfiguration},
queries::{selector_runtimes, RuntimeStatus, RuntimesQuery},
};
use zenoh_flow_runtime::Runtime;

use crate::{
row,
utils::{get_all_runtimes, get_runtime_by_name},
ZENOH_FLOW_INTERNAL_ERROR,
};

#[derive(Subcommand)]
pub(crate) enum DaemonCommand {
/// Launch a Zenoh-Flow Daemon.
Expand Down Expand Up @@ -53,10 +64,41 @@ pub(crate) enum DaemonCommand {
#[arg(short = 'z', long, verbatim_doc_comment)]
zenoh_configuration: Option<PathBuf>,
},
/// List all the Zenoh-Flow runtimes reachable on the Zenoh network.
List,
/// Returns the status of the provided Zenoh-Flow runtime.
///
/// The status consists of general information regarding the runtime and the
/// machine it runs on:
/// - the name associated with the Zenoh-Flow runtime,
/// - the number of CPUs the machine running the Zenoh-Flow runtime has,
/// - the total amount of RAM the machine running the Zenoh-Flow runtime has,
/// - for each data flow the Zenoh-Flow runtime manages (partially or not):
/// - its unique identifier,
/// - its name,
/// - its status.
#[command(verbatim_doc_comment)]
#[command(group(
ArgGroup::new("exclusive")
.args(&["runtime_id", "runtime_name"])
.required(true)
.multiple(false)
))]
Status {
/// The unique identifier of the Zenoh-Flow runtime to contact.
#[arg(short = 'i', long = "id")]
runtime_id: Option<RuntimeId>,
/// The name of the Zenoh-Flow runtime to contact.
///
/// Note that if several runtimes share the same name, the first to
/// answer will be selected.
#[arg(short = 'n', long = "name")]
runtime_name: Option<String>,
},
}

impl DaemonCommand {
pub async fn run(self, _session: Session) -> Result<()> {
pub async fn run(self, session: Session) -> Result<()> {
match self {
DaemonCommand::Start {
name,
Expand Down Expand Up @@ -124,6 +166,116 @@ impl DaemonCommand {
})
.await;
}
DaemonCommand::List => {
let runtimes = get_all_runtimes(&session).await;

let mut table = Table::new();
table.set_width(80);
table.set_header(Row::from(vec!["Identifier", "Name"]));
runtimes.iter().for_each(|info| {
table.add_row(Row::from(vec![&info.id.to_string(), info.name.as_ref()]));
});

println!("{table}");
}
DaemonCommand::Status {
runtime_id,
runtime_name,
} => {
let runtime_id = match (runtime_id, runtime_name) {
(Some(id), _) => id,
(None, Some(name)) => get_runtime_by_name(&session, &name).await,
(None, None) => {
// This code is indeed unreachable because:
// (1) The `group` macro has `required = true` which indicates that clap requires an entry for
// any group.
// (2) The `group` macro has `multiple = false` which indicates that only a single entry for
// any group is accepted.
// (3) The `runtime_id` and `runtime_name` fields belong to the same group "runtime".
//
// => A single entry for the group "runtime" is required (and mandatory).
unreachable!()
}
};

let selector = selector_runtimes(&runtime_id);

let value = serde_json::to_vec(&RuntimesQuery::Status).map_err(|e| {
tracing::error!(
"serde_json failed to serialize `RuntimeQuery::Status`: {:?}",
e
);
anyhow!(ZENOH_FLOW_INTERNAL_ERROR)
})?;

let reply = session
.get(selector)
.payload(value)
.timeout(Duration::from_secs(5))
.await
.map_err(|e| {
anyhow!(
"Failed to query Zenoh-Flow runtime < {} >: {:?}",
runtime_id,
e
)
})?;

while let Ok(reply) = reply.recv_async().await {
match reply.result() {
Ok(sample) => {
match serde_json::from_slice::<RuntimeStatus>(
&sample.payload().to_bytes(),
) {
Ok(runtime_status) => {
let mut table = Table::new();
table.set_width(80);
table.add_row(row!("Identifier", runtime_id));
table.add_row(row!("Name", runtime_status.name));
table.add_row(row!(
"Host name",
runtime_status.hostname.unwrap_or_else(|| "N/A".into())
));
table.add_row(row!(
"Operating System",
runtime_status
.operating_system
.unwrap_or_else(|| "N/A".into())
));
table.add_row(row!(
"Arch",
runtime_status.architecture.unwrap_or_else(|| "N/A".into())
));
table.add_row(row!("# CPUs", runtime_status.cpus));
table.add_row(row!(
"# RAM",
bytesize::to_string(runtime_status.ram_total, true)
));
println!("{table}");

table = Table::new();
table.set_width(80);
table.set_header(row!("Instance Uuid", "Name", "Status"));
runtime_status.data_flows_status.iter().for_each(
|(uuid, (name, status))| {
table.add_row(row!(uuid, name, status));
},
);
println!("{table}");
}
Err(e) => {
tracing::error!(
"Failed to parse reply as a `RuntimeStatus`: {:?}",
e
)
}
}
}

Err(e) => tracing::error!("Reply to runtime status failed with: {:?}", e),
}
}
}
}

Ok(())
Expand Down
8 changes: 0 additions & 8 deletions zfctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
mod instance_command;
use instance_command::InstanceCommand;

mod runtime_command;
use runtime_command::RuntimeCommand;

mod daemon_command;
use daemon_command::DaemonCommand;

Expand Down Expand Up @@ -83,10 +80,6 @@ enum Command {
runtime_name: Option<String>,
},

/// To interact with a Zenoh-Flow runtime.
#[command(subcommand)]
Runtime(RuntimeCommand),

/// To interact with a Zenoh-Flow daemon.
#[command(subcommand)]
Daemon(DaemonCommand),
Expand Down Expand Up @@ -154,7 +147,6 @@ async fn main() -> Result<()> {

command.run(session, orchestrator_id).await
}
Command::Runtime(command) => command.run(session).await,
Command::Daemon(command) => command.run(session).await,
Command::RunLocal {
flow,
Expand Down
Loading

0 comments on commit 05c8fe7

Please sign in to comment.