Skip to content

Commit

Permalink
style(daemon): better module separation (#222)
Browse files Browse the repository at this point in the history
This crate exposes two sets of elements:
- the Zenoh-Flow daemon and how to create it,
- a set of Queries to interact and manage a Daemon from the outside

This commit provides a better logical separation than what was done
in #221. In particular, the functions to spawn the queryables are located
in the daemon module.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet authored Mar 21, 2024
1 parent f0a18d9 commit 2b22ef0
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 151 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

#[cfg(not(feature = "plugin"))]
pub use crate::configuration::ZenohConfiguration;
pub use crate::configuration::{ExtensionsConfiguration, ZenohFlowConfiguration};
pub use zenoh_flow_runtime::Runtime;
mod configuration;
mod queryables;

use crate::{instances, runtime};
#[cfg(not(feature = "plugin"))]
pub use configuration::ZenohConfiguration;
pub use configuration::{ExtensionsConfiguration, ZenohFlowConfiguration};
pub use zenoh_flow_runtime::{Extension, Extensions, Runtime};

use anyhow::{anyhow, bail};
use flume::{Receiver, Sender};
use serde::Deserialize;
use std::sync::Arc;
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{try_parse_from_file, Result, Vars};
use zenoh_flow_runtime::Extensions;

/// A Zenoh-Flow daemon declares 2 queryables:
/// 1. `zenoh-flow/<uuid>/runtime`
Expand Down Expand Up @@ -115,7 +113,7 @@ impl Daemon {
let abort_ack = abort_ack_tx.clone();

if let Err(e) =
runtime::spawn_runtime_queryable(session.clone(), runtime.clone(), abort, abort_ack)
queryables::spawn_runtime_queryable(session.clone(), runtime.clone(), abort, abort_ack)
.await
{
tracing::error!(
Expand All @@ -126,7 +124,7 @@ impl Daemon {
}

if let Err(e) =
instances::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx)
queryables::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx)
.await
{
tracing::error!(
Expand Down Expand Up @@ -187,34 +185,3 @@ impl Daemon {
}
}
}

/// Validate a query and try to deserialize into an instance of `T`.
///
/// This function checks that the query is correct:
/// - it has a payload,
/// - the encoding is "correct",
/// - the payload can be deserialized into an instance of `T`.
///
/// If any check fails, an error message is logged and the query is dropped.
///
/// After these checks, the method `process` is called on the variant of `InstancesQuery`.
pub(crate) async fn validate_query<T: for<'a> Deserialize<'a>>(query: &Query) -> Result<T> {
let value = match query.value() {
Some(value) => value,
None => {
bail!("Received empty payload");
}
};

if ![
Encoding::APP_OCTET_STREAM,
Encoding::APP_JSON,
Encoding::TEXT_JSON,
]
.contains(&value.encoding)
{
bail!("Encoding < {} > is not supported", value.encoding);
}

serde_json::from_slice::<T>(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e))
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
// Copyright (c) 2021 - 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -12,33 +12,20 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

mod queries;
use crate::daemon::validate_query;
use crate::selectors;

pub use self::queries::InstancesQuery;

mod abort;
mod create;
mod delete;
mod start;
use crate::queries::instances::InstancesQuery;
use crate::queries::runtime::RuntimesQuery;
use crate::queries::selectors;
use crate::queries::validate_query;

use std::sync::Arc;

use anyhow::bail;
use flume::{Receiver, Sender};
use futures::select;
use serde::{Deserialize, Serialize};
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::Result;
use zenoh_flow_runtime::Runtime;

#[derive(Debug, Deserialize, Serialize)]
pub enum Origin {
Client,
Daemon,
}

/// Spawns an async task to answer queries received on `zenoh-flow/{runtime_id}/instances`.
pub(crate) async fn spawn_instances_queryable(
zenoh_session: Arc<Session>,
Expand Down Expand Up @@ -101,3 +88,66 @@ pub(crate) async fn spawn_instances_queryable(

Ok(())
}

pub(crate) async fn spawn_runtime_queryable(
zenoh_session: Arc<Session>,
runtime: Arc<Runtime>,
abort_rx: Receiver<()>,
abort_ack_tx: Sender<()>,
) -> Result<()> {
let ke_runtime = selectors::selector_runtimes(runtime.id());

let queryable = match zenoh_session
.declare_queryable(ke_runtime.clone())
.res()
.await
{
Ok(queryable) => {
tracing::trace!("declared queryable < {} >", ke_runtime);
queryable
}
Err(e) => {
bail!("Failed to declare Zenoh queryable 'runtimes': {:?}", e)
}
};

async_std::task::spawn(async move {
loop {
select!(
_ = abort_rx.recv_async() => {
tracing::trace!("Received abort signal");
break;
}

query = queryable.recv_async() => {
match query {
Ok(query) => {
let runtime_query: RuntimesQuery = match validate_query(&query).await {
Ok(runtime_query) => runtime_query,
Err(e) => {
tracing::error!("Unable to parse `RuntimesQuery`: {:?}", e);
return;
}
};

let runtime = runtime.clone();
async_std::task::spawn(async move {
runtime_query.process(query, runtime).await;
});
}
Err(e) => {
tracing::error!("Queryable 'runtimes' dropped: {:?}", e);
return;
}
}
}
)
}

abort_ack_tx.send_async(()).await.unwrap_or_else(|e| {
tracing::error!("Queryable 'runtime' failed to acknowledge abort: {:?}", e);
});
});

Ok(())
}
13 changes: 1 addition & 12 deletions zenoh-flow-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,5 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

pub(crate) mod configuration;
pub(crate) mod instances;
pub(crate) mod runtime;
pub(crate) mod selectors;

pub mod daemon;

pub mod queries {
pub use crate::instances::{InstancesQuery, Origin};
pub use crate::runtime::{RuntimeInfo, RuntimeStatus, RuntimesQuery};
pub use crate::selectors::*;
pub use zenoh_flow_runtime::InstanceStatus;
}
pub mod queries;
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use super::{InstancesQuery, Origin};
use crate::selectors;
use crate::queries::selectors;

use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::sync::Arc;

use crate::selectors;

use super::InstancesQuery;
use crate::queries::selectors;

use std::sync::Arc;

use anyhow::Context;
use zenoh::prelude::r#async::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use super::{InstancesQuery, Origin};
use crate::selectors;
use crate::queries::selectors;

use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2021 - 2024 ZettaScale Technology
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -12,8 +12,10 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use super::{abort, create, delete, start};
use crate::instances::Origin;
mod abort;
mod create;
mod delete;
mod start;

use std::{fmt::Debug, sync::Arc};

Expand All @@ -25,6 +27,12 @@ use zenoh_flow_descriptors::FlattenedDataFlowDescriptor;
use zenoh_flow_records::DataFlowRecord;
use zenoh_flow_runtime::Runtime;

#[derive(Debug, Deserialize, Serialize)]
pub enum Origin {
Client,
Daemon,
}

async fn reply<T: Serialize + Debug>(query: Query, data: Result<T>) -> Result<()> {
let sample = match data {
Ok(data) => match serde_json::to_vec(&data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use super::{abort, InstancesQuery, Origin};
use crate::selectors;
use crate::queries::selectors;

use std::sync::Arc;

Expand Down
59 changes: 59 additions & 0 deletions zenoh-flow-daemon/src/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// Copyright (c) 2021 - 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

pub(crate) mod instances;
pub(crate) mod runtime;
pub(crate) mod selectors;

pub use instances::{InstancesQuery, Origin};
pub use runtime::{RuntimeInfo, RuntimeStatus, RuntimesQuery};
pub use selectors::*;
pub use zenoh_flow_runtime::InstanceStatus;

use anyhow::{anyhow, bail};
use serde::Deserialize;
use zenoh::prelude::*;
use zenoh::queryable::Query;
use zenoh_flow_commons::Result;

/// Validate a query and try to deserialize into an instance of `T`.
///
/// This function checks that the query is correct:
/// - it has a payload,
/// - the encoding is "correct",
/// - the payload can be deserialized into an instance of `T`.
///
/// If any check fails, an error message is logged and the query is dropped.
///
/// After these checks, the method `process` is called on the variant of `InstancesQuery`.
pub(crate) async fn validate_query<T: for<'a> Deserialize<'a>>(query: &Query) -> Result<T> {
let value = match query.value() {
Some(value) => value,
None => {
bail!("Received empty payload");
}
};

if ![
Encoding::APP_OCTET_STREAM,
Encoding::APP_JSON,
Encoding::TEXT_JSON,
]
.contains(&value.encoding)
{
bail!("Encoding < {} > is not supported", value.encoding);
}

serde_json::from_slice::<T>(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e))
}
Loading

0 comments on commit 2b22ef0

Please sign in to comment.