From 22b74b3395123cf1e2920f2db423c05bc4332c97 Mon Sep 17 00:00:00 2001 From: J-Loudet Date: Wed, 29 Jan 2025 10:38:00 +0100 Subject: [PATCH] refactor(zfctl): improve logs (#266) This commit brings some improvements to the outputted logs when launching `zfctl` and the `zfctl daemon` command. The most notable additions are: - when starting a Zenoh-Flow Daemon, its name and Zenoh ID will be logged, - the `RUST_LOG` environment variable will be used if specified, - the default logging level was set to `zfctl=info,zenoh_flow=info`. * zfctl/src/daemon_command.rs: - Added a `span` at the top of the `run` method to be used within the `async` block. - Reworked the code to not `panic!` when an error occurred but instead return it and/or add logs. * zfctl/src/instance_command.rs: - Properly display the id of the newly created instance. - Display a command to check for the status of a newly created instance. * zfctl/src/main.rs: - Read the environment variable RUST_LOG to control the logs. - Set the default logging to `zfctl=info,zenoh_flow=info`. - Instrument the async block using a span. * zfctl/src/run_local_command.rs: - Reworked the code to not panic when an error occurred but instead return it and/or add logs. * zfctl/src/utils.rs: - Reworked the code to not panic when an error occurred but instead return it and/or add logs. Signed-off-by: Julien Loudet --- zfctl/src/daemon_command.rs | 130 +++++++++++++++++++++++---------- zfctl/src/instance_command.rs | 18 +++-- zfctl/src/main.rs | 102 +++++++++++++++++--------- zfctl/src/run_local_command.rs | 29 +++----- zfctl/src/utils.rs | 62 ++++++++++------ 5 files changed, 220 insertions(+), 121 deletions(-) diff --git a/zfctl/src/daemon_command.rs b/zfctl/src/daemon_command.rs index 1d3174d7..e42f334e 100644 --- a/zfctl/src/daemon_command.rs +++ b/zfctl/src/daemon_command.rs @@ -20,6 +20,7 @@ use clap::{ArgGroup, Subcommand}; use comfy_table::{Row, Table}; use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM}; use signal_hook_async_std::Signals; +use tracing::Instrument; use zenoh::Session; use zenoh_flow_commons::{try_parse_from_file, Result, RuntimeId, Vars}; use zenoh_flow_daemon::{ @@ -91,60 +92,109 @@ pub(crate) enum DaemonCommand { impl DaemonCommand { pub async fn run(self, session: Session) -> Result<()> { + let span = tracing::info_span!(target: "zfctl", "daemon", name = tracing::field::Empty, zid = tracing::field::Empty); + let _guard = span.enter(); + match self { DaemonCommand::Start { name, configuration, } => { - let daemon = match configuration { - Some(path) => { - let (zenoh_flow_configuration, _) = - try_parse_from_file::(&path, Vars::default()) - .unwrap_or_else(|e| { - panic!( - "Failed to parse a Zenoh-Flow Configuration from < {} \ - >:\n{e:?}", - path.display() - ) - }); + let (daemon, daemon_name, zid) = + match configuration { + Some(path) => { + let zenoh_flow_configuration = match try_parse_from_file::< + ZenohFlowConfiguration, + >( + &path, Vars::default() + ) { + Ok((configuration, _)) => configuration, + Err(e) => { + anyhow::bail!( + "Failed to parse Zenoh-Flow configuration from < {} >: {e:?}", + path.display() + ); + } + }; - Daemon::spawn_from_config(session, zenoh_flow_configuration) - .await - .expect("Failed to spawn the Zenoh-Flow Daemon") - } - None => Daemon::spawn( - Runtime::builder(name.unwrap()) - .session(session) - .build() - .await - .expect("Failed to build the Zenoh-Flow Runtime"), - ) - .await - .expect("Failed to spawn the Zenoh-Flow Daemon"), - }; + let daemon_name = zenoh_flow_configuration.name.clone(); + let zid = session.zid(); + let daemon = + match Daemon::spawn_from_config(session, zenoh_flow_configuration) + .await + { + Ok(daemon) => daemon, + Err(e) => anyhow::bail!( + "Failed to spawn Zenoh-Flow Daemon < {daemon_name} >: {e:?}", + ), + }; - async_std::task::spawn(async move { - let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT]) - .expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]"); + (daemon, daemon_name, zid) + } + None => { + // NOTE: We can safely unwrap here: in order to start a Zenoh-Flow Daemon, + // either a name or a configuration must be provided. + // + // If that branch gets executed it means that a name was necessarily + // provided (`clap` would complain otherwise). + let daemon_name = name.unwrap(); + let zid = session.zid(); + let runtime = match Runtime::builder(&daemon_name) + .session(session) + .build() + .await + { + Ok(runtime) => runtime, + Err(e) => anyhow::bail!( + "Failed to build Zenoh-Flow Runtime for < {daemon_name} >: {e:?}" + ), + }; - while let Some(signal) = signals.next().await { - match signal { - SIGTERM | SIGINT | SIGQUIT => { - tracing::info!("Received termination signal, shutting down."); - daemon.stop().await; - break; - } + let daemon = match Daemon::spawn(runtime).await { + Ok(daemon) => daemon, + Err(e) => { + anyhow::bail!( + "Failed to spawn Zenoh-Flow Daemon < {daemon_name} >: {e:?}" + ) + } + }; + + (daemon, daemon_name, zid) + } + }; + + span.record("name", daemon_name); + span.record("zid", format!("{zid}")); + + tracing::info!("Started Zenoh-Flow Daemon"); - signal => { - tracing::warn!("Ignoring signal ({signal})"); + let current_span = span.clone(); + + async_std::task::spawn( + async move { + let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT]) + .expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]"); + + while let Some(signal) = signals.next().await { + match signal { + SIGTERM | SIGINT | SIGQUIT => { + tracing::info!("Received termination signal"); + daemon.stop().await; + break; + } + + signal => { + tracing::warn!("Ignoring signal ({signal})"); + } } } } - }) + .instrument(current_span), + ) .await; } DaemonCommand::List => { - let runtimes = get_all_runtimes(&session).await; + let runtimes = get_all_runtimes(&session).await?; let mut table = Table::new(); table.set_width(80); @@ -161,7 +211,7 @@ impl DaemonCommand { } => { let runtime_id = match (daemon_id, daemon_name) { (Some(id), _) => id, - (None, Some(name)) => get_runtime_by_name(&session, &name).await, + (None, Some(name)) => get_runtime_by_name(&session, &name).await?, (None, None) => { // This code is indeed unreachable because: // (1) The `group` macro has `required = true` which indicates that clap requires an entry for diff --git a/zfctl/src/instance_command.rs b/zfctl/src/instance_command.rs index 329afff9..bb4e4ad0 100644 --- a/zfctl/src/instance_command.rs +++ b/zfctl/src/instance_command.rs @@ -154,13 +154,17 @@ Caused by: }; match sample.result() { - Ok(sample) => { - tracing::info!( - "If successful, the instance will have the id: {:?}", - sample.payload().try_to_string() - ); - println!("{:?}", sample.payload().try_to_string()); - } + Ok(sample) => match sample.payload().try_to_string() { + Ok(instance_id) => { + tracing::info!("Instance: {instance_id}"); + tracing::info!( + "To check its status:\n\tzfctl instance status {instance_id}", + ); + } + Err(e) => { + tracing::error!("Failed to parse Instance ID: {e:?}"); + } + }, Err(err) => tracing::error!("Failed to create instance: {:?}", err), } } diff --git a/zfctl/src/main.rs b/zfctl/src/main.rs index a15628f6..01fdf3dd 100644 --- a/zfctl/src/main.rs +++ b/zfctl/src/main.rs @@ -20,14 +20,15 @@ use daemon_command::DaemonCommand; mod run_local_command; use run_local_command::RunLocalCommand; +use tracing::Instrument; +use tracing_subscriber::EnvFilter; mod utils; use std::path::PathBuf; -use anyhow::anyhow; use clap::{ArgGroup, Parser, Subcommand}; use utils::{get_random_runtime, get_runtime_by_name}; -use zenoh_flow_commons::{Result, RuntimeId}; +use zenoh_flow_commons::RuntimeId; const ZENOH_FLOW_INTERNAL_ERROR: &str = r#" `zfctl` encountered a fatal internal error. @@ -97,44 +98,79 @@ enum Command { } #[async_std::main] -async fn main() -> Result<()> { - // TODO Configure tracing such that: - // - if the environment variable RUST_LOG is set, it is applied, - // let a = std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV); - // - otherwise, provide a default that will only log INFO or above messages, for zfctl only. - let _ = tracing_subscriber::fmt::try_init(); +async fn main() { + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("zfctl=info,zenoh_flow=info")); + let subscriber = tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + + let span = tracing::info_span!("zfctl"); + let _guard = span.enter(); let zfctl = Zfctl::parse(); let zenoh_config = match zfctl.zenoh_configuration { - Some(path) => zenoh::Config::from_file(path.clone()).map_err(|e| { - anyhow!( - "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", - path.display() - ) - })?, + Some(path) => match zenoh::Config::from_file(path.clone()) { + Ok(path) => path, + Err(e) => { + tracing::error!( + "Failed to parse Zenoh configuration from < {} >: {e:?}", + path.display() + ); + return; + } + }, None => zenoh::Config::default(), }; - let session = zenoh::open(zenoh_config) - .await - .map_err(|e| anyhow!("Failed to open Zenoh session:\n{:?}", e))?; - - match zfctl.command { - Command::Instance { - command, - daemon_id, - daemon_name, - } => { - let orchestrator_id = match (daemon_id, daemon_name) { - (Some(id), _) => id, - (None, Some(name)) => get_runtime_by_name(&session, &name).await, - (None, None) => get_random_runtime(&session).await, - }; - - command.run(session, orchestrator_id).await + async { + let session = match zenoh::open(zenoh_config).await { + Ok(session) => session, + Err(e) => { + tracing::error!("Failed to open Zenoh session: {e:?}"); + return; + } + }; + + tracing::info!("Using ZID: {}", session.zid()); + + let result = match zfctl.command { + Command::Instance { + command, + daemon_id, + daemon_name, + } => { + let orchestrator_id = match (daemon_id, daemon_name) { + (Some(id), _) => id, + (None, Some(name)) => match get_runtime_by_name(&session, &name).await { + Ok(id) => id, + Err(e) => { + tracing::error!("{e:?}"); + return; + } + }, + (None, None) => match get_random_runtime(&session).await { + Ok(id) => id, + Err(e) => { + tracing::error!("{e:?}"); + return; + } + }, + }; + + command.run(session, orchestrator_id).await + } + Command::Daemon(command) => command.run(session).await, + Command::RunLocal(command) => command.run(session).await, + }; + + if let Err(e) = result { + tracing::error!("{e:?}"); } - Command::Daemon(command) => command.run(session).await, - Command::RunLocal(command) => command.run(session).await, } + .in_current_span() + .await; } diff --git a/zfctl/src/run_local_command.rs b/zfctl/src/run_local_command.rs index 78e2d26b..862f1eb2 100644 --- a/zfctl/src/run_local_command.rs +++ b/zfctl/src/run_local_command.rs @@ -36,8 +36,7 @@ impl RunLocalCommand { .context(format!( "Failed to load Loader configuration from < {} >", &extensions_path.display() - )) - .unwrap(); + ))?; extensions } @@ -56,42 +55,38 @@ impl RunLocalCommand { .context(format!( "Failed to load data flow descriptor from < {} >", &self.flow.display() - )) - .unwrap(); + ))?; - let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) - .context(format!( + let flattened_flow = + FlattenedDataFlowDescriptor::try_flatten(data_flow, vars).context(format!( "Failed to flattened data flow extracted from < {} >", &self.flow.display() - )) - .unwrap(); + ))?; let runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime") + .session(session) .add_extensions(extensions) - .expect("Failed to add extensions") - .session(session); + .context("Failed to add extensions")?; let runtime = runtime_builder .build() .await - .expect("Failed to build the Zenoh-Flow runtime"); + .context("Failed to build the Zenoh-Flow runtime")?; let record = DataFlowRecord::try_new(&flattened_flow, runtime.id()) - .context("Failed to create a Record from the flattened data flow descriptor") - .unwrap(); + .context("Failed to create a Record from the flattened data flow descriptor")?; let instance_id = record.instance_id().clone(); let record_name = record.name().clone(); runtime .try_load_data_flow(record) .await - .context("Failed to load Record") - .unwrap(); + .context("Failed to load Record")?; runtime .try_start_instance(&instance_id) .await - .unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e)); + .context(format!("Failed to start data flow < {} >", &instance_id))?; let mut stdin = async_std::io::stdin(); let mut input = [0_u8]; @@ -113,7 +108,7 @@ impl RunLocalCommand { runtime .try_delete_instance(&instance_id) .await - .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); + .context(format!("Failed to delete data flow < {} >:", &instance_id))?; Ok(()) } diff --git a/zfctl/src/utils.rs b/zfctl/src/utils.rs index 982d536e..52004e44 100644 --- a/zfctl/src/utils.rs +++ b/zfctl/src/utils.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use rand::Rng; use zenoh::{query::ConsolidationMode, Session}; -use zenoh_flow_commons::RuntimeId; +use zenoh_flow_commons::{Result, RuntimeId}; use zenoh_flow_daemon::queries::{selector_all_runtimes, RuntimeInfo, RuntimesQuery}; /// Returns the list of [RuntimeInfo] of the reachable Zenoh-Flow Daemon(s). @@ -26,17 +26,27 @@ use zenoh_flow_daemon::queries::{selector_all_runtimes, RuntimeInfo, RuntimesQue /// - (internal error) the query to list the Zenoh-Flow Daemons could not be serialised by `serde_json`, /// - the query on the Zenoh network failed, /// - no Zenoh-Flow Daemon is reachable. -pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { - let value = serde_json::to_vec(&RuntimesQuery::List) - .unwrap_or_else(|e| panic!("`serde_json` failed to serialize `RuntimeQuery::List`: {e:?}")); +pub(crate) async fn get_all_runtimes(session: &Session) -> Result> { + tracing::debug!("Fetching available Zenoh-Flow Daemon(s)"); + let value = match serde_json::to_vec(&RuntimesQuery::List) { + Ok(value) => value, + Err(e) => { + anyhow::bail!("`serde_json` failed to serialize `RuntimeQuery::List`: {e:?}"); + } + }; - let runtime_replies = session + let runtime_replies = match session .get(selector_all_runtimes()) .payload(value) // We want to address all the Zenoh-Flow daemons that are reachable on the Zenoh network. .consolidation(ConsolidationMode::None) .await - .unwrap_or_else(|e| panic!("Failed to query available daemons:\n{:?}", e)); + { + Ok(replies) => replies, + Err(e) => { + anyhow::bail!("Failed to send Query to Zenoh-Flow Daemon(s): {:?}", e); + } + }; let mut runtimes = Vec::new(); while let Ok(reply) = runtime_replies.recv_async().await { @@ -45,23 +55,21 @@ pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { match serde_json::from_slice::(&sample.payload().to_bytes()) { Ok(runtime_info) => runtimes.push(runtime_info), Err(e) => { - tracing::error!("Failed to parse a reply as a `RuntimeId`:\n{:?}", e) + tracing::error!("Failed to parse a reply as a `RuntimeId`: {:?}", e) } } } - - Err(e) => tracing::warn!("A reply returned an error:\n{:?}", e), + Err(e) => tracing::warn!("A reply returned an error: {:?}", e), } } if runtimes.is_empty() { - panic!( - "No Zenoh-Flow daemon were detected. Have you checked if (i) they are up and (ii) \ - reachable through Zenoh?" - ); + anyhow::bail!("Found no Zenoh-Flow Daemon on the network"); } - runtimes + tracing::debug!("Found {} Zenoh-Flow Daemon(s)", runtimes.len()); + + Ok(runtimes) } /// Returns the unique identifier of the Zenoh-Flow Daemon that has the provided `name`. @@ -71,26 +79,26 @@ pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { /// This function will panic if: /// - there is no Zenoh-Flow Daemon that has the provided name, /// - there are more than 1 Zenoh-Flow Daemon with the provided name. -pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> RuntimeId { - let runtimes = get_all_runtimes(session).await; +pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> Result { + let runtimes = get_all_runtimes(session).await?; let mut matching_runtimes = runtimes .iter() .filter(|&r_info| r_info.name.as_ref() == name) .collect_vec(); if matching_runtimes.is_empty() { - panic!("Found no Zenoh-Flow Daemon with name < {name} >"); + anyhow::bail!("Found no Zenoh-Flow Daemon with name < {name} >"); } else if matching_runtimes.len() > 1 { - tracing::error!("Found multiple Zenoh-Flow Daemons named < {name} >:"); + tracing::error!("Found multiple Zenoh-Flow Daemon named < {name} >:"); matching_runtimes.iter().for_each(|&r_info| { tracing::error!("- {} - (id) {}", r_info.name, r_info.id); }); - panic!( - "There are multiple Zenoh-Flow Daemons named < {name} >, please use their 'id' \ + anyhow::bail!( + "There are multiple Zenoh-Flow Daemons named < {name} >, please use their 'zid' \ instead" ); } else { - matching_runtimes.pop().unwrap().id.clone() + Ok(matching_runtimes.pop().unwrap().id.clone()) } } @@ -102,9 +110,15 @@ pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> Runtim /// - (internal error) the query to list the Zenoh-Flow Daemons could not be serialised by `serde_json`, /// - the query on the Zenoh network failed, /// - no Zenoh-Flow Daemon is reachable. -pub(crate) async fn get_random_runtime(session: &Session) -> RuntimeId { - let mut runtimes = get_all_runtimes(session).await; +pub(crate) async fn get_random_runtime(session: &Session) -> Result { + let mut runtimes = get_all_runtimes(session).await?; let orchestrator = runtimes.remove(rand::thread_rng().gen_range(0..runtimes.len())); - orchestrator.id + tracing::info!( + "Selected Zenoh-Flow Daemon < {}: {} > as Orchestrator", + orchestrator.name, + orchestrator.id + ); + + Ok(orchestrator.id) }