Skip to content

Commit

Permalink
refactor(daemon): expose modules queries and daemon (#221)
Browse files Browse the repository at this point in the history
The `queries` module exposes (and groups) the structures needed to interact with
a Zenoh-Flow daemon from the outside.

The `daemon` module exposes (and groups) the structure needed to start a
Zenoh-Flow daemon.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet authored Mar 16, 2024
1 parent 088ee19 commit f0a18d9
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 211 deletions.
220 changes: 220 additions & 0 deletions zenoh-flow-daemon/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//
// Copyright (c) 2021 - 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

#[cfg(not(feature = "plugin"))]
pub use crate::configuration::ZenohConfiguration;
pub use crate::configuration::{ExtensionsConfiguration, ZenohFlowConfiguration};
pub use zenoh_flow_runtime::Runtime;

use crate::{instances, runtime};

use anyhow::{anyhow, bail};
use flume::{Receiver, Sender};
use serde::Deserialize;
use std::sync::Arc;
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh_flow_commons::{try_parse_from_file, Result, Vars};
use zenoh_flow_runtime::Extensions;

/// A Zenoh-Flow daemon declares 2 queryables:
/// 1. `zenoh-flow/<uuid>/runtime`
/// 2. `zenoh-flow/<uuid>/instances`
const NUMBER_QUERYABLES: usize = 2;

pub struct Daemon {
abort_tx: Sender<()>,
abort_ack_rx: Receiver<()>,
}

impl Daemon {
/// Spawn a new Zenoh-Flow Daemon with its [runtime] configured via a [configuration].
///
/// Note that this configuration can be parsed from a file. This function is for instance leveraged by the Zenoh
/// plugin for Zenoh-Flow: the configuration of Zenoh-Flow is parsed from Zenoh's configuration file.
///
/// # Errors
///
/// This function can fail for the following reasons:
/// - the [extensions] section in the configuration points to a file and that file is not a valid declaration of
/// [extensions],
/// - the [extensions] could not be added to the Runtime (see the list of potential reasons
/// [here](zenoh_flow_runtime::RuntimeBuilder::add_extensions())),
/// - if the feature `plugin` is disabled (it is by default):
/// - the Zenoh
///
/// [extensions]: Extensions
/// [runtime]: Runtime
/// [configuration]: ZenohFlowConfiguration
pub async fn spawn_from_config(
#[cfg(feature = "plugin")] zenoh_session: Arc<Session>,
configuration: ZenohFlowConfiguration,
) -> Result<Self> {
#[cfg(not(feature = "plugin"))]
let zenoh_config = match configuration.zenoh {
ZenohConfiguration::File(path) => {
zenoh::prelude::Config::from_file(path).map_err(|e| anyhow!("{e:?}"))
}
ZenohConfiguration::Configuration(config) => Ok(config),
}?;

#[cfg(not(feature = "plugin"))]
let zenoh_session = zenoh::open(zenoh_config)
.res()
.await
.map(|session| session.into_arc())
.map_err(|e| anyhow!("{e:?}"))?;

let extensions = if let Some(extensions) = configuration.extensions {
match extensions {
ExtensionsConfiguration::File(path) => {
try_parse_from_file::<Extensions>(path, Vars::default()).map(|(ext, _)| ext)
}
ExtensionsConfiguration::Extensions(extensions) => Ok(extensions),
}?
} else {
Extensions::default()
};

let runtime = Runtime::builder(configuration.name)
.add_extensions(extensions)?
.session(zenoh_session)
.build()
.await?;

Daemon::spawn(runtime).await
}

/// Starts the Zenoh-Flow daemon.
///
/// - creates a Zenoh-Flow runtime,
/// - spawns 2 queryables to serve external requests on the runtime:
/// 1. `zenoh-flow/<uuid>/runtime`: for everything that relates to the runtime.
/// 2. `zenoh-flow/<uuid>/instances`: for everything that relates to the data flow instances.
pub async fn spawn(runtime: Runtime) -> Result<Self> {
// Channels to gracefully stop the Zenoh-Flow daemon:
// - `abort_?x` to tell the queryables that they have to stop,
// - `abort_ack_?x` for the queryables to inform the runtime that they did stop.
let (abort_tx, abort_rx) = flume::bounded::<()>(NUMBER_QUERYABLES);
let (abort_ack_tx, abort_ack_rx) = flume::bounded::<()>(NUMBER_QUERYABLES);

let runtime = Arc::new(runtime);

let session = runtime.session();
let abort = abort_rx.clone();
let abort_ack = abort_ack_tx.clone();

if let Err(e) =
runtime::spawn_runtime_queryable(session.clone(), runtime.clone(), abort, abort_ack)
.await
{
tracing::error!(
"The Zenoh-Flow daemon encountered a fatal error:\n{:?}\nAborting",
e
);
// TODO: Clean everything up before aborting.
}

if let Err(e) =
instances::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx)
.await
{
tracing::error!(
"The Zenoh-Flow daemon encountered a fatal error:\n{:?}\nAborting",
e
);
// TODO: Clean everything up before aborting.
}

Ok(Daemon {
abort_tx,
abort_ack_rx,
})
}

/// Gracefully stops the Zenoh-Flow daemon.
///
/// This method will first stop the queryables this daemon declared (to not process new requests) and then stop all
/// the data flow instances that are running.
pub async fn stop(&self) {
for iteration in 0..NUMBER_QUERYABLES {
tracing::trace!(
"Sending abort signal to queryable ({}/{})",
iteration + 1,
NUMBER_QUERYABLES
);
self.abort_tx.send_async(()).await.unwrap_or_else(|e| {
tracing::error!(
"Failed to send abort signal to queryable ({}/{}): {:?}",
iteration + 1,
NUMBER_QUERYABLES,
e
);
});
}

// TODO: Abort all the operations on the runtime.
// self._runtime.abort();

// TODO Introduce a timer: if, for whatever reason, a queryable fails to send an acknowledgment we should not
// block the stopping procedure.
//
// Maybe wait for 60 seconds maximum?
for iteration in 0..NUMBER_QUERYABLES {
self.abort_ack_rx.recv_async().await.unwrap_or_else(|e| {
tracing::error!(
"Failed to receive abort acknowledgment ({}/{}): {:?}",
iteration + 1,
NUMBER_QUERYABLES,
e
);
});
tracing::trace!(
"Received abort acknowledgment {}/{}",
iteration + 1,
NUMBER_QUERYABLES
);
}
}
}

