From 42d67d04773257e4f1437b0f84e222844df4ab11 Mon Sep 17 00:00:00 2001 From: J-Loudet Date: Thu, 14 Mar 2024 18:22:25 +0100 Subject: [PATCH] docs(runtime): improve overall documentation (#219) Signed-off-by: Julien Loudet --- zenoh-flow-runtime/src/instance.rs | 75 ++++++++++- zenoh-flow-runtime/src/lib.rs | 11 ++ zenoh-flow-runtime/src/loader/extensions.rs | 117 +++++++++++++++++- zenoh-flow-runtime/src/loader/mod.rs | 68 +++++++--- .../src/runners/builtin/zenoh/source.rs | 2 +- zenoh-flow-runtime/src/runtime/builder.rs | 95 +++++++++++++- zenoh-flow-runtime/src/runtime/load.rs | 88 +++++++++++-- zenoh-flow-runtime/src/runtime/mod.rs | 71 ++++++++--- 8 files changed, 480 insertions(+), 47 deletions(-) diff --git a/zenoh-flow-runtime/src/instance.rs b/zenoh-flow-runtime/src/instance.rs index 14199536..3f990f00 100644 --- a/zenoh-flow-runtime/src/instance.rs +++ b/zenoh-flow-runtime/src/instance.rs @@ -13,24 +13,64 @@ // use crate::runners::Runner; -use serde::{Deserialize, Serialize}; + use std::{collections::HashMap, fmt::Display, ops::Deref}; + +use serde::{Deserialize, Serialize}; use uhlc::{Timestamp, HLC}; use zenoh_flow_commons::{NodeId, Result, RuntimeId}; use zenoh_flow_records::DataFlowRecord; +/// A `DataFlowInstance` keeps track of the parts of a data flow managed by the Zenoh-Flow runtime. +/// +/// A `DataFlowInstance` structure is thus *local* to a Zenoh-Flow runtime. For a data flow that spawns on multiple +/// runtimes, there will be one such structure at each runtime. +/// +/// All instances will share the same [record](DataFlowRecord) but their internal state will differ. pub struct DataFlowInstance { pub(crate) state: InstanceState, pub(crate) record: DataFlowRecord, pub(crate) runners: HashMap, } +/// The different states of a [DataFlowInstance]. +/// +/// Note that *a state is tied to a Zenoh-Flow [runtime]*: if a data flow is distributed across multiple Zenoh-Flow +/// runtimes, their respective state for the same instance could be different (but should eventually converge). +/// +/// [runtime]: crate::Runtime #[derive(Clone, Deserialize, Serialize, Debug)] pub enum InstanceState { + /// A [runtime] listing a [DataFlowInstance] in the `Creating` state is in the process of loading all the nodes it + /// manages. + /// + /// [runtime]: crate::Runtime Creating(Timestamp), + /// A [runtime] listing a [DataFlowInstance] in the `Loaded` state successfully instantiated all the nodes it manages + /// and is ready to start them. + /// + /// A `Loaded` data flow can be started or deleted. + /// + /// [runtime]: crate::Runtime Loaded(Timestamp), + /// A [runtime] listing a [DataFlowInstance] in the `Running` state has (re)started all the nodes it manages. + /// + /// A `Running` data flow can be aborted or deleted. + /// + /// [runtime]: crate::Runtime Running(Timestamp), + /// A [runtime] listing a [DataFlowInstance] in the `Aborted` state has abruptly stopped all the nodes it manages. + /// + /// An `Aborted` data flow can be restarted or deleted. + /// + /// [runtime]: crate::Runtime Aborted(Timestamp), + /// A [runtime] listing a [DataFlowInstance] in the `Failed` state failed to load at least one of the nodes of this + /// instance it manages. + /// + /// A data flow in the `Failed` state can only be deleted. + /// + /// [runtime]: crate::Runtime Failed((Timestamp, String)), } @@ -48,10 +88,22 @@ impl Display for InstanceState { } } +/// The `InstanceStatus` provides information about the data flow instance. 
+/// +/// It details: +/// - from which runtime this information comes from (through its [identifier](RuntimeId)), +/// - the [state](InstanceState) of the data flow instance, +/// - the list of nodes (through their [identifier](NodeId)) the runtime manages --- and thus for which the state +/// applies. +/// +/// This information is what is displayed by the `zfctl` tool when requesting the status of a data flow instance. #[derive(Deserialize, Serialize, Debug)] pub struct InstanceStatus { + /// The identifier of the [runtime](crate::Runtime) this information comes from. pub runtime_id: RuntimeId, + /// The state of the data flow instance --- on this runtime. pub state: InstanceState, + /// The nodes managed by this runtime, for which the state applies. pub nodes: Vec, } @@ -64,7 +116,8 @@ impl Deref for DataFlowInstance { } impl DataFlowInstance { - pub fn new(record: DataFlowRecord, hlc: &HLC) -> Self { + /// Creates a new `DataFlowInstance`, setting its state to [Creating](InstanceState::Creating). + pub(crate) fn new(record: DataFlowRecord, hlc: &HLC) -> Self { Self { state: InstanceState::Creating(hlc.new_timestamp()), record, @@ -72,6 +125,16 @@ impl DataFlowInstance { } } + /// (re-)Starts the `DataFlowInstance`. + /// + /// The [hlc](HLC) is required to keep track of when this call was made. + /// + /// # Errors + /// + /// This method can fail when attempting to re-start: when re-starting a data flow, the method + /// [on_resume] is called for each node and is faillible. + /// + /// [on_resume]: zenoh_flow_nodes::prelude::Node::on_resume() pub async fn start(&mut self, hlc: &HLC) -> Result<()> { for (node_id, runner) in self.runners.iter_mut() { runner.start().await?; @@ -82,6 +145,9 @@ impl DataFlowInstance { Ok(()) } + /// Aborts the `DataFlowInstance`. + /// + /// The [hlc](HLC) is required to keep track of when this call was made. pub async fn abort(&mut self, hlc: &HLC) { for (node_id, runner) in self.runners.iter_mut() { runner.abort().await; @@ -91,10 +157,15 @@ impl DataFlowInstance { self.state = InstanceState::Aborted(hlc.new_timestamp()); } + /// Returns the [state](InstanceState) of this `DataFlowInstance`. pub fn state(&self) -> &InstanceState { &self.state } + /// Returns the [status](InstanceStatus) of this `DataFlowInstance`. + /// + /// This structure was intended as a way to retrieve and display information about the instance. This is what the + /// `zfctl` tool leverages for its `instance status` command. pub fn status(&self, runtime_id: &RuntimeId) -> InstanceStatus { InstanceStatus { runtime_id: runtime_id.clone(), diff --git a/zenoh-flow-runtime/src/lib.rs b/zenoh-flow-runtime/src/lib.rs index 490649d2..8c04fd15 100644 --- a/zenoh-flow-runtime/src/lib.rs +++ b/zenoh-flow-runtime/src/lib.rs @@ -12,6 +12,16 @@ // ZettaScale Zenoh Team, // +//! This crate exposes the structures driving the execution of a data flow: the [Runtime] and the [DataFlowInstance]. +//! +//! If the feature `zenoh` is enabled (it is by default), this crate additionally re-exports the structures from +//! [Zenoh](zenoh) that allow opening a [Session](zenoh::Session) *asynchronously*. +//! +//! Users interested in exposing a Zenoh-Flow runtime should find everything in the [Runtime] and [RuntimeBuilder]. +//! +//! Users interested in fetching the state of a data flow instance should look into the [DataFlowInstance], +//! [InstanceState] and [InstanceStatus] structures. These structures are leveraged by the `zfctl` command line tool. 
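To make the crate-level overview above more concrete, here is a minimal sketch of how a consumer of this crate could inspect the state of the instances a runtime manages. It assumes an already built `Runtime`; the `print_states` helper is purely illustrative and not part of the crate.

```rust
use zenoh_flow_runtime::{InstanceState, Runtime};

/// Prints the state of every data flow instance managed by `runtime`.
///
/// Hypothetical helper, shown only to illustrate the API surface of this crate.
async fn print_states(runtime: &Runtime) {
    for (instance_id, (name, state)) in runtime.instances_state().await {
        match state {
            InstanceState::Failed((_, cause)) => {
                println!("[{instance_id}] {name}: failed ({cause})")
            }
            other => println!("[{instance_id}] {name}: {other}"),
        }
    }
}
```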
+ mod instance; pub use instance::{DataFlowInstance, InstanceState, InstanceStatus}; @@ -26,6 +36,7 @@ mod runners; mod runtime; pub use runtime::{DataFlowErr, Runtime, RuntimeBuilder}; +/// A re-export of the Zenoh structures needed to open a [Session](zenoh::Session) asynchronously. #[cfg(feature = "zenoh")] pub mod zenoh { pub use zenoh::config::{client, empty, peer}; diff --git a/zenoh-flow-runtime/src/loader/extensions.rs b/zenoh-flow-runtime/src/loader/extensions.rs index c8e09c42..6cdd2b8f 100644 --- a/zenoh-flow-runtime/src/loader/extensions.rs +++ b/zenoh-flow-runtime/src/loader/extensions.rs @@ -27,7 +27,28 @@ use serde::{Deserialize, Deserializer}; use zenoh_flow_commons::Result; use zenoh_flow_nodes::{OperatorFn, SinkFn, SourceFn}; -// Convenient shortcut. +/// A convenient wrapper for a set of [Extension]. +/// +/// The main purpose of this structure is to facilitate parsing. +/// +/// # Example configuration +/// +/// ``` +/// # use zenoh_flow_runtime::Extensions; +/// # let yaml = r#" +/// - file_extension: py +/// libraries: +/// source: /home/zenoh-flow/extension/libpy_source.so +/// operator: /home/zenoh-flow/extension/libpy_operator.so +/// sink: /home/zenoh-flow/extension/libpy_sink.so +/// +/// - file_extension: js +/// libraries: +/// source: /home/zenoh-flow/extension/libwasm_source.so +/// operator: /home/zenoh-flow/extension/libwasm_operator.so +/// sink: /home/zenoh-flow/extension/libwasm_sink.so +/// # "#; +/// # serde_yaml::from_str::(yaml).unwrap(); #[derive(Default, Debug, Clone, Deserialize, PartialEq, Eq)] #[repr(transparent)] pub struct Extensions( @@ -79,7 +100,7 @@ impl Extensions { /// This method will return an error if any of the library: /// - does not expose the correct symbol (see these macros: [1], [2], [3]), /// - was not compiled with the same Rust version, - /// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime). + /// - was not using the same version of Zenoh-Flow as this [runtime](crate::Runtime). /// /// [1]: zenoh_flow_nodes::prelude::export_source /// [2]: zenoh_flow_nodes::prelude::export_operator @@ -104,6 +125,38 @@ impl Extensions { } } +/// An `Extension` associates a file extension (e.g. `.py`) to a set of shared libraries. +/// +/// This details how a Zenoh-Flow runtime should load nodes that have the [url](url::Url) of their implementation with +/// this extension. +/// +/// Zenoh-Flow only supports node implementation in the form of [shared libraries]. To support additional implementation +/// --- for instance [Python scripts] --- a Zenoh-Flow runtime needs to be informed on (i) which shared libraries it +/// should load and (ii) how it should make these shared libraries "load" the node implementation. +/// +/// To support an extension on a Zenoh-Flow runtime, one can either detail them in the configuration file of the runtime +/// or through the dedicated [method](crate::RuntimeBuilder::add_extension()). 
+/// +/// # Example configuration +/// +/// (Yaml) +/// +/// ``` +/// # use zenoh_flow_runtime::Extension; +/// # let yaml = r#" +/// file_extension: py +/// libraries: +/// source: /home/zenoh-flow/libpy_source.so +/// operator: /home/zenoh-flow/libpy_operator.so +/// sink: /home/zenoh-flow/libpy_sink.so +/// # "#; +/// # serde_yaml::from_str::(yaml).unwrap(); +/// ``` +/// +/// [shared libraries]: std::env::consts::DLL_EXTENSION +/// [Python scripts]: https://github.com/eclipse-zenoh/zenoh-flow-python +// NOTE: We separate the libraries in its own dedicated structure to have that same textual representation (YAML/JSON). +// There is no real need to do so. #[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)] pub struct Extension { pub(crate) file_extension: Arc, @@ -112,22 +165,82 @@ pub struct Extension { impl Extension { /// Returns the file extension associated with this extension. + /// + /// # Example + /// + /// ``` + /// # use zenoh_flow_runtime::Extension; + /// # let yaml = r#" + /// # file_extension: py + /// # libraries: + /// # source: /home/zenoh-flow/libpy_source.so + /// # operator: /home/zenoh-flow/libpy_operator.so + /// # sink: /home/zenoh-flow/libpy_sink.so + /// # "#; + /// # let extension = serde_yaml::from_str::(yaml).unwrap(); + /// assert_eq!(extension.file_extension(), "py"); + /// ``` pub fn file_extension(&self) -> &str { &self.file_extension } /// Returns the [path](PathBuf) of the shared library responsible for loading Source nodes for this file extension. + /// + /// # Example + /// + /// ``` + /// # use zenoh_flow_runtime::Extension; + /// # let yaml = r#" + /// # file_extension: py + /// # libraries: + /// # source: /home/zenoh-flow/libpy_source.so + /// # operator: /home/zenoh-flow/libpy_operator.so + /// # sink: /home/zenoh-flow/libpy_sink.so + /// # "#; + /// # let extension = serde_yaml::from_str::(yaml).unwrap(); + /// assert_eq!(extension.source().to_str(), Some("/home/zenoh-flow/libpy_source.so")); + /// ``` pub fn source(&self) -> &PathBuf { &self.libraries.source } /// Returns the [path](PathBuf) of the shared library responsible for loading Operator nodes for this file /// extension. + /// + /// # Example + /// + /// ``` + /// # use zenoh_flow_runtime::Extension; + /// # let yaml = r#" + /// # file_extension: py + /// # libraries: + /// # source: /home/zenoh-flow/libpy_source.so + /// # operator: /home/zenoh-flow/libpy_operator.so + /// # sink: /home/zenoh-flow/libpy_sink.so + /// # "#; + /// # let extension = serde_yaml::from_str::(yaml).unwrap(); + /// assert_eq!(extension.operator().to_str(), Some("/home/zenoh-flow/libpy_operator.so")); + /// ``` pub fn operator(&self) -> &PathBuf { &self.libraries.operator } /// Returns the [path](PathBuf) of the shared library responsible for loading Sink nodes for this file extension. 
+ /// + /// # Example + /// + /// ``` + /// # use zenoh_flow_runtime::Extension; + /// # let yaml = r#" + /// # file_extension: py + /// # libraries: + /// # source: /home/zenoh-flow/libpy_source.so + /// # operator: /home/zenoh-flow/libpy_operator.so + /// # sink: /home/zenoh-flow/libpy_sink.so + /// # "#; + /// # let extension = serde_yaml::from_str::(yaml).unwrap(); + /// assert_eq!(extension.sink().to_str(), Some("/home/zenoh-flow/libpy_sink.so")); + /// ``` pub fn sink(&self) -> &PathBuf { &self.libraries.sink } diff --git a/zenoh-flow-runtime/src/loader/mod.rs b/zenoh-flow-runtime/src/loader/mod.rs index c300fa91..74feddec 100644 --- a/zenoh-flow-runtime/src/loader/mod.rs +++ b/zenoh-flow-runtime/src/loader/mod.rs @@ -117,15 +117,28 @@ pub(crate) fn try_get_constructor( } /// The dynamic library loader. -/// Before loading it verifies if the versions are compatible -/// and if the symbols are presents. -/// It loads the files in different way depending on the operating system. -/// In particular the scope of the symbols is different between Unix and -/// Windows. -/// In Unix system the symbols are loaded with the flags: /// -/// - `RTLD_NOW` load all the symbols when loading the library. -/// - `RTLD_LOCAL` keep all the symbols local. +/// This structure is responsible for: +/// 1. loading the shared libraries containing the implementation of the nodes, +/// 2. keeping track of these libraries to avoid loading several times the same one, +/// 3. leveraging the [Extension]s to load "non-standard" node implementation. +/// +/// Note that "non-standard" libraries are libraries that have an extension that is different than +/// [DLL_EXTENSION](std::env::consts::DLL_EXTENSION) --- e.g. different than `.so` on Linux-based systems. +/// +/// Before calling the constructor of any node, the loader will perform the following checks: +/// - it will check that the node implementation was compiled with the same version of the Rust compiler than the +/// Zenoh-Flow runtime it belongs to, +/// - it will check that the node implementation was using the same version of the Zenoh-Flow library than the +/// Zenoh-Flow runtime it belongs to. +/// +/// To do these checks, the loader is expecting to find specific symbols (different for each type of node). These +/// symbols are automatically exported via the respective procedural macros: [export_source], [export_operator], +/// [export_sink]. +/// +/// [export_source]: zenoh_flow_nodes::prelude::export_source +/// [export_operator]: zenoh_flow_nodes::prelude::export_operator +/// [export_sink]: zenoh_flow_nodes::prelude::export_sink #[derive(Default)] pub(crate) struct Loader { pub(crate) extensions: Extensions, @@ -166,14 +179,15 @@ impl Loader { operator: impl Into, sink: impl Into, ) -> Result> { - self.extensions.try_add_extension( - file_extension, - source.into(), - operator.into(), - sink.into(), - ) + self.extensions + .try_add_extension(file_extension, source, operator, sink) } + /// This method will free the shared libraries that are no longer being used. + /// + /// Every time a data flow is created, each node will receive an `Arc` of the shared library it + /// depends. Once a data flow is deleted, the `strong_count` of all the libraries used is diminished by one and this + /// method is called after to see if any count has reached 1. If so, that library can be safely dropped. 
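The reference-counting idea described above can be summed up with a small standalone sketch. The `LibraryCache` type and its field are hypothetical stand-ins for the loader's internal bookkeeping; only the `Arc::strong_count` logic mirrors what the comment describes.

```rust
use std::{collections::HashMap, sync::Arc};

use libloading::Library;
use url::Url;

/// Hypothetical cache of the shared libraries a loader has opened, keyed by their Url.
struct LibraryCache {
    libraries: HashMap<Url, Arc<Library>>,
}

impl LibraryCache {
    /// Drops every library that is no longer referenced by any node.
    ///
    /// Each node of a data flow holds a clone of the `Arc` of the library it was
    /// loaded from. Once an instance is deleted those clones are dropped; when the
    /// `strong_count` falls back to 1 the cache is the sole owner and the library
    /// can be safely unloaded.
    fn remove_unused(&mut self) {
        self.libraries
            .retain(|_url, library| Arc::strong_count(library) > 1);
    }
}
```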
pub(crate) fn remove_unused_libraries(&mut self) { let number_libraries = self.libraries.len(); self.libraries @@ -184,6 +198,21 @@ impl Loader { ); } + /// Given a [Url] and a [NodeSymbol], attempt to load the node constructor. + /// + /// This method will first look into its cache of shared libraries and check if it does not already know of a + /// library with the same Url. + /// + /// If it does then it will reuse this library. + /// + /// If not, it will attempt to load it and check its compatibility. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - the provided Url specifies a scheme that we do not support (for now only "file://" is supported), + /// - we failed to load the library from the provided Url (e.g. file not found), + /// - the library does not expose the correct symbol or is not compatible with this Zenoh-Flow runtime. pub(crate) fn try_load_constructor( &mut self, url: &Url, @@ -210,7 +239,16 @@ impl Loader { Ok((constructor, library)) } - /// TODO@J-Loudet + /// Given the string representation of a path, attempts to load a library. + /// + /// This method will look at the file extension to determine if it should leverage the [Extensions] or not. + /// + /// # Errors + /// + /// This method can fail if: + /// - the extension of the path is not supported (i.e. not [DLL_EXTENSION] and not in the [Extensions]), + /// - there is no file in the provided path, + /// - the libloading crate failed to create a `Library` using the provided path. pub(crate) fn try_load_library_from_uri( &self, path: &str, diff --git a/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs b/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs index d7c2c8bb..b93043a1 100644 --- a/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs +++ b/zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs @@ -72,7 +72,7 @@ impl<'a> ZenohSource<'a> { futs: Arc::new(Mutex::new(Vec::with_capacity(key_exprs.len()))), }; - // NOTE: Calling this function avoids repeating code initializing the `futs` and `subscribers`. + // NOTE: Calling this function avoids repeating code initialising the `futs` and `subscribers`. zenoh_source.on_resume().await?; Ok(zenoh_source) diff --git a/zenoh-flow-runtime/src/runtime/builder.rs b/zenoh-flow-runtime/src/runtime/builder.rs index 2d64e036..87ffba89 100644 --- a/zenoh-flow-runtime/src/runtime/builder.rs +++ b/zenoh-flow-runtime/src/runtime/builder.rs @@ -28,6 +28,7 @@ use zenoh_flow_commons::{Result, RuntimeId}; /// /// Most of the internals of the Runtime can left to their default values. Leveraging a builder pattern thus simplifies /// the creation of a Runtime. +#[must_use = "The Runtime will not be generated unless you `build()` it"] pub struct RuntimeBuilder { name: Arc, hlc: Option, @@ -63,6 +64,16 @@ impl RuntimeBuilder { /// /// ⚠️ *If the runtime identifier is set first and a Session is provided after, the runtime identifier will be /// overwritten by the identifier of the Zenoh session*. + /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::Runtime; + /// use zenoh_flow_commons::RuntimeId; + /// + /// let builder = Runtime::builder("demo") + /// .runtime_id(RuntimeId::rand()); + /// ``` pub fn runtime_id(mut self, runtime_id: impl Into) -> Result { self.runtime_id = Some(runtime_id.into()); @@ -87,6 +98,23 @@ impl RuntimeBuilder { /// # Runtime identifier /// /// If a [Session] is provided, the Zenoh-Flow runtime will re-use the identifier of the Session as its identifier. 
+ /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::Runtime; + /// use zenoh_flow_runtime::{zenoh, zenoh::AsyncResolve}; + /// # async_std::task::block_on(async { + /// + /// let zenoh_session = zenoh::open(zenoh::peer()) + /// .res_async() + /// .await + /// .expect("Failed to open Session") + /// .into_arc(); + /// let builder = Runtime::builder("demo") + /// .session(zenoh_session); + /// # }); + /// ``` #[cfg(feature = "zenoh")] pub fn session(mut self, session: Arc) -> Self { self.session = Some(session); @@ -94,6 +122,16 @@ impl RuntimeBuilder { } /// Forces the hybrid logical clock the Runtime should use. + /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::Runtime; + /// use uhlc::HLC; + /// + /// let builder = Runtime::builder("demo") + /// .hlc(HLC::default()); + /// ``` pub fn hlc(mut self, hlc: HLC) -> Self { self.hlc = Some(hlc); self @@ -106,10 +144,27 @@ impl RuntimeBuilder { /// /// # Errors /// - /// This method will fail if any of the extension is not valid. See [here](Extensions::try_add_extension) for a + /// This method will fail if any of the extension is not valid. See [here](RuntimeBuilder::add_extension) for a /// complete list of error cases. /// /// Note that the extensions are added *after* checking them all. + /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::{Extensions, Runtime}; + /// use zenoh_flow_commons::{try_parse_from_file, Vars}; + /// + /// let (extensions, _) = try_parse_from_file::( + /// "/home/zenoh-flow/extensions.ext", + /// Vars::default() + /// ) + /// .expect("Failed to parse Extensions"); + /// + /// let builder = Runtime::builder("demo") + /// .add_extensions(extensions) + /// .expect("Failed to add set of extensions"); + /// ``` pub fn add_extensions(mut self, extensions: Extensions) -> Result { for extension in extensions.values() { extension.libraries.validate()?; @@ -120,15 +175,35 @@ impl RuntimeBuilder { Ok(self) } - /// Attempts to add a single [Extension] to the list of extensions supported by this Runtime. + /// Attempts to add a single [Extension](crate::Extension) to the list of extensions supported by this Runtime. /// /// If a previous extension was already declared for the same file extension, the newly added extension will /// override the previous value. /// /// # Errors /// - /// This method will fail if the extension is not valid. See [here](Extensions::try_add_extension) for a complete - /// list of error cases. + /// This method will return an error if any of the library: + /// - does not expose the correct symbol (see these macros: [1], [2], [3]), + /// - was not compiled with the same Rust version, + /// - was not using the same version of Zenoh-Flow as this [runtime](crate::Runtime). + /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::Runtime; + /// let builder = Runtime::builder("demo") + /// .add_extension( + /// "py", + /// "/home/zenoh-flow/libpy_source.so", + /// "/home/zenoh-flow/libpy_operator.so", + /// "/home/zenoh-flow/libpy_sink.so", + /// ) + /// .expect("Failed to add 'py' extension"); + /// ``` + /// + /// [1]: zenoh_flow_nodes::prelude::export_source + /// [2]: zenoh_flow_nodes::prelude::export_operator + /// [3]: zenoh_flow_nodes::prelude::export_sink pub fn add_extension( mut self, file_extension: impl Into, @@ -147,6 +222,18 @@ impl RuntimeBuilder { /// /// This method can fail if the `zenoh` feature is enabled (it is by default), no [Session] was provided to the /// builder and the creation of a Session failed. 
+ /// + /// # Example + /// + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh_flow_runtime::Runtime; + /// let runtime = Runtime::builder("demo") + /// .build() + /// .await + /// .expect("Failed to build Zenoh-Flow runtime"); + /// # }); + /// ``` pub async fn build(self) -> Result { #[cfg(feature = "zenoh")] let session = match self.session { diff --git a/zenoh-flow-runtime/src/runtime/load.rs b/zenoh-flow-runtime/src/runtime/load.rs index 130a6568..1f47868c 100644 --- a/zenoh-flow-runtime/src/runtime/load.rs +++ b/zenoh-flow-runtime/src/runtime/load.rs @@ -46,11 +46,25 @@ use zenoh_flow_records::DataFlowRecord; pub(crate) type Channels = HashMap; impl Runtime { - /// TODO@J-Loudet + /// Attempts to load the provided [DataFlowRecord], creating a new [DataFlowInstance] in this `Runtime`. + /// + /// Upon creation the [DataFlowInstance] will be put in the [Creating](InstanceState::Creating) state. Once all the + /// nodes managed by this Runtime have been successfully loaded, the instance will be put in the + /// [Loaded](InstanceState::Loaded) state. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - the data flow was not valid; more specifically, at least one link was connecting two nodes that are running on + /// different runtimes (the current one and another), + /// - the runtime failed to load: an operator, a source, a sink, + /// - the runtime encountered an internal error: + /// - a channel was not created for a node, + /// - a Zenoh built-in source failed to declare its subscriber. pub async fn try_load_data_flow(&self, data_flow: DataFlowRecord) -> Result<()> { // ----------------------------------- // The following code tries to do two things: - // 1. minimizing the amount of time `self.flows` is locked, + // 1. minimising the amount of time `self.flows` is locked, // 2. ensuring that which ever thread accesses a data flow from `self.flows` will access it **after** it was // fully loaded. // @@ -169,7 +183,7 @@ impl Runtime { /// # Errors /// /// The only scenario in which this method fails is if we did not correctly processed the data flow descriptor and - /// ended up having a link with nodes on two different runtime. + /// ended up having a link with nodes on two different runtimes. fn create_channels(&self, record: &DataFlowRecord) -> Result { let nodes_runtime = match record.mapping().get(&self.runtime_id) { Some(nodes) => nodes, @@ -232,7 +246,17 @@ The problematic link is: Ok(channels) } - /// TODO@J-Loudet + /// Attempts to load the Operators from the provided [DataFlowRecord], returning a list of [Runners]. + /// + /// This method will first filter the Operators from the [DataFlowRecord], keeping only those assigned to the + /// current runtime. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - a channel was not created for one of the Operators managed by this runtime, + /// - the call to `try_load_constructor` failed, + /// - the call to the actual constructor failed. async fn try_load_operators( &self, record: &DataFlowRecord, @@ -277,7 +301,18 @@ The channels for the Inputs and Outputs of Operator < {} > were not created. Ok(runners) } - /// TODO@J-Loudet + /// Attempts to load the Sources from the provided [DataFlowRecord], returning a list of [Runners]. + /// + /// This method will first filter the Sources from the [DataFlowRecord], keeping only those assigned to the + /// current runtime. 
+ /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - a channel was not created for one of the Sources managed by this runtime, + /// - the call to create a Zenoh built-in Source failed, + /// - the call to `try_load_constructor` failed, + /// - the call to the actual constructor failed. async fn try_load_sources( &self, record: &DataFlowRecord, @@ -338,7 +373,18 @@ Maybe change the features in the Cargo.toml? Ok(runners) } - /// TODO@J-Loudet + /// Attempts to load the Sinks from the provided [DataFlowRecord], returning a list of [Runners]. + /// + /// This method will first filter the Sinks from the [DataFlowRecord], keeping only those assigned to the + /// current runtime. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - a channel was not created for one of the Sinks managed by this runtime, + /// - the call to create a Zenoh built-in Sink failed, + /// - the call to `try_load_constructor` failed, + /// - the call to the actual constructor failed. async fn try_load_sinks( &self, record: &DataFlowRecord, @@ -405,7 +451,16 @@ Maybe change the features in the Cargo.toml? Ok(runners) } - /// TODO@J-Loudet + /// Attempts to load the Zenoh Receivers from the provided [DataFlowRecord], returning a list of [Runners]. + /// + /// This method will first filter the Receivers from the [DataFlowRecord], keeping only those assigned to the + /// current runtime. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - a channel was not created for one of the Receivers managed by this runtime, + /// - the creation of a Receiver failed. #[cfg(feature = "zenoh")] async fn try_load_receivers( &self, @@ -446,7 +501,16 @@ The channels for the Outputs of Connector Receiver < {} > were not created. Ok(runners) } - /// TODO@J-Loudet + /// Attempts to load the Zenoh Senders from the provided [DataFlowRecord], returning a list of [Runners]. + /// + /// This method will first filter the Senders from the [DataFlowRecord], keeping only those assigned to the + /// current runtime. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - a channel was not created for one of the Senders managed by this runtime, + /// - the creation of a Receiver failed. #[cfg(feature = "zenoh")] fn try_load_senders( &self, @@ -491,6 +555,14 @@ The channels for the Inputs of Connector Sender < {} > were not created. Ok(runners) } + /// Attempts to load the constructor of the node implementation located at [Url]. + /// + /// This method is a convenience wrapper that automates locking and releasing the lock over the internal Loader + /// --- that actually loads the constructor. + /// + /// # Errors + /// + /// This method can fail if the Loader failed to load the constructor. async fn try_load_constructor( &self, url: &Url, diff --git a/zenoh-flow-runtime/src/runtime/mod.rs b/zenoh-flow-runtime/src/runtime/mod.rs index 46f174ba..4e8e0413 100644 --- a/zenoh-flow-runtime/src/runtime/mod.rs +++ b/zenoh-flow-runtime/src/runtime/mod.rs @@ -40,6 +40,14 @@ use zenoh_flow_commons::SharedMemoryConfiguration; use zenoh_flow_commons::{InstanceId, Result, RuntimeId}; use zenoh_flow_records::DataFlowRecord; +/// A Zenoh-Flow runtime manages a subset of the nodes of [DataFlowInstance]\(s\). +/// +/// In order to start a data flow, a Zenoh-Flow runtime should first be created and then tasked with loading and +/// starting the nodes *it is responsible for*. 
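The create / load / start sequence described above could look roughly as follows. The `DataFlowRecord` is assumed to be produced elsewhere (for instance by `zfctl` or a Zenoh-Flow daemon); the `deploy` helper is purely illustrative.

```rust
use zenoh_flow_commons::Result;
use zenoh_flow_records::DataFlowRecord;
use zenoh_flow_runtime::{InstanceState, Runtime};

/// Loads `record` on a freshly built runtime and starts the nodes this runtime
/// is responsible for (hypothetical helper, not part of the crate).
async fn deploy(record: DataFlowRecord) -> Result<Runtime> {
    // With the `zenoh` feature enabled (the default), `build()` also opens a
    // Zenoh session if none was provided to the builder.
    let runtime = Runtime::builder("example-runtime").build().await?;

    // Instantiate the nodes of `record` that are mapped to this runtime.
    runtime.try_load_data_flow(record).await?;

    // Start every instance that was successfully loaded.
    for (instance_id, (_name, state)) in runtime.instances_state().await {
        if matches!(state, InstanceState::Loaded(_)) {
            runtime.try_start_instance(&instance_id).await?;
        }
    }

    Ok(runtime)
}
```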
+/// +/// It is important to note here that, on its own, a Zenoh-Flow runtime will not contact other runtimes to coordinate +/// the start of a data flow. If a data flow spawns on multiple runtimes, one needs to start each separately. Otherwise, +/// the Zenoh-Flow daemon structure has been especially designed to address this use-case. pub struct Runtime { pub(crate) name: Arc, pub(crate) runtime_id: RuntimeId, @@ -52,10 +60,12 @@ pub struct Runtime { flows: RwLock>>>, } +/// This enumeration defines the errors one can face when trying to access a [DataFlowInstance]. #[derive(Error, Debug)] pub enum DataFlowErr { #[error("found no data flow with the provided id")] NotFound, + /// A [DataFlowInstance] in a failed state cannot be manipulated, only deleted. #[error("the data flow is in a failed state, unable to process the request")] FailedState, } @@ -73,6 +83,19 @@ impl Display for Runtime { } impl Runtime { + /// Create a new builder in order to construct a `Runtime`. + /// + /// # Example + /// + /// ```no_run + /// use zenoh_flow_runtime::Runtime; + /// # async_std::task::block_on(async { + /// let runtime = Runtime::builder("demo") + /// .build() + /// .await + /// .expect("Failed to build the Runtime"); + /// # }); + /// ``` pub fn builder(name: impl Into) -> RuntimeBuilder { RuntimeBuilder::new(name) } @@ -89,13 +112,12 @@ impl Runtime { self.name.clone() } + /// Returns a shared pointer over the [HLC] used by this Runtime. pub fn hlc(&self) -> Arc { self.hlc.clone() } - /// Returns information regarding the data flows that are running on this Zenoh-Flow runtime. - /// - /// For each instance of a data flow, returns its unique identifier, its name and its state. + /// Returns the [InstanceState] of the [DataFlowInstance]\(s\) managed by this Zenoh-Flow runtime. pub async fn instances_state(&self) -> HashMap, InstanceState)> { let flows = self.flows.read().await; let mut states = HashMap::with_capacity(flows.len()); @@ -110,13 +132,13 @@ impl Runtime { states } - /// Returns an atomically counted reference over the Zenoh session this Zenoh-Flow runtime leverages. + /// Returns a shared pointer over the Zenoh [session](Session) used by this Runtime. #[cfg(feature = "zenoh")] pub fn session(&self) -> Arc { self.session.clone() } - /// Returns the [DataFlowRecord] associated with the provided instance, *if that instance is not in a failed state*. + /// Returns the [DataFlowRecord] associated with the provided instance. /// /// # Errors /// @@ -140,14 +162,8 @@ impl Runtime { Err(DataFlowErr::NotFound) } - /// Returns the [status](InstanceStatus) of the provided data flow instance or `None` if this runtime does not + /// Returns the [status](InstanceStatus) of the provided data flow instance or [None] if this runtime does not /// manage this instance. - /// - /// The possible values are: - /// - [Loaded](InstanceStatus::Loaded) when all the nodes that this runtime manages have been successfully - /// loaded. In particular, this means that each node has successfully called its constructor. - /// - [Running](InstanceStatus::Running) when the nodes that this runtime manages are running. - /// - [Aborted](InstanceStatus::Aborted) when the nodes were previously running and their execution was aborted. 
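Taken together, `get_instance_status`, `try_abort_instance` and `try_delete_instance` allow a small management routine along the following lines; the `stop_and_clean` helper and its abort-before-delete policy are illustrative only.

```rust
use zenoh_flow_commons::{InstanceId, Result};
use zenoh_flow_runtime::{InstanceState, Runtime};

/// Aborts the instance `id` if it is currently running on `runtime`, then
/// deletes it (hypothetical helper, not part of the crate).
async fn stop_and_clean(runtime: &Runtime, id: &InstanceId) -> Result<()> {
    if let Some(status) = runtime.get_instance_status(id).await {
        if matches!(&status.state, InstanceState::Running(_)) {
            runtime.try_abort_instance(id).await?;
        }
    }

    runtime.try_delete_instance(id).await
}
```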
pub async fn get_instance_status(&self, id: &InstanceId) -> Option { if let Some(instance) = self.flows.read().await.get(id) { return Some(instance.read().await.status(&self.runtime_id)); @@ -164,7 +180,7 @@ impl Runtime { /// - this runtime manages no data flow with the provided instance id, /// - the data flow is in a failed state. /// - /// Data Flow that are in a failed state cannot be started or aborted. They can provide their status or be deleted. + /// Data Flows that are in a failed state cannot be started or aborted. They can provide their status or be deleted. async fn try_get_instance( &self, id: &InstanceId, @@ -180,6 +196,18 @@ impl Runtime { Ok(instance) } + /// Attempts to (re-)start the [DataFlowInstance] identified by the provided `id`. + /// + /// Note that this method is idempotent: calling it on an already running data flow will do nothing. + /// + /// # Errors + /// + /// This method can fail for the following reason: + /// - no data flow with the provided id was found, + /// - the data flow is in a failed state, + /// - the data flow is restarted and the [on_resume] method of one of the nodes (managed by the runtime) failed. + /// + /// [on_resume]: zenoh_flow_nodes::prelude::Node::on_resume() #[tracing::instrument(name = "start", skip(self, id), fields(instance = %id))] pub async fn try_start_instance(&self, id: &InstanceId) -> Result<()> { let instance = self.try_get_instance(id).await?; @@ -192,9 +220,15 @@ impl Runtime { Ok(()) } - /// TODO@J-Loudet + /// Attempts to abort the [DataFlowInstance] identified by the provided `id`. + /// + /// Note that this method is idempotent: calling it on an already aborted data flow will do nothing. + /// + /// # Errors /// - /// - abort all nodes + /// This method can fail for the following reasons: + /// - no data flow with the provided id was found, + /// - the data flow is in a failed state. #[tracing::instrument(name = "abort", skip(self, id), fields(instance = %id))] pub async fn try_abort_instance(&self, id: &InstanceId) -> Result<()> { let instance = self.try_get_instance(id).await?; @@ -212,6 +246,13 @@ impl Runtime { Ok(()) } + /// Attempts to delete the [DataFlowInstance] identified by the provided `id`. + /// + /// # Errors + /// + /// This method can fail for the following reasons: + /// - no data flow with the provided id was found, + /// - another action is currently being performed on the data flow. #[tracing::instrument(name = "delete", skip(self, id), fields(instance = %id))] pub async fn try_delete_instance(&self, id: &InstanceId) -> Result<()> { let instance = {