diff --git a/zenoh-flow-daemon/src/configuration.rs b/zenoh-flow-daemon/src/daemon/configuration.rs similarity index 100% rename from zenoh-flow-daemon/src/configuration.rs rename to zenoh-flow-daemon/src/daemon/configuration.rs diff --git a/zenoh-flow-daemon/src/daemon.rs b/zenoh-flow-daemon/src/daemon/mod.rs similarity index 80% rename from zenoh-flow-daemon/src/daemon.rs rename to zenoh-flow-daemon/src/daemon/mod.rs index 0cb1ca28..40d6a1a4 100644 --- a/zenoh-flow-daemon/src/daemon.rs +++ b/zenoh-flow-daemon/src/daemon/mod.rs @@ -12,20 +12,18 @@ // ZettaScale Zenoh Team, // -#[cfg(not(feature = "plugin"))] -pub use crate::configuration::ZenohConfiguration; -pub use crate::configuration::{ExtensionsConfiguration, ZenohFlowConfiguration}; -pub use zenoh_flow_runtime::Runtime; +mod configuration; +mod queryables; -use crate::{instances, runtime}; +#[cfg(not(feature = "plugin"))] +pub use configuration::ZenohConfiguration; +pub use configuration::{ExtensionsConfiguration, ZenohFlowConfiguration}; +pub use zenoh_flow_runtime::{Extension, Extensions, Runtime}; -use anyhow::{anyhow, bail}; use flume::{Receiver, Sender}; -use serde::Deserialize; use std::sync::Arc; -use zenoh::{prelude::r#async::*, queryable::Query}; +use zenoh::prelude::r#async::*; use zenoh_flow_commons::{try_parse_from_file, Result, Vars}; -use zenoh_flow_runtime::Extensions; /// A Zenoh-Flow daemon declares 2 queryables: /// 1. `zenoh-flow//runtime` @@ -115,7 +113,7 @@ impl Daemon { let abort_ack = abort_ack_tx.clone(); if let Err(e) = - runtime::spawn_runtime_queryable(session.clone(), runtime.clone(), abort, abort_ack) + queryables::spawn_runtime_queryable(session.clone(), runtime.clone(), abort, abort_ack) .await { tracing::error!( @@ -126,7 +124,7 @@ impl Daemon { } if let Err(e) = - instances::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx) + queryables::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx) .await { tracing::error!( @@ -187,34 +185,3 @@ impl Daemon { } } } - -/// Validate a query and try to deserialize into an instance of `T`. -/// -/// This function checks that the query is correct: -/// - it has a payload, -/// - the encoding is "correct", -/// - the payload can be deserialized into an instance of `T`. -/// -/// If any check fails, an error message is logged and the query is dropped. -/// -/// After these checks, the method `process` is called on the variant of `InstancesQuery`. -pub(crate) async fn validate_query Deserialize<'a>>(query: &Query) -> Result { - let value = match query.value() { - Some(value) => value, - None => { - bail!("Received empty payload"); - } - }; - - if ![ - Encoding::APP_OCTET_STREAM, - Encoding::APP_JSON, - Encoding::TEXT_JSON, - ] - .contains(&value.encoding) - { - bail!("Encoding < {} > is not supported", value.encoding); - } - - serde_json::from_slice::(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e)) -} diff --git a/zenoh-flow-daemon/src/instances/mod.rs b/zenoh-flow-daemon/src/daemon/queryables.rs similarity index 55% rename from zenoh-flow-daemon/src/instances/mod.rs rename to zenoh-flow-daemon/src/daemon/queryables.rs index 0e4406d0..445f49af 100644 --- a/zenoh-flow-daemon/src/instances/mod.rs +++ b/zenoh-flow-daemon/src/daemon/queryables.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2021 - 2023 ZettaScale Technology +// Copyright (c) 2021 - 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -12,33 +12,20 @@ // ZettaScale Zenoh Team, // -mod queries; -use crate::daemon::validate_query; -use crate::selectors; - -pub use self::queries::InstancesQuery; - -mod abort; -mod create; -mod delete; -mod start; +use crate::queries::instances::InstancesQuery; +use crate::queries::runtime::RuntimesQuery; +use crate::queries::selectors; +use crate::queries::validate_query; use std::sync::Arc; use anyhow::bail; use flume::{Receiver, Sender}; use futures::select; -use serde::{Deserialize, Serialize}; use zenoh::prelude::r#async::*; use zenoh_flow_commons::Result; use zenoh_flow_runtime::Runtime; -#[derive(Debug, Deserialize, Serialize)] -pub enum Origin { - Client, - Daemon, -} - /// Spawns an async task to answer queries received on `zenoh-flow/{runtime_id}/instances`. pub(crate) async fn spawn_instances_queryable( zenoh_session: Arc, @@ -101,3 +88,66 @@ pub(crate) async fn spawn_instances_queryable( Ok(()) } + +pub(crate) async fn spawn_runtime_queryable( + zenoh_session: Arc, + runtime: Arc, + abort_rx: Receiver<()>, + abort_ack_tx: Sender<()>, +) -> Result<()> { + let ke_runtime = selectors::selector_runtimes(runtime.id()); + + let queryable = match zenoh_session + .declare_queryable(ke_runtime.clone()) + .res() + .await + { + Ok(queryable) => { + tracing::trace!("declared queryable < {} >", ke_runtime); + queryable + } + Err(e) => { + bail!("Failed to declare Zenoh queryable 'runtimes': {:?}", e) + } + }; + + async_std::task::spawn(async move { + loop { + select!( + _ = abort_rx.recv_async() => { + tracing::trace!("Received abort signal"); + break; + } + + query = queryable.recv_async() => { + match query { + Ok(query) => { + let runtime_query: RuntimesQuery = match validate_query(&query).await { + Ok(runtime_query) => runtime_query, + Err(e) => { + tracing::error!("Unable to parse `RuntimesQuery`: {:?}", e); + return; + } + }; + + let runtime = runtime.clone(); + async_std::task::spawn(async move { + runtime_query.process(query, runtime).await; + }); + } + Err(e) => { + tracing::error!("Queryable 'runtimes' dropped: {:?}", e); + return; + } + } + } + ) + } + + abort_ack_tx.send_async(()).await.unwrap_or_else(|e| { + tracing::error!("Queryable 'runtime' failed to acknowledge abort: {:?}", e); + }); + }); + + Ok(()) +} diff --git a/zenoh-flow-daemon/src/lib.rs b/zenoh-flow-daemon/src/lib.rs index 081e8d62..681843e3 100644 --- a/zenoh-flow-daemon/src/lib.rs +++ b/zenoh-flow-daemon/src/lib.rs @@ -12,16 +12,5 @@ // ZettaScale Zenoh Team, // -pub(crate) mod configuration; -pub(crate) mod instances; -pub(crate) mod runtime; -pub(crate) mod selectors; - pub mod daemon; - -pub mod queries { - pub use crate::instances::{InstancesQuery, Origin}; - pub use crate::runtime::{RuntimeInfo, RuntimeStatus, RuntimesQuery}; - pub use crate::selectors::*; - pub use zenoh_flow_runtime::InstanceStatus; -} +pub mod queries; diff --git a/zenoh-flow-daemon/src/instances/abort.rs b/zenoh-flow-daemon/src/queries/instances/abort.rs similarity index 98% rename from zenoh-flow-daemon/src/instances/abort.rs rename to zenoh-flow-daemon/src/queries/instances/abort.rs index e3222180..c2667921 100644 --- a/zenoh-flow-daemon/src/instances/abort.rs +++ b/zenoh-flow-daemon/src/queries/instances/abort.rs @@ -13,7 +13,7 @@ // use super::{InstancesQuery, Origin}; -use crate::selectors; +use crate::queries::selectors; use std::sync::Arc; diff --git a/zenoh-flow-daemon/src/instances/create.rs b/zenoh-flow-daemon/src/queries/instances/create.rs similarity index 99% rename from zenoh-flow-daemon/src/instances/create.rs rename to zenoh-flow-daemon/src/queries/instances/create.rs index d82eee54..a6d74fe5 100644 --- a/zenoh-flow-daemon/src/instances/create.rs +++ b/zenoh-flow-daemon/src/queries/instances/create.rs @@ -12,11 +12,10 @@ // ZettaScale Zenoh Team, // -use std::sync::Arc; - -use crate::selectors; - use super::InstancesQuery; +use crate::queries::selectors; + +use std::sync::Arc; use anyhow::Context; use zenoh::prelude::r#async::*; diff --git a/zenoh-flow-daemon/src/instances/delete.rs b/zenoh-flow-daemon/src/queries/instances/delete.rs similarity index 99% rename from zenoh-flow-daemon/src/instances/delete.rs rename to zenoh-flow-daemon/src/queries/instances/delete.rs index ff4ed66c..41ef8596 100644 --- a/zenoh-flow-daemon/src/instances/delete.rs +++ b/zenoh-flow-daemon/src/queries/instances/delete.rs @@ -13,7 +13,7 @@ // use super::{InstancesQuery, Origin}; -use crate::selectors; +use crate::queries::selectors; use std::sync::Arc; diff --git a/zenoh-flow-daemon/src/instances/queries.rs b/zenoh-flow-daemon/src/queries/instances/mod.rs similarity index 96% rename from zenoh-flow-daemon/src/instances/queries.rs rename to zenoh-flow-daemon/src/queries/instances/mod.rs index 7fde4eb7..f91f4ce0 100644 --- a/zenoh-flow-daemon/src/instances/queries.rs +++ b/zenoh-flow-daemon/src/queries/instances/mod.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2021 - 2024 ZettaScale Technology +// Copyright (c) 2021 - 2023 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -12,8 +12,10 @@ // ZettaScale Zenoh Team, // -use super::{abort, create, delete, start}; -use crate::instances::Origin; +mod abort; +mod create; +mod delete; +mod start; use std::{fmt::Debug, sync::Arc}; @@ -25,6 +27,12 @@ use zenoh_flow_descriptors::FlattenedDataFlowDescriptor; use zenoh_flow_records::DataFlowRecord; use zenoh_flow_runtime::Runtime; +#[derive(Debug, Deserialize, Serialize)] +pub enum Origin { + Client, + Daemon, +} + async fn reply(query: Query, data: Result) -> Result<()> { let sample = match data { Ok(data) => match serde_json::to_vec(&data) { diff --git a/zenoh-flow-daemon/src/instances/start.rs b/zenoh-flow-daemon/src/queries/instances/start.rs similarity index 99% rename from zenoh-flow-daemon/src/instances/start.rs rename to zenoh-flow-daemon/src/queries/instances/start.rs index 1dfb1e8e..b1f38486 100644 --- a/zenoh-flow-daemon/src/instances/start.rs +++ b/zenoh-flow-daemon/src/queries/instances/start.rs @@ -13,7 +13,7 @@ // use super::{abort, InstancesQuery, Origin}; -use crate::selectors; +use crate::queries::selectors; use std::sync::Arc; diff --git a/zenoh-flow-daemon/src/queries/mod.rs b/zenoh-flow-daemon/src/queries/mod.rs new file mode 100644 index 00000000..69876efb --- /dev/null +++ b/zenoh-flow-daemon/src/queries/mod.rs @@ -0,0 +1,59 @@ +// +// Copyright (c) 2021 - 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod instances; +pub(crate) mod runtime; +pub(crate) mod selectors; + +pub use instances::{InstancesQuery, Origin}; +pub use runtime::{RuntimeInfo, RuntimeStatus, RuntimesQuery}; +pub use selectors::*; +pub use zenoh_flow_runtime::InstanceStatus; + +use anyhow::{anyhow, bail}; +use serde::Deserialize; +use zenoh::prelude::*; +use zenoh::queryable::Query; +use zenoh_flow_commons::Result; + +/// Validate a query and try to deserialize into an instance of `T`. +/// +/// This function checks that the query is correct: +/// - it has a payload, +/// - the encoding is "correct", +/// - the payload can be deserialized into an instance of `T`. +/// +/// If any check fails, an error message is logged and the query is dropped. +/// +/// After these checks, the method `process` is called on the variant of `InstancesQuery`. +pub(crate) async fn validate_query Deserialize<'a>>(query: &Query) -> Result { + let value = match query.value() { + Some(value) => value, + None => { + bail!("Received empty payload"); + } + }; + + if ![ + Encoding::APP_OCTET_STREAM, + Encoding::APP_JSON, + Encoding::TEXT_JSON, + ] + .contains(&value.encoding) + { + bail!("Encoding < {} > is not supported", value.encoding); + } + + serde_json::from_slice::(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e)) +} diff --git a/zenoh-flow-daemon/src/runtime.rs b/zenoh-flow-daemon/src/queries/runtime.rs similarity index 59% rename from zenoh-flow-daemon/src/runtime.rs rename to zenoh-flow-daemon/src/queries/runtime.rs index e219f449..d6f58d50 100644 --- a/zenoh-flow-daemon/src/runtime.rs +++ b/zenoh-flow-daemon/src/queries/runtime.rs @@ -14,18 +14,13 @@ use std::{collections::HashMap, sync::Arc}; -use anyhow::bail; -use flume::{Receiver, Sender}; -use futures::select; use serde::{Deserialize, Serialize}; use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind}; use zenoh::prelude::r#async::*; use zenoh::queryable::Query; -use zenoh_flow_commons::{InstanceId, Result, RuntimeId}; +use zenoh_flow_commons::{InstanceId, RuntimeId}; use zenoh_flow_runtime::{InstanceState, Runtime}; -use crate::{daemon::validate_query, selectors::selector_runtimes}; - #[derive(Debug, Deserialize, Serialize)] pub enum RuntimesQuery { List, @@ -102,66 +97,3 @@ Caused by: } } } - -pub(crate) async fn spawn_runtime_queryable( - zenoh_session: Arc, - runtime: Arc, - abort_rx: Receiver<()>, - abort_ack_tx: Sender<()>, -) -> Result<()> { - let ke_runtime = selector_runtimes(runtime.id()); - - let queryable = match zenoh_session - .declare_queryable(ke_runtime.clone()) - .res() - .await - { - Ok(queryable) => { - tracing::trace!("declared queryable < {} >", ke_runtime); - queryable - } - Err(e) => { - bail!("Failed to declare Zenoh queryable 'runtimes': {:?}", e) - } - }; - - async_std::task::spawn(async move { - loop { - select!( - _ = abort_rx.recv_async() => { - tracing::trace!("Received abort signal"); - break; - } - - query = queryable.recv_async() => { - match query { - Ok(query) => { - let runtime_query: RuntimesQuery = match validate_query(&query).await { - Ok(runtime_query) => runtime_query, - Err(e) => { - tracing::error!("Unable to parse `RuntimesQuery`: {:?}", e); - return; - } - }; - - let runtime = runtime.clone(); - async_std::task::spawn(async move { - runtime_query.process(query, runtime).await; - }); - } - Err(e) => { - tracing::error!("Queryable 'runtimes' dropped: {:?}", e); - return; - } - } - } - ) - } - - abort_ack_tx.send_async(()).await.unwrap_or_else(|e| { - tracing::error!("Queryable 'runtime' failed to acknowledge abort: {:?}", e); - }); - }); - - Ok(()) -} diff --git a/zenoh-flow-daemon/src/selectors.rs b/zenoh-flow-daemon/src/queries/selectors.rs similarity index 100% rename from zenoh-flow-daemon/src/selectors.rs rename to zenoh-flow-daemon/src/queries/selectors.rs