/// Validate a query and try to deserialize into an instance of `T`.
///
/// This function checks that the query is correct:
/// - it has a payload,
/// - the encoding is "correct",
/// - the payload can be deserialized into an instance of `T`.
///
/// If any check fails, an error message is logged and the query is dropped.
///
/// After these checks, the method `process` is called on the variant of `InstancesQuery`.
pub(crate) async fn validate_query<T: for<'a> Deserialize<'a>>(query: &Query) -> Result<T> {
let value = match query.value() {
Some(value) => value,
None => {
bail!("Received empty payload");
}
};

if ![
Encoding::APP_OCTET_STREAM,
Encoding::APP_JSON,
Encoding::TEXT_JSON,
]
.contains(&value.encoding)
{
bail!("Encoding < {} > is not supported", value.encoding);
}

serde_json::from_slice::<T>(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e))
}
3 changes: 2 additions & 1 deletion zenoh-flow-daemon/src/instances/abort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use crate::{selectors, InstancesQuery, Origin};
use super::{InstancesQuery, Origin};
use crate::selectors;

use std::sync::Arc;

Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-daemon/src/instances/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use super::InstancesQuery;
use crate::{selectors, Origin};
use super::{InstancesQuery, Origin};
use crate::selectors;

use std::sync::Arc;

Expand Down
3 changes: 2 additions & 1 deletion zenoh-flow-daemon/src/instances/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
//

mod queries;
use crate::{selectors, validate_query};
use crate::daemon::validate_query;
use crate::selectors;

pub use self::queries::InstancesQuery;

Expand Down
2 changes: 1 addition & 1 deletion zenoh-flow-daemon/src/instances/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use super::{abort, create, delete, start};
use crate::Origin;
use crate::instances::Origin;

use std::{fmt::Debug, sync::Arc};

Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-daemon/src/instances/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use super::abort;
use crate::{selectors, InstancesQuery, Origin};
use super::{abort, InstancesQuery, Origin};
use crate::selectors;

use std::sync::Arc;

Expand Down
Loading

0 comments on commit f0a18d9

Please sign in to comment.