Skip to content

Commit

Permalink
refactor(zfctl): improve logs (#266)
Browse files Browse the repository at this point in the history
This commit brings some improvements to the outputted logs when
launching `zfctl` and the `zfctl daemon` command.

The most notable additions are:
- when starting a Zenoh-Flow Daemon, its name and Zenoh ID will be
  logged,
- the `RUST_LOG` environment variable will be used if specified,
- the default logging level was set to `zfctl=info,zenoh_flow=info`.

* zfctl/src/daemon_command.rs:
  - Added a `span` at the top of the `run` method to be used within the
    `async` block.
  - Reworked the code to not `panic!` when an error occurred but instead
    return it and/or add logs.
* zfctl/src/instance_command.rs:
  - Properly display the id of the newly created instance.
  - Display a command to check for the status of a newly created
    instance.
* zfctl/src/main.rs:
  - Read the environment variable RUST_LOG to control the logs.
  - Set the default logging to `zfctl=info,zenoh_flow=info`.
  - Instrument the async block using a span.
* zfctl/src/run_local_command.rs:
  - Reworked the code to not panic when an error occurred but instead
    return it and/or add logs.
* zfctl/src/utils.rs:
  - Reworked the code to not panic when an error occurred but instead
    return it and/or add logs.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet authored Jan 29, 2025
1 parent 5f5e358 commit 22b74b3
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 121 deletions.
130 changes: 90 additions & 40 deletions zfctl/src/daemon_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use clap::{ArgGroup, Subcommand};
use comfy_table::{Row, Table};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook_async_std::Signals;
use tracing::Instrument;
use zenoh::Session;
use zenoh_flow_commons::{try_parse_from_file, Result, RuntimeId, Vars};
use zenoh_flow_daemon::{
Expand Down Expand Up @@ -91,60 +92,109 @@ pub(crate) enum DaemonCommand {

impl DaemonCommand {
pub async fn run(self, session: Session) -> Result<()> {
let span = tracing::info_span!(target: "zfctl", "daemon", name = tracing::field::Empty, zid = tracing::field::Empty);
let _guard = span.enter();

match self {
DaemonCommand::Start {
name,
configuration,
} => {
let daemon = match configuration {
Some(path) => {
let (zenoh_flow_configuration, _) =
try_parse_from_file::<ZenohFlowConfiguration>(&path, Vars::default())
.unwrap_or_else(|e| {
panic!(
"Failed to parse a Zenoh-Flow Configuration from < {} \
>:\n{e:?}",
path.display()
)
});
let (daemon, daemon_name, zid) =
match configuration {
Some(path) => {
let zenoh_flow_configuration = match try_parse_from_file::<
ZenohFlowConfiguration,
>(
&path, Vars::default()
) {
Ok((configuration, _)) => configuration,
Err(e) => {
anyhow::bail!(
"Failed to parse Zenoh-Flow configuration from < {} >: {e:?}",
path.display()
);
}
};

Daemon::spawn_from_config(session, zenoh_flow_configuration)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon")
}
None => Daemon::spawn(
Runtime::builder(name.unwrap())
.session(session)
.build()
.await
.expect("Failed to build the Zenoh-Flow Runtime"),
)
.await
.expect("Failed to spawn the Zenoh-Flow Daemon"),
};
let daemon_name = zenoh_flow_configuration.name.clone();
let zid = session.zid();
let daemon =
match Daemon::spawn_from_config(session, zenoh_flow_configuration)
.await
{
Ok(daemon) => daemon,
Err(e) => anyhow::bail!(
"Failed to spawn Zenoh-Flow Daemon < {daemon_name} >: {e:?}",
),
};

async_std::task::spawn(async move {
let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT])
.expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]");
(daemon, daemon_name, zid)
}
None => {
// NOTE: We can safely unwrap here: in order to start a Zenoh-Flow Daemon,
// either a name or a configuration must be provided.
//
// If that branch gets executed it means that a name was necessarily
// provided (`clap` would complain otherwise).
let daemon_name = name.unwrap();
let zid = session.zid();
let runtime = match Runtime::builder(&daemon_name)
.session(session)
.build()
.await
{
Ok(runtime) => runtime,
Err(e) => anyhow::bail!(
"Failed to build Zenoh-Flow Runtime for < {daemon_name} >: {e:?}"
),
};

while let Some(signal) = signals.next().await {
match signal {
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("Received termination signal, shutting down.");
daemon.stop().await;
break;
}
let daemon = match Daemon::spawn(runtime).await {
Ok(daemon) => daemon,
Err(e) => {
anyhow::bail!(
"Failed to spawn Zenoh-Flow Daemon < {daemon_name} >: {e:?}"
)
}
};

(daemon, daemon_name, zid)
}
};

span.record("name", daemon_name);
span.record("zid", format!("{zid}"));

tracing::info!("Started Zenoh-Flow Daemon");

signal => {
tracing::warn!("Ignoring signal ({signal})");
let current_span = span.clone();

async_std::task::spawn(
async move {
let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT])
.expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]");

while let Some(signal) = signals.next().await {
match signal {
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("Received termination signal");
daemon.stop().await;
break;
}

signal => {
tracing::warn!("Ignoring signal ({signal})");
}
}
}
}
})
.instrument(current_span),
)
.await;
}
DaemonCommand::List => {
let runtimes = get_all_runtimes(&session).await;
let runtimes = get_all_runtimes(&session).await?;

let mut table = Table::new();
table.set_width(80);
Expand All @@ -161,7 +211,7 @@ impl DaemonCommand {
} => {
let runtime_id = match (daemon_id, daemon_name) {
(Some(id), _) => id,
(None, Some(name)) => get_runtime_by_name(&session, &name).await,
(None, Some(name)) => get_runtime_by_name(&session, &name).await?,
(None, None) => {
// This code is indeed unreachable because:
// (1) The `group` macro has `required = true` which indicates that clap requires an entry for
Expand Down
18 changes: 11 additions & 7 deletions zfctl/src/instance_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,17 @@ Caused by:
};

match sample.result() {
Ok(sample) => {
tracing::info!(
"If successful, the instance will have the id: {:?}",
sample.payload().try_to_string()
);
println!("{:?}", sample.payload().try_to_string());
}
Ok(sample) => match sample.payload().try_to_string() {
Ok(instance_id) => {
tracing::info!("Instance: {instance_id}");
tracing::info!(
"To check its status:\n\tzfctl instance status {instance_id}",
);
}
Err(e) => {
tracing::error!("Failed to parse Instance ID: {e:?}");
}
},
Err(err) => tracing::error!("Failed to create instance: {:?}", err),
}
}
Expand Down
102 changes: 69 additions & 33 deletions zfctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use daemon_command::DaemonCommand;

mod run_local_command;
use run_local_command::RunLocalCommand;
use tracing::Instrument;
use tracing_subscriber::EnvFilter;

mod utils;
use std::path::PathBuf;

use anyhow::anyhow;
use clap::{ArgGroup, Parser, Subcommand};
use utils::{get_random_runtime, get_runtime_by_name};
use zenoh_flow_commons::{Result, RuntimeId};
use zenoh_flow_commons::RuntimeId;

const ZENOH_FLOW_INTERNAL_ERROR: &str = r#"
`zfctl` encountered a fatal internal error.
Expand Down Expand Up @@ -97,44 +98,79 @@ enum Command {
}

#[async_std::main]
async fn main() -> Result<()> {
// TODO Configure tracing such that:
// - if the environment variable RUST_LOG is set, it is applied,
// let a = std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV);
// - otherwise, provide a default that will only log INFO or above messages, for zfctl only.
let _ = tracing_subscriber::fmt::try_init();
async fn main() {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("zfctl=info,zenoh_flow=info"));
let subscriber = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);

let span = tracing::info_span!("zfctl");
let _guard = span.enter();

let zfctl = Zfctl::parse();

let zenoh_config = match zfctl.zenoh_configuration {
Some(path) => zenoh::Config::from_file(path.clone()).map_err(|e| {
anyhow!(
"Failed to parse the Zenoh configuration from < {} >:\n{e:?}",
path.display()
)
})?,
Some(path) => match zenoh::Config::from_file(path.clone()) {
Ok(path) => path,
Err(e) => {
tracing::error!(
"Failed to parse Zenoh configuration from < {} >: {e:?}",
path.display()
);
return;
}
},
None => zenoh::Config::default(),
};

let session = zenoh::open(zenoh_config)
.await
.map_err(|e| anyhow!("Failed to open Zenoh session:\n{:?}", e))?;

match zfctl.command {
Command::Instance {
command,
daemon_id,
daemon_name,
} => {
let orchestrator_id = match (daemon_id, daemon_name) {
(Some(id), _) => id,
(None, Some(name)) => get_runtime_by_name(&session, &name).await,
(None, None) => get_random_runtime(&session).await,
};

command.run(session, orchestrator_id).await
async {
let session = match zenoh::open(zenoh_config).await {
Ok(session) => session,
Err(e) => {
tracing::error!("Failed to open Zenoh session: {e:?}");
return;
}
};

tracing::info!("Using ZID: {}", session.zid());

let result = match zfctl.command {
Command::Instance {
command,
daemon_id,
daemon_name,
} => {
let orchestrator_id = match (daemon_id, daemon_name) {
(Some(id), _) => id,
(None, Some(name)) => match get_runtime_by_name(&session, &name).await {
Ok(id) => id,
Err(e) => {
tracing::error!("{e:?}");
return;
}
},
(None, None) => match get_random_runtime(&session).await {
Ok(id) => id,
Err(e) => {
tracing::error!("{e:?}");
return;
}
},
};

command.run(session, orchestrator_id).await
}
Command::Daemon(command) => command.run(session).await,
Command::RunLocal(command) => command.run(session).await,
};

if let Err(e) = result {
tracing::error!("{e:?}");
}
Command::Daemon(command) => command.run(session).await,
Command::RunLocal(command) => command.run(session).await,
}
.in_current_span()
.await;
}
29 changes: 12 additions & 17 deletions zfctl/src/run_local_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ impl RunLocalCommand {
.context(format!(
"Failed to load Loader configuration from < {} >",
&extensions_path.display()
))
.unwrap();
))?;

extensions
}
Expand All @@ -56,42 +55,38 @@ impl RunLocalCommand {
.context(format!(
"Failed to load data flow descriptor from < {} >",
&self.flow.display()
))
.unwrap();
))?;

let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars)
.context(format!(
let flattened_flow =
FlattenedDataFlowDescriptor::try_flatten(data_flow, vars).context(format!(
"Failed to flattened data flow extracted from < {} >",
&self.flow.display()
))
.unwrap();
))?;

let runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime")
.session(session)
.add_extensions(extensions)
.expect("Failed to add extensions")
.session(session);
.context("Failed to add extensions")?;

let runtime = runtime_builder
.build()
.await
.expect("Failed to build the Zenoh-Flow runtime");
.context("Failed to build the Zenoh-Flow runtime")?;

let record = DataFlowRecord::try_new(&flattened_flow, runtime.id())
.context("Failed to create a Record from the flattened data flow descriptor")
.unwrap();
.context("Failed to create a Record from the flattened data flow descriptor")?;

let instance_id = record.instance_id().clone();
let record_name = record.name().clone();
runtime
.try_load_data_flow(record)
.await
.context("Failed to load Record")
.unwrap();
.context("Failed to load Record")?;

runtime
.try_start_instance(&instance_id)
.await
.unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e));
.context(format!("Failed to start data flow < {} >", &instance_id))?;

let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
Expand All @@ -113,7 +108,7 @@ impl RunLocalCommand {
runtime
.try_delete_instance(&instance_id)
.await
.unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e));
.context(format!("Failed to delete data flow < {} >:", &instance_id))?;

Ok(())
}
Expand Down
Loading

0 comments on commit 22b74b3

Please sign in to comment.