diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3447fb..269f49e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,13 +83,11 @@ jobs: run: cargo test --verbose env: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse - ASYNC_STD_THREAD_COUNT: 4 - name: Run doctests run: cargo test --doc env: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse - ASYNC_STD_THREAD_COUNT: 4 # NOTE: In GitHub repository settings, the "Require status checks to pass # before merging" branch protection rule ensures that commits are only merged diff --git a/Cargo.lock b/Cargo.lock index c8bf8d7..6a79c43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4051,6 +4051,39 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "test-case" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8" +dependencies = [ + "test-case-macros", +] + +[[package]] +name = "test-case-core" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2", + "quote", + "syn 2.0.72", +] + +[[package]] +name = "test-case-macros" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", + "test-case-core", +] + [[package]] name = "textwrap" version = "0.16.1" @@ -4266,9 +4299,9 @@ dependencies = [ "bytes", "libc", "mio", - "num_cpus", - "pin-project-lite 0.2.13", - "socket2 0.5.6", + "pin-project-lite 0.2.14", + "signal-hook-registry", + "socket2 0.5.7", "tokio-macros", "windows-sys 0.48.0", ] @@ -5157,11 +5190,11 @@ dependencies = [ name = "zenoh-bridge-ros1" version = "0.11.0-dev" dependencies = [ - "async-std", "clap", "ctrlc", "lazy_static", "serde_json", + "tokio", "tracing", "zenoh", "zenoh-plugin-rest", @@ -5482,7 +5515,6 @@ name = "zenoh-plugin-ros1" version = "0.11.0-dev" dependencies = [ "async-global-executor", - "async-std", "async-trait", "atoi", "ctrlc", @@ -5501,6 +5533,8 @@ dependencies = [ "serial_test", "strum", "strum_macros", + "test-case", + "tokio", "tracing", "xml-rpc", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index 43cd5af..2230566 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ categories = ["network-programming"] [workspace.dependencies] atoi = "2.0.0" -async-std = "=1.12.0" async-trait = "0.1" clap = "3.2.23" ctrlc = "3.2.5" @@ -49,8 +48,11 @@ flume = "0.11" hex = "0.4.3" xml-rpc = "0.0.12" rustc_version = "0.4" +test-case = { version = "3.3" } +tokio = { version = "1.35.1", features = ["process"] } tracing = "0.1" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [ + "internal", "unstable", "plugins", ] } diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 0329cf6..b50efce 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -118,6 +118,18 @@ //// The string format is [0-9]+(ns|us|ms|[smhdwy]) //// // ros_master_polling_interval: "100ms", + + //// + //// work_thread_num: The number of worker thread in TOKIO runtime (default: 2) + //// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime. + //// + // work_thread_num: 2, + + //// + //// max_block_thread_num: The number of blocking thread in TOKIO runtime (default: 50) + //// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime. + //// + // max_block_thread_num: 50, }, //// diff --git a/zenoh-bridge-ros1/Cargo.toml b/zenoh-bridge-ros1/Cargo.toml index d34e06f..4ab9043 100644 --- a/zenoh-bridge-ros1/Cargo.toml +++ b/zenoh-bridge-ros1/Cargo.toml @@ -24,12 +24,12 @@ description = "Zenoh bridge for ROS1" publish = false [dependencies] -async-std = { workspace = true, features = ["unstable", "attributes"] } clap = { workspace = true } ctrlc = { workspace = true } tracing = { workspace = true } lazy_static = { workspace = true } serde_json = { workspace = true } +tokio = { workspace = true } zenoh = { workspace = true } zenoh-plugin-trait = { workspace = true } zenoh-plugin-ros1 = { workspace = true } diff --git a/zenoh-bridge-ros1/src/main.rs b/zenoh-bridge-ros1/src/main.rs index c77c30c..c7da7dd 100644 --- a/zenoh-bridge-ros1/src/main.rs +++ b/zenoh-bridge-ros1/src/main.rs @@ -13,8 +13,8 @@ // use std::str::FromStr; -use async_std::channel::unbounded; use clap::{App, Arg}; +use tokio::sync::mpsc::unbounded_channel; use zenoh::{ config::{Config, ZenohId}, internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, @@ -187,6 +187,16 @@ Bridge polls ROS1 master to get information on local topics. This option is the Accepted value:' A string such as 100ms, 2s, 5m The string format is [0-9]+(ns|us|ms|[smhdwy])"# + )) + .arg(Arg::from_usage( +r#"--work_thread_num=[usize] \ +'The number of worker thread in TOKIO runtime (default: 2) +The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.'"# + )) + .arg(Arg::from_usage( +r#"--max_block_thread_num=[usize] \ +'The number of blocking thread in TOKIO runtime (default: 50) +The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.'"# )); let args = app.get_matches(); @@ -252,15 +262,11 @@ The string format is [0-9]+(ns|us|ms|[smhdwy])"# config } -#[async_std::main] +#[tokio::main] async fn main() { - let (sender, receiver) = unbounded(); - ctrlc::set_handler(move || { - sender - .send_blocking(()) - .expect("Error handling Ctrl+C signal") - }) - .expect("Error setting Ctrl+C handler"); + let (sender, mut receiver) = unbounded_channel(); + ctrlc::set_handler(move || sender.send(()).expect("Error handling Ctrl+C signal")) + .expect("Error setting Ctrl+C handler"); zenoh::init_log_from_env_or("z=info"); tracing::info!( diff --git a/zenoh-plugin-ros1/Cargo.toml b/zenoh-plugin-ros1/Cargo.toml index e06aff6..fc05c23 100644 --- a/zenoh-plugin-ros1/Cargo.toml +++ b/zenoh-plugin-ros1/Cargo.toml @@ -41,6 +41,8 @@ flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } lazy_static = { workspace = true } +test-case = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -63,10 +65,6 @@ ctrlc = { workspace = true } # TODO: https://zettascale.atlassian.net/browse/ZEN-291 # zenoh-plugin-ros1 = { path = ".", features = ["test"]} -[dependencies.async-std] -version = "=1.12.0" -features = ["unstable", "attributes"] - [build-dependencies] rustc_version = { workspace = true } diff --git a/zenoh-plugin-ros1/examples/ros1_pub.rs b/zenoh-plugin-ros1/examples/ros1_pub.rs index bcfc625..f89b3b9 100644 --- a/zenoh-plugin-ros1/examples/ros1_pub.rs +++ b/zenoh-plugin-ros1/examples/ros1_pub.rs @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge, }; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging zenoh::try_init_log_from_env(); @@ -80,5 +80,5 @@ async fn main() { std::thread::sleep(core::time::Duration::from_secs(1)); } }; - async_std::task::spawn_blocking(working_loop).await; + tokio::task::spawn_blocking(working_loop).await.unwrap(); } diff --git a/zenoh-plugin-ros1/examples/ros1_service.rs b/zenoh-plugin-ros1/examples/ros1_service.rs index 4364d6f..e7b2633 100644 --- a/zenoh-plugin-ros1/examples/ros1_service.rs +++ b/zenoh-plugin-ros1/examples/ros1_service.rs @@ -16,7 +16,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge, }; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging zenoh::try_init_log_from_env(); @@ -86,6 +86,6 @@ async fn main() { println!("Zenoh got error: {}", e); } } - async_std::task::sleep(core::time::Duration::from_secs(1)).await; + tokio::time::sleep(core::time::Duration::from_secs(1)).await; } } diff --git a/zenoh-plugin-ros1/examples/ros1_standalone_pub.rs b/zenoh-plugin-ros1/examples/ros1_standalone_pub.rs index 67bf2aa..8fc6fe8 100644 --- a/zenoh-plugin-ros1/examples/ros1_standalone_pub.rs +++ b/zenoh-plugin-ros1/examples/ros1_standalone_pub.rs @@ -12,17 +12,15 @@ // ZettaScale Zenoh Team, // -use async_std::channel::unbounded; +use tokio::sync::mpsc::unbounded_channel; use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment; -#[async_std::main] +#[tokio::main] async fn main() { - let (sender, receiver) = unbounded(); + let (sender, mut receiver) = unbounded_channel(); ctrlc::set_handler(move || { tracing::info!("Catching Ctrl+C..."); - sender - .send_blocking(()) - .expect("Error handling Ctrl+C signal") + sender.send(()).expect("Error handling Ctrl+C signal") }) .expect("Error setting Ctrl+C handler"); @@ -59,5 +57,5 @@ async fn main() { } tracing::info!("Caught Ctrl+C, stopping..."); }; - async_std::task::spawn_blocking(working_loop).await; + tokio::task::spawn_blocking(working_loop).await.unwrap(); } diff --git a/zenoh-plugin-ros1/examples/ros1_standalone_sub.rs b/zenoh-plugin-ros1/examples/ros1_standalone_sub.rs index aa6e0d7..3f254a2 100644 --- a/zenoh-plugin-ros1/examples/ros1_standalone_sub.rs +++ b/zenoh-plugin-ros1/examples/ros1_standalone_sub.rs @@ -12,17 +12,15 @@ // ZettaScale Zenoh Team, // -use async_std::channel::unbounded; +use tokio::sync::mpsc::unbounded_channel; use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment; -#[async_std::main] +#[tokio::main] async fn main() { - let (sender, receiver) = unbounded(); + let (sender, mut receiver) = unbounded_channel(); ctrlc::set_handler(move || { tracing::info!("Catching Ctrl+C..."); - sender - .send_blocking(()) - .expect("Error handling Ctrl+C signal") + sender.send(()).expect("Error handling Ctrl+C signal") }) .expect("Error setting Ctrl+C handler"); diff --git a/zenoh-plugin-ros1/examples/ros1_sub.rs b/zenoh-plugin-ros1/examples/ros1_sub.rs index 1ca81b4..7b857b7 100644 --- a/zenoh-plugin-ros1/examples/ros1_sub.rs +++ b/zenoh-plugin-ros1/examples/ros1_sub.rs @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge, }; -#[async_std::main] +#[tokio::main] async fn main() { // initiate logging zenoh::try_init_log_from_env(); @@ -73,6 +73,6 @@ async fn main() { loop { println!("Zenoh Publisher: publishing data..."); zenoh_publisher.put(data.clone()).await.unwrap(); - async_std::task::sleep(core::time::Duration::from_secs(1)).await; + tokio::time::sleep(core::time::Duration::from_secs(1)).await; } } diff --git a/zenoh-plugin-ros1/src/lib.rs b/zenoh-plugin-ros1/src/lib.rs index 4418e21..34911e8 100644 --- a/zenoh-plugin-ros1/src/lib.rs +++ b/zenoh-plugin-ros1/src/lib.rs @@ -13,11 +13,16 @@ // #![recursion_limit = "1024"] -use std::time::Duration; +use std::{ + future::Future, + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; use ros_to_zenoh_bridge::{ environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge, }; +use tokio::task::JoinHandle; use zenoh::{ internal::{ plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin}, @@ -27,8 +32,72 @@ use zenoh::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; +use crate::ros_to_zenoh_bridge::environment; + pub mod ros_to_zenoh_bridge; +lazy_static::lazy_static! { + static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(environment::DEFAULT_WORK_THREAD_NUM); + static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(environment::DEFAULT_MAX_BLOCK_THREAD_NUM); + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst)) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst)) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +pub(crate) fn spawn_blocking_runtime(func: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), use the current runtime + rt.spawn_blocking(func) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), reuse the global runtime + TOKIO_RUNTIME.spawn_blocking(func) + } + } +} +#[inline(always)] +pub(crate) fn spawn_runtime(task: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), use the current runtime + rt.spawn(task) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), reuse the global runtime + TOKIO_RUNTIME.spawn(task) + } + } +} +#[inline(always)] +pub(crate) fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), use the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), reuse the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + // The struct implementing the ZenohPlugin and ZenohPlugin traits pub struct Ros1Plugin {} @@ -70,6 +139,9 @@ impl Plugin for Ros1Plugin { entry.set(str.trim_matches('"').to_string()); } } + // Setup the thread numbers + WORK_THREAD_NUM.store(Environment::work_thread_num().get(), Ordering::SeqCst); + MAX_BLOCK_THREAD_NUM.store(Environment::max_block_thread_num().get(), Ordering::SeqCst); drop(config); @@ -87,16 +159,16 @@ impl RunningPluginTrait for Ros1PluginInstance {} impl Drop for Ros1PluginInstance { fn drop(&mut self) { if Environment::with_rosmaster().get() { - async_std::task::block_on(Ros1MasterCtrl::without_ros1_master()); + blockon_runtime(Ros1MasterCtrl::without_ros1_master()); } } } impl Ros1PluginInstance { fn new(runtime: &Runtime) -> ZResult { - let bridge: ZResult = async_std::task::block_on(async { + let bridge: ZResult = blockon_runtime(async { if Environment::with_rosmaster().get() { Ros1MasterCtrl::with_ros1_master().await?; - async_std::task::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } // create a zenoh Session that shares the same Runtime as zenohd diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge.rs index 354d824..9a37dda 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge.rs @@ -17,11 +17,12 @@ use std::sync::{ Arc, }; -use async_std::task::JoinHandle; +use tokio::task::JoinHandle; use tracing::error; use zenoh::{self, Result as ZResult, Session}; use self::{environment::Environment, ros1_to_zenoh_bridge_impl::work_cycle}; +use crate::spawn_runtime; #[cfg(feature = "test")] pub mod aloha_declaration; @@ -95,7 +96,7 @@ impl Ros1ToZenohBridge { let flag = Arc::new(AtomicBool::new(true)); Self { flag: flag.clone(), - task_handle: Box::new(async_std::task::spawn(Self::run(session, flag))), + task_handle: Box::new(spawn_runtime(Self::run(session, flag))), } } @@ -120,7 +121,10 @@ impl Ros1ToZenohBridge { } async fn async_await(&mut self) { - self.task_handle.as_mut().await; + self.task_handle + .as_mut() + .await + .expect("Unable to complete the task"); } } impl Drop for Ros1ToZenohBridge { diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/abstract_bridge.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/abstract_bridge.rs index 5531854..087a1be 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/abstract_bridge.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/abstract_bridge.rs @@ -22,6 +22,7 @@ use super::{ bridge_type::BridgeType, ros1_client, topic_descriptor::TopicDescriptor, topic_utilities::make_zenoh_key, zenoh_client, }; +use crate::{blockon_runtime, spawn_blocking_runtime, spawn_runtime}; pub struct AbstractBridge { _impl: BridgeIml, @@ -92,7 +93,7 @@ impl Ros1ToZenohClient { query: rosrust::RawMessage, zenoh_client: &zenoh_client::ZenohClient, ) -> rosrust::ServiceResult { - return async_std::task::block_on(Self::do_zenoh_query(key, query, zenoh_client)); + return blockon_runtime(Self::do_zenoh_query(key, query, zenoh_client)); } async fn do_zenoh_query( @@ -158,7 +159,7 @@ impl Ros1ToZenohService { let topic_in_arc = Arc::new(topic.clone()); let queryable = zenoh_client .make_queryable(make_zenoh_key(topic), move |query| { - async_std::task::spawn(Self::on_query( + spawn_runtime(Self::on_query( client_in_arc.clone(), query, topic_in_arc.clone(), @@ -204,7 +205,7 @@ impl Ros1ToZenohService { ) { // rosrust is synchronous, so we will use spawn_blocking. If there will be an async mode some day for the rosrust, // than reply_to_query can be refactored to async very easily - let res = async_std::task::spawn_blocking(move || { + let res = spawn_blocking_runtime(move || { let description = RawMessageDescription { msg_definition: String::from("*"), md5sum: topic.md5.clone(), @@ -212,7 +213,8 @@ impl Ros1ToZenohService { }; ros1_client.req_with_description(&rosrust::RawMessage(payload), description) }) - .await; + .await + .expect("Unable to compete the task"); match Self::reply_to_query(res, &query).await { Ok(_) => {} Err(e) => { @@ -315,7 +317,7 @@ impl ZenohToRos1 { let subscriber = zenoh_client .subscribe(make_zenoh_key(topic), move |sample| { let publisher_in_arc_cloned = publisher_in_arc.clone(); - async_std::task::spawn_blocking(move || { + spawn_blocking_runtime(move || { let data = sample.payload().into::>(); debug!("Zenoh -> ROS1: sending {} bytes!", data.len()); match publisher_in_arc_cloned.send(rosrust::RawMessage(data)) { diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_declaration.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_declaration.rs index bb66826..739af75 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_declaration.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_declaration.rs @@ -30,6 +30,8 @@ use zenoh::{ Session, }; +use crate::spawn_runtime; + pub struct AlohaDeclaration { monitor_running: Arc, } @@ -42,7 +44,7 @@ impl Drop for AlohaDeclaration { impl AlohaDeclaration { pub fn new(session: Arc, key: OwnedKeyExpr, beacon_period: Duration) -> Self { let monitor_running = Arc::new(AtomicBool::new(true)); - async_std::task::spawn(Self::aloha_monitor_task( + spawn_runtime(Self::aloha_monitor_task( beacon_period, monitor_running.clone(), key, @@ -87,10 +89,8 @@ impl AlohaDeclaration { // start publisher in ALOHA style... let period_ns = beacon_period.as_nanos(); let aloha_wait: u128 = rand::random::() % period_ns; - async_std::task::sleep(Duration::from_nanos( - aloha_wait.try_into().unwrap(), - )) - .await; + tokio::time::sleep(Duration::from_nanos(aloha_wait.try_into().unwrap())) + .await; if remote_beacons.load(std::sync::atomic::Ordering::SeqCst) == 0 { Self::start_beacon_task( beacon_period, @@ -109,7 +109,7 @@ impl AlohaDeclaration { } } } - async_std::task::sleep(beacon_period).await; + tokio::time::sleep(beacon_period).await; } Self::stop_beacon_task(beacon_task_flag.clone()); } @@ -121,7 +121,7 @@ impl AlohaDeclaration { running: Arc, ) { running.store(true, std::sync::atomic::Ordering::SeqCst); - async_std::task::spawn(Self::aloha_publishing_task( + spawn_runtime(Self::aloha_publishing_task( beacon_period, key, session, @@ -149,7 +149,7 @@ impl AlohaDeclaration { while running.load(std::sync::atomic::Ordering::Relaxed) { let _res = publisher.put(ZBuf::default()).await; - async_std::task::sleep(beacon_period).await; + tokio::time::sleep(beacon_period).await; } } } diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_subscription.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_subscription.rs index 5945b4b..daf2032 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_subscription.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/aloha_subscription.rs @@ -21,12 +21,14 @@ use std::{ time::Duration, }; -use async_std::sync::Mutex; use flume::Receiver; use futures::{join, Future, FutureExt}; +use tokio::sync::Mutex; use tracing::error; use zenoh::{key_expr::OwnedKeyExpr, prelude::*, sample::Sample, Result as ZResult, Session}; +use crate::spawn_runtime; + struct AlohaResource { activity: AtomicBool, } @@ -80,7 +82,7 @@ impl AlohaSubscription { { let task_running = Arc::new(AtomicBool::new(true)); - async_std::task::spawn(AlohaSubscription::task( + spawn_runtime(AlohaSubscription::task( task_running.clone(), key, beacon_period, @@ -184,7 +186,7 @@ impl AlohaSubscription { val.1.reset(); }); - async_std::task::sleep(accumulate_period).await; + tokio::time::sleep(accumulate_period).await; for (key, val) in accumulating_resources.lock().await.iter() { if !val.is_active() { diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/environment.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/environment.rs index b5a9c80..c6885a2 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/environment.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/environment.rs @@ -20,6 +20,9 @@ use tracing::error; use super::bridging_mode::BridgingMode; +pub(crate) const DEFAULT_WORK_THREAD_NUM: usize = 2; +pub(crate) const DEFAULT_MAX_BLOCK_THREAD_NUM: usize = 50; + #[derive(Clone)] pub struct Entry<'a, Tvar> where @@ -80,6 +83,12 @@ impl<'a> From> for Entry<'a, String> { } } +impl<'a> From> for Entry<'a, String> { + fn from(item: Entry<'a, usize>) -> Entry<'a, String> { + Entry::new(item.name, item.default.to_string()) + } +} + #[derive(Clone, Default)] pub struct CustomBridgingModes { pub modes: HashMap, @@ -169,6 +178,14 @@ impl Environment { ); } + pub fn work_thread_num() -> Entry<'static, usize> { + return Entry::new("WORK_THREAD_NUM", DEFAULT_WORK_THREAD_NUM); + } + + pub fn max_block_thread_num() -> Entry<'static, usize> { + return Entry::new("MAX_BLOCK_THREAD_NUM", DEFAULT_MAX_BLOCK_THREAD_NUM); + } + pub fn env() -> Vec> { [ Self::ros_master_uri(), @@ -185,6 +202,8 @@ impl Environment { Self::client_topic_custom_bridging_mode().into(), Self::master_polling_interval().into(), Self::with_rosmaster().into(), + Self::work_thread_num().into(), + Self::max_block_thread_num().into(), ] .to_vec() } diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_master_ctrl.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_master_ctrl.rs index 45f8bff..25e2a95 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_master_ctrl.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_master_ctrl.rs @@ -12,11 +12,11 @@ // ZettaScale Zenoh Team, // -use async_std::{ +use atoi::atoi; +use tokio::{ process::{Child, Command}, sync::Mutex, }; -use atoi::atoi; use tracing::error; use zenoh::{ internal::{bail, zasynclock, zerror}, @@ -25,7 +25,9 @@ use zenoh::{ use crate::ros_to_zenoh_bridge::environment::Environment; -static ROSMASTER: Mutex> = Mutex::new(None); +lazy_static::lazy_static! { + static ref ROSMASTER: Mutex> = Mutex::new(None); +} pub struct Ros1MasterCtrl; impl Ros1MasterCtrl { @@ -55,20 +57,12 @@ impl Ros1MasterCtrl { let mut locked = zasynclock!(ROSMASTER); assert!(locked.is_some()); match locked.take() { - Some(mut child) => match child.kill() { - Ok(_) => { - if let Err(e) = child.status().await { - error!("Error stopping child rosmaster: {}", e); - } - } + Some(mut child) => match child.kill().await { + Ok(_) => {} Err(e) => error!("Error sending kill cmd to child rosmaster: {}", e), }, None => match Command::new("killall").arg("rosmaster").spawn() { - Ok(mut child) => { - if let Err(e) = child.status().await { - error!("Error stopping foreign rosmaster: {}", e); - } - } + Ok(_) => {} Err(e) => error!( "Error executing killall command to stop foreign rosmaster: {}", e diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_to_zenoh_bridge_impl.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_to_zenoh_bridge_impl.rs index c0e9bf8..c1f2e89 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_to_zenoh_bridge_impl.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/ros1_to_zenoh_bridge_impl.rs @@ -20,7 +20,7 @@ use std::{ time::Duration, }; -use async_std::sync::Mutex; +use tokio::sync::Mutex; use tracing::{debug, error}; use zenoh::{self, internal::zasynclock, Result as ZResult}; @@ -29,9 +29,12 @@ use super::{ resource_cache::Ros1ResourceCache, ros1_client::Ros1Client, }; -use crate::ros_to_zenoh_bridge::{ - bridges_storage::BridgesStorage, discovery::LocalResources, environment::Environment, - ros1_client, topic_mapping, zenoh_client, +use crate::{ + ros_to_zenoh_bridge::{ + bridges_storage::BridgesStorage, discovery::LocalResources, environment::Environment, + ros1_client, topic_mapping, zenoh_client, + }, + spawn_blocking_runtime, }; #[derive(PartialEq, Clone, Copy)] @@ -170,7 +173,7 @@ where while flag.load(Relaxed) { let cl = ros1_client.clone(); - let (ros1_state, returned_cache) = async_std::task::spawn_blocking(move || { + let (ros1_state, returned_cache) = spawn_blocking_runtime(move || { ( topic_mapping::Ros1TopicMapping::topic_mapping( cl.as_ref(), @@ -179,7 +182,8 @@ where ros1_resource_cache, ) }) - .await; + .await + .expect("Unable to complete the task"); ros1_resource_cache = returned_cache; debug!("ros state: {:#?}", ros1_state); @@ -195,7 +199,7 @@ where self.report_bridge_statistics(&locked); } - async_std::task::sleep({ + tokio::time::sleep({ if smth_changed { poll_interval / 2 } else { @@ -213,7 +217,7 @@ where Self::cleanup(&mut locked); self.report_bridge_statistics(&locked); } - async_std::task::sleep(poll_interval).await; + tokio::time::sleep(poll_interval).await; } } } diff --git a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/test_helpers.rs b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/test_helpers.rs index 81db0b0..91c80db 100644 --- a/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/test_helpers.rs +++ b/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/test_helpers.rs @@ -23,7 +23,6 @@ use std::{ time::Duration, }; -use async_std::prelude::FutureExt; use async_trait::async_trait; use futures::Future; use rosrust::{Client, RawMessage, RawMessageDescription}; @@ -46,6 +45,7 @@ use super::{ topic_utilities::make_topic_key, zenoh_client, }; +use crate::{spawn_blocking_runtime, spawn_runtime}; pub struct IsolatedPort { pub port: u16, @@ -118,7 +118,7 @@ where if waiter() { return true; } - async_std::task::sleep(Duration::from_millis(sleep_millis)).await; + tokio::time::sleep(Duration::from_millis(sleep_millis)).await; } false } @@ -130,10 +130,10 @@ where { let w = async { while !waiter().await { - async_std::task::sleep(Duration::from_millis(10)).await + tokio::time::sleep(Duration::from_millis(10)).await } }; - w.timeout(timeout).await.is_ok() + tokio::time::timeout(timeout, w).await.is_ok() } pub struct RunningBridge { @@ -150,7 +150,7 @@ impl RunningBridge { ros_status: Arc::new(Mutex::new(RosStatus::Unknown)), bridge_status: Arc::new(Mutex::new(BridgeStatus::default())), }; - async_std::task::spawn(Self::run( + spawn_runtime(Self::run( ros_master_uri, config, result.flag.clone(), @@ -668,16 +668,14 @@ pub trait Publisher: Sync { impl Publisher for ZenohPublisher { fn put(&self, data: Vec) { let inner = self.inner.clone(); - async_std::task::spawn_blocking(move || inner.put(data).wait().unwrap()); + spawn_blocking_runtime(move || inner.put(data).wait().unwrap()); } } #[async_trait] impl Publisher for ROS1Publisher { fn put(&self, data: Vec) { let inner = self.inner.clone(); - async_std::task::spawn_blocking(move || { - inner.data.send(rosrust::RawMessage(data)).unwrap() - }); + spawn_blocking_runtime(move || inner.data.send(rosrust::RawMessage(data)).unwrap()); } async fn ready(&self) -> bool { @@ -687,7 +685,7 @@ impl Publisher for ROS1Publisher { #[async_trait] impl Publisher for ZenohQuery { fn put(&self, data: Vec) { - async_std::task::spawn(Self::query_loop( + spawn_runtime(Self::query_loop( self.inner.clone(), self.key.clone(), self.running.clone(), @@ -715,7 +713,7 @@ impl Publisher for ROS1Client { msg_type: self.topic.datatype.clone(), }; - async_std::task::spawn_blocking(|| { + spawn_blocking_runtime(|| { Self::query_loop(description, running, data, cycles, ros1_client) }); } @@ -728,10 +726,9 @@ impl Publisher for ROS1Client { }; let data = (0..10).collect(); let ros1_client = self.ros1_client.clone(); - async_std::task::spawn_blocking(move || { - Self::make_query(description, &data, &ros1_client).is_ok() - }) - .await + spawn_blocking_runtime(move || Self::make_query(description, &data, &ros1_client).is_ok()) + .await + .unwrap_or(false) } } diff --git a/zenoh-plugin-ros1/tests/aloha_declaration_test.rs b/zenoh-plugin-ros1/tests/aloha_declaration_test.rs index cac6202..88db5bb 100644 --- a/zenoh-plugin-ros1/tests/aloha_declaration_test.rs +++ b/zenoh-plugin-ros1/tests/aloha_declaration_test.rs @@ -19,7 +19,8 @@ use std::{ time::Duration, }; -use async_std::{prelude::FutureExt, sync::Mutex}; +use test_case::test_case; +use tokio::sync::Mutex; use zenoh::{key_expr::OwnedKeyExpr, prelude::*, session::OpenBuilder, Result as ZResult, Session}; use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ aloha_declaration, aloha_subscription, test_helpers::IsolatedConfig, @@ -57,22 +58,25 @@ fn make_session(cfg: &IsolatedConfig) -> Arc { session_builder(cfg).wait().unwrap().into_arc() } -fn make_subscription( +async fn make_subscription( session: Arc, beacon_period: Duration, ) -> aloha_subscription::AlohaSubscription { - async_std::task::block_on(subscription_builder(session, beacon_period).build()).unwrap() + subscription_builder(session, beacon_period) + .build() + .await + .expect("Failed to make subscription") } -#[test] -fn aloha_instantination_one_instance() { +#[tokio::test(flavor = "multi_thread")] +async fn aloha_instantination_one_instance() { let session = make_session(&IsolatedConfig::default()); let _declaration = declaration_builder(session.clone(), Duration::from_secs(1)); - let _subscription = make_subscription(session, Duration::from_secs(1)); + let _subscription = make_subscription(session, Duration::from_secs(1)).await; } -#[test] -fn aloha_instantination_many_instances() { +#[tokio::test(flavor = "multi_thread")] +async fn aloha_instantination_many_instances() { let cfg = IsolatedConfig::default(); let mut sessions = Vec::new(); let mut declarations = Vec::new(); @@ -84,7 +88,7 @@ fn aloha_instantination_many_instances() { } for session in sessions.iter() { - subscriptions.push(make_subscription(session.clone(), Duration::from_secs(1))); + subscriptions.push(make_subscription(session.clone(), Duration::from_secs(1)).await); } } @@ -117,7 +121,7 @@ impl<'a> PPCMeasurement<'a> { pub async fn measure_ppc(&self) -> usize { self.ppc.store(0, std::sync::atomic::Ordering::SeqCst); - async_std::task::sleep(self.measurement_period).await; + tokio::time::sleep(self.measurement_period).await; self.ppc.load(std::sync::atomic::Ordering::SeqCst) } } @@ -184,7 +188,7 @@ impl DeclarationCollector { || !self.to_be_undeclared.lock().await.is_empty() || expected != *self.resources.lock().await { - async_std::task::sleep(core::time::Duration::from_millis(1)).await; + tokio::time::sleep(core::time::Duration::from_millis(1)).await; } } } @@ -245,7 +249,7 @@ async fn test_state_transition<'a>( } collector.wait(result).await; - async_std::task::sleep(beacon_period).await; + tokio::time::sleep(beacon_period).await; while ppc_measurer.measure_ppc().await != { let mut res = 1; if state.declarators_count == 0 { @@ -275,99 +279,48 @@ async fn run_aloha(beacon_period: Duration, scenario: Vec) { .unwrap(); for scene in scenario { println!("Transiting State: {}", scene.declarators_count); - test_state_transition( - &cfg, - beacon_period, - &mut declaring_sessions, - &mut declarations, - &mut collector, - &ppc_measurer, - &scene, + tokio::time::timeout( + TIMEOUT, + test_state_transition( + &cfg, + beacon_period, + &mut declaring_sessions, + &mut declarations, + &mut collector, + &ppc_measurer, + &scene, + ), ) - .timeout(TIMEOUT) .await .expect("Timeout waiting state transition!"); } } -#[test] -fn aloha_declare_one() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [State::default().declarators(1)].into_iter().collect(), - )); -} - -#[test] -fn aloha_declare_many() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [State::default().declarators(10)].into_iter().collect(), - )); -} - -#[test] -fn aloha_declare_many_one_many() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [ - State::default().declarators(10), - State::default().declarators(1), - State::default().declarators(10), - ] - .into_iter() - .collect(), - )); -} - -#[test] -fn aloha_declare_one_zero_one() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [ - State::default().declarators(1), - State::default().declarators(0), - State::default().declarators(1), - ] - .into_iter() - .collect(), - )); -} - -#[test] -fn aloha_declare_many_zero_many() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [ - State::default().declarators(10), - State::default().declarators(0), - State::default().declarators(10), - ] - .into_iter() - .collect(), - )); -} - -#[test] -fn aloha_many_scenarios() { - async_std::task::block_on(run_aloha( - Duration::from_millis(100), - [ - State::default().declarators(1), - State::default().declarators(10), - State::default().declarators(1), - State::default().declarators(10), - State::default().declarators(1), - State::default().declarators(10), - State::default().declarators(0), - State::default().declarators(1), - State::default().declarators(10), - State::default().declarators(1), - State::default().declarators(0), - State::default().declarators(10), - State::default().declarators(1), - ] - .into_iter() - .collect(), - )); +#[test_case([State::default().declarators(1)].into_iter().collect(); "one")] +#[test_case([State::default().declarators(10)].into_iter().collect(); "many")] +#[test_case([State::default().declarators(10), + State::default().declarators(1), + State::default().declarators(10)].into_iter().collect(); "many one many")] +#[test_case([State::default().declarators(1), + State::default().declarators(0), + State::default().declarators(1)].into_iter().collect(); "one zero one")] +#[test_case([State::default().declarators(10), + State::default().declarators(0), + State::default().declarators(10)].into_iter().collect(); "many zero many")] +#[test_case([State::default().declarators(1), + State::default().declarators(10), + State::default().declarators(1), + State::default().declarators(10), + State::default().declarators(1), + State::default().declarators(10), + State::default().declarators(0), + State::default().declarators(1), + State::default().declarators(10), + State::default().declarators(1), + State::default().declarators(0), + State::default().declarators(10), + State::default().declarators(1)].into_iter().collect(); "many scenarios")] +#[tokio::test(flavor = "multi_thread")] +async fn aloha_declare(vec_state: Vec) { + run_aloha(Duration::from_millis(100), vec_state).await; } diff --git a/zenoh-plugin-ros1/tests/bridge_to_bridge.rs b/zenoh-plugin-ros1/tests/bridge_to_bridge.rs index 1adfa0f..326326b 100644 --- a/zenoh-plugin-ros1/tests/bridge_to_bridge.rs +++ b/zenoh-plugin-ros1/tests/bridge_to_bridge.rs @@ -21,9 +21,9 @@ use std::{ time::Duration, }; -use async_std::prelude::FutureExt; use rosrust::RawMessage; use strum_macros::Display; +use test_case::test_case; use tracing::{debug, trace}; use zenoh::key_expr::{KeyExpr, OwnedKeyExpr}; use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ @@ -136,9 +136,9 @@ async fn async_create_bridge() { system1.wait_state_synch().await; } -#[test] -fn create_bridge() { - async_std::task::block_on(async_create_bridge()); +#[tokio::test(flavor = "multi_thread")] +async fn create_bridge() { + async_create_bridge().await; } async fn async_create_bridge_and_init() { @@ -147,9 +147,9 @@ async fn async_create_bridge_and_init() { system1.with_ros().with_bridge().wait_state_synch().await; } -#[test] -fn create_bridge_and_init() { - async_std::task::block_on(async_create_bridge_and_init()); +#[tokio::test(flavor = "multi_thread")] +async fn create_bridge_and_init() { + async_create_bridge_and_init().await; } async fn async_create_bridge_and_reinit_ros() { @@ -163,9 +163,9 @@ async fn async_create_bridge_and_reinit_ros() { } } -#[test] -fn create_bridge_and_reinit_ros() { - async_std::task::block_on(async_create_bridge_and_reinit_ros()); +#[tokio::test(flavor = "multi_thread")] +async fn create_bridge_and_reinit_ros() { + async_create_bridge_and_reinit_ros().await; } async fn async_create_bridge_and_reinit_bridge() { @@ -179,9 +179,9 @@ async fn async_create_bridge_and_reinit_bridge() { } } -#[test] -fn create_bridge_and_reinit_bridge() { - async_std::task::block_on(async_create_bridge_and_reinit_bridge()); +#[tokio::test(flavor = "multi_thread")] +async fn create_bridge_and_reinit_bridge() { + async_create_bridge_and_reinit_bridge().await; } struct SrcDstPair { @@ -219,7 +219,7 @@ impl SrcDstPair { data.push((i % 255) as u8); } - async { + tokio::time::timeout(Duration::from_secs(30), async { while { self.src.put(data.clone()); !test_helpers::wait_async_fn( @@ -230,8 +230,7 @@ impl SrcDstPair { } { debug!("Restarting ping-pong!"); } - } - .timeout(Duration::from_secs(30)) + }) .await .is_ok() } @@ -254,7 +253,7 @@ impl SrcDstPair { self.counter.store(0, Relaxed); while !(result > 0.0 || duration >= 10000) { - async_std::task::sleep(core::time::Duration::from_millis(duration_milliseconds)).await; + tokio::time::sleep(core::time::Duration::from_millis(duration_milliseconds)).await; duration += duration_milliseconds; result += self.counter.load(Relaxed) as f64; } @@ -318,8 +317,6 @@ async fn async_bridge_2_bridge(instances: u32, mode: std::collections::HashSet) { + async_bridge_2_bridge(instances, mode).await; } diff --git a/zenoh-plugin-ros1/tests/discovery_test.rs b/zenoh-plugin-ros1/tests/discovery_test.rs index 90e58cc..73c1feb 100644 --- a/zenoh-plugin-ros1/tests/discovery_test.rs +++ b/zenoh-plugin-ros1/tests/discovery_test.rs @@ -17,8 +17,8 @@ use std::{ time::Duration, }; -use async_std::prelude::FutureExt; use multiset::HashMultiSet; +use test_case::test_case; use zenoh::{key_expr::keyexpr, prelude::*, session::OpenBuilder, Session}; use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ discovery::{self, LocalResources, RemoteResources}, @@ -43,19 +43,19 @@ fn make_session(cfg: &IsolatedConfig) -> Arc { fn make_local_resources(session: Arc) -> LocalResources { LocalResources::new("*".to_owned(), "*".to_owned(), session) } -fn make_remote_resources(session: Arc) -> RemoteResources { - async_std::task::block_on(remote_resources_builder(session).build()) +async fn make_remote_resources(session: Arc) -> RemoteResources { + remote_resources_builder(session).build().await } -#[test] -fn discovery_instantination_one_instance() { +#[tokio::test(flavor = "multi_thread")] +async fn discovery_instantination_one_instance() { let session = make_session(&IsolatedConfig::default()); - let _remote = make_remote_resources(session.clone()); + let _remote = make_remote_resources(session.clone()).await; let _local = make_local_resources(session); } -#[test] -fn discovery_instantination_many_instances() { +#[tokio::test(flavor = "multi_thread")] +async fn discovery_instantination_many_instances() { let cfg = IsolatedConfig::default(); let mut sessions = Vec::new(); for _i in 0..10 { @@ -64,7 +64,7 @@ fn discovery_instantination_many_instances() { let mut discoveries = Vec::new(); for session in sessions.iter() { - let remote = make_remote_resources(session.clone()); + let remote = make_remote_resources(session.clone()).await; let local = make_local_resources(session.clone()); discoveries.push((remote, local)); } @@ -159,7 +159,7 @@ impl DiscoveryCollector { expected: HashMultiSet, ) { while expected != *container.lock().unwrap() { - async_std::task::sleep(core::time::Duration::from_millis(10)).await; + tokio::time::sleep(core::time::Duration::from_millis(10)).await; } } } @@ -291,117 +291,46 @@ async fn run_discovery(scenario: Vec) { .await; for scene in scenario { - test_state_transition(&local_resources, &rcv, &scene) - .timeout(TIMEOUT) - .await - .expect("Timeout waiting state transition!"); + tokio::time::timeout( + TIMEOUT, + test_state_transition(&local_resources, &rcv, &scene), + ) + .await + .expect("Timeout waiting state transition!"); } } -#[test] -fn discover_single_publisher() { - async_std::task::block_on(run_discovery( - [State::default().publishers(1, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_single_subscriber() { - async_std::task::block_on(run_discovery( - [State::default().subscribers(1, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_single_service() { - async_std::task::block_on(run_discovery( - [State::default().services(1, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_single_client() { - async_std::task::block_on(run_discovery( - [State::default().clients(1, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_single_transition() { - async_std::task::block_on(run_discovery( - [ - State::default().publishers(1, 1), - State::default().subscribers(1, 1), - State::default().services(1, 1), - State::default().clients(1, 1), - ] - .into_iter() - .collect(), - )); -} -#[test] -fn discover_single_transition_with_zero_state() { - async_std::task::block_on(run_discovery( - [ - State::default().publishers(1, 1), - State::default(), - State::default().subscribers(1, 1), - State::default(), - State::default().services(1, 1), - State::default(), - State::default().clients(1, 1), - ] - .into_iter() - .collect(), - )); -} - -#[test] -fn discover_multiple_publishers() { - async_std::task::block_on(run_discovery( - [State::default().publishers(100, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_multiple_subscribers() { - async_std::task::block_on(run_discovery( - [State::default().subscribers(100, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_multiple_services() { - async_std::task::block_on(run_discovery( - [State::default().services(100, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_multiple_clients() { - async_std::task::block_on(run_discovery( - [State::default().clients(100, 1)].into_iter().collect(), - )); -} -#[test] -fn discover_multiple_transition() { - async_std::task::block_on(run_discovery( - [ - State::default().publishers(100, 1), - State::default().subscribers(100, 1), - State::default().services(100, 1), - State::default().clients(100, 1), - ] - .into_iter() - .collect(), - )); -} -#[test] -fn discover_multiple_transition_with_zero_state() { - async_std::task::block_on(run_discovery( - [ - State::default().publishers(100, 1), - State::default(), - State::default().subscribers(100, 1), - State::default(), - State::default().services(100, 1), - State::default(), - State::default().clients(100, 1), - ] - .into_iter() - .collect(), - )); +#[test_case([State::default().publishers(1, 1)].into_iter().collect(); "single_publisher")] +#[test_case([State::default().subscribers(1, 1)].into_iter().collect(); "single_subscriber")] +#[test_case([State::default().services(1, 1)].into_iter().collect(); "single_service")] +#[test_case([State::default().clients(1, 1)].into_iter().collect(); "single_client")] +#[test_case([State::default().publishers(1, 1), + State::default().subscribers(1, 1), + State::default().services(1, 1), + State::default().clients(1, 1),].into_iter().collect(); "single_transition")] +#[test_case([State::default().publishers(1, 1), + State::default(), + State::default().subscribers(1, 1), + State::default(), + State::default().services(1, 1), + State::default(), + State::default().clients(1, 1),].into_iter().collect(); "single_transition_with_zero_state")] +#[test_case([State::default().publishers(100, 1)].into_iter().collect(); "multiple_publisher")] +#[test_case([State::default().subscribers(100, 1)].into_iter().collect(); "multiple_subscriber")] +#[test_case([State::default().services(100, 1)].into_iter().collect(); "multiple_service")] +#[test_case([State::default().clients(100, 1)].into_iter().collect(); "multiple_client")] +#[test_case([State::default().publishers(100, 1), + State::default().subscribers(100, 1), + State::default().services(100, 1), + State::default().clients(100, 1),].into_iter().collect(); "multiple_transition")] +#[test_case([State::default().publishers(100, 1), + State::default(), + State::default().subscribers(100, 1), + State::default(), + State::default().services(100, 1), + State::default(), + State::default().clients(100, 1),].into_iter().collect(); "multiple_transition_with_zero_state")] +#[tokio::test(flavor = "multi_thread")] +async fn discover(vec_state: Vec) { + run_discovery(vec_state).await; } diff --git a/zenoh-plugin-ros1/tests/ping_pong_test.rs b/zenoh-plugin-ros1/tests/ping_pong_test.rs index 708fe78..17eb216 100644 --- a/zenoh-plugin-ros1/tests/ping_pong_test.rs +++ b/zenoh-plugin-ros1/tests/ping_pong_test.rs @@ -22,7 +22,6 @@ use std::{ time::Duration, }; -use async_std::prelude::FutureExt; use strum_macros::Display; use tracing::{debug, trace}; use zenoh::{key_expr::KeyExpr, prelude::*}; @@ -37,96 +36,96 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{ }, }; -#[test] -fn env_checks_no_master_init_and_exit_immed() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_no_master_init_and_exit_immed() { let roscfg = IsolatedROSMaster::default(); let _ros_env = ROSEnvironment::new(roscfg.port.port); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_no_master_init_and_wait() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_no_master_init_and_wait() { let roscfg = IsolatedROSMaster::default(); let _ros_env = ROSEnvironment::new(roscfg.port.port); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_with_master_init_and_exit_immed() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_with_master_init_and_exit_immed() { let roscfg = IsolatedROSMaster::default(); let _ros_env = ROSEnvironment::new(roscfg.port.port).with_master(); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_with_master_init_and_wait() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_with_master_init_and_wait() { let roscfg = IsolatedROSMaster::default(); let _ros_env = ROSEnvironment::new(roscfg.port.port).with_master(); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_with_master_init_and_loose_master() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_with_master_init_and_loose_master() { let roscfg = IsolatedROSMaster::default(); let mut _ros_env = Some(ROSEnvironment::new(roscfg.port.port).with_master()); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; _ros_env = None; - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_with_master_init_and_wait_for_master() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_with_master_init_and_wait_for_master() { let roscfg = IsolatedROSMaster::default(); let mut _ros_env = ROSEnvironment::new(roscfg.port.port); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; _ros_env = _ros_env.with_master(); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; thread::sleep(time::Duration::from_secs(1)); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; } -#[test] -fn env_checks_with_master_init_and_reconnect_many_times_to_master() { +#[tokio::test(flavor = "multi_thread")] +async fn env_checks_with_master_init_and_reconnect_many_times_to_master() { let roscfg = IsolatedROSMaster::default(); let mut ros_env = ROSEnvironment::new(roscfg.port.port); let bridge = RunningBridge::new(IsolatedConfig::default().peer(), roscfg.master_uri()); for _i in 0..20 { - async_std::task::block_on(bridge.assert_ros_error()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_error().await; + bridge.assert_bridge_empy().await; ros_env = ros_env.with_master(); - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; ros_env = ros_env.without_master(); } } @@ -192,7 +191,7 @@ impl PingPong { .unwrap(); let zenoh_queryable = backend .make_zenoh_queryable(key, |q| { - async_std::task::spawn(async move { + tokio::spawn(async move { let key = q.key_expr().clone(); let val = q.payload().unwrap().clone(); let _ = q.reply(key, val).await; @@ -292,15 +291,14 @@ impl PingPong { data.push((i % 255) as u8); } - async { + tokio::time::timeout(Duration::from_secs(30), async { while { self.pub_sub.publisher.put(data.clone()); !wait_async_fn(|| self.cycles.load(Relaxed) > 0, Duration::from_secs(5)).await } { debug!("Restarting ping-pong!"); } - } - .timeout(Duration::from_secs(30)) + }) .await .is_ok() } @@ -323,7 +321,7 @@ impl PingPong { self.cycles.store(0, Relaxed); while !(result > 0.0 || duration >= 30000) { - async_std::task::sleep(Duration::from_millis(duration_milliseconds)).await; + tokio::time::sleep(Duration::from_millis(duration_milliseconds)).await; duration += duration_milliseconds; result += self.cycles.load(Relaxed) as f64; } @@ -350,7 +348,7 @@ struct TestEnvironment { _ros_env: ROSEnvironment, } impl TestEnvironment { - pub fn new() -> TestEnvironment { + pub async fn new() -> TestEnvironment { let cfg = IsolatedConfig::default(); let roscfg = IsolatedROSMaster::default(); @@ -366,9 +364,9 @@ impl TestEnvironment { // this will wait for the bridge to have some expected initial state and serves two purposes: // - asserts on the expected state // - performs wait and ensures that everything is properly connected and negotiated within the bridge - async_std::task::block_on(bridge.assert_ros_ok()); - async_std::task::block_on(bridge.assert_bridge_empy()); - async_std::task::block_on(checker.assert_zenoh_peers(1)); + bridge.assert_ros_ok().await; + bridge.assert_bridge_empy().await; + checker.assert_zenoh_peers(1).await; TestEnvironment { bridge, @@ -472,134 +470,110 @@ async fn ping_pong_duplex_parallel_many_( env.assert_bridge_status_synchronized().await; } -#[test] -fn ping_pong_zenoh_to_ros1() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::ZenohToRos1]), - )); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_zenoh_to_ros1() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::ZenohToRos1])).await; } -#[test] -fn ping_pong_zenoh_to_ros1_many() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_zenoh_to_ros1_many() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::ZenohToRos1]), - )); + ) + .await; } -#[test] -fn ping_pong_ros1_to_zenoh() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1ToZenoh]), - )); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_to_zenoh() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1ToZenoh])).await; } -#[test] -fn ping_pong_ros1_to_zenoh_many() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_to_zenoh_many() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1ToZenoh]), - )); + ) + .await; } -#[test] -fn ping_pong_ros1_service() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1Service]), - )); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_service() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1Service])).await; } -#[test] -fn ping_pong_ros1_service_many() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_service_many() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1Service]), - )); + ) + .await; } -#[test] -fn ping_pong_ros1_client() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1Client]), - )); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_client() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1Client])).await; } -#[test] -fn ping_pong_ros1_client_many() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_ros1_client_many() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1Client]), - )); + ) + .await; } -#[test] -fn ping_pong_all_sequential() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::ZenohToRos1]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1ToZenoh]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1Service]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( - &env, - 1, - HashSet::from([Mode::Ros1Client]), - )); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_sequential() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::ZenohToRos1])).await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1ToZenoh])).await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1Service])).await; + ping_pong_duplex_parallel_many_(&env, 1, HashSet::from([Mode::Ros1Client])).await; } -#[test] -fn ping_pong_all_sequential_many() { - let env = TestEnvironment::new(); - futures::executor::block_on(ping_pong_duplex_parallel_many_( +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_sequential_many() { + let env = TestEnvironment::new().await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::ZenohToRos1]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( + ) + .await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1ToZenoh]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( + ) + .await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1Service]), - )); - futures::executor::block_on(ping_pong_duplex_parallel_many_( + ) + .await; + ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), HashSet::from([Mode::Ros1Client]), - )); + ) + .await; } -#[test] -fn ping_pong_all_parallel() { - let env = TestEnvironment::new(); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_parallel() { + let env = TestEnvironment::new().await; futures::executor::block_on(ping_pong_duplex_parallel_many_( &env, 1, @@ -612,9 +586,9 @@ fn ping_pong_all_parallel() { )); } -#[test] -fn ping_pong_all_parallel_many() { - let env = TestEnvironment::new(); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_parallel_many() { + let env = TestEnvironment::new().await; futures::executor::block_on(ping_pong_duplex_parallel_many_( &env, TestParams::many_count(), @@ -669,21 +643,21 @@ async fn parallel_subworks( } futures::future::join_all(subworks).await; } -#[test] -fn ping_pong_all_overlap_one() { - let env = TestEnvironment::new(); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_overlap_one() { + let env = TestEnvironment::new().await; let main_work_finished = Arc::new(AtomicBool::new(false)); let main_work = main_work(&env, main_work_finished.clone()); let parallel_subworks = parallel_subworks(&env, main_work_finished, 1); - async_std::task::block_on(futures::future::join(main_work, parallel_subworks)); + futures::future::join(main_work, parallel_subworks).await; } -#[test] -fn ping_pong_all_overlap_many() { - let env = TestEnvironment::new(); +#[tokio::test(flavor = "multi_thread")] +async fn ping_pong_all_overlap_many() { + let env = TestEnvironment::new().await; let main_work_finished = Arc::new(AtomicBool::new(false)); let main_work = main_work(&env, main_work_finished.clone()); let parallel_subworks = parallel_subworks(&env, main_work_finished, 10); - async_std::task::block_on(futures::future::join(main_work, parallel_subworks)); + futures::future::join(main_work, parallel_subworks).await; } diff --git a/zenoh-plugin-ros1/tests/rosmaster_test.rs b/zenoh-plugin-ros1/tests/rosmaster_test.rs index 62b204a..e6196cc 100644 --- a/zenoh-plugin-ros1/tests/rosmaster_test.rs +++ b/zenoh-plugin-ros1/tests/rosmaster_test.rs @@ -17,27 +17,23 @@ use std::time::Duration; use serial_test::serial; use zenoh_plugin_ros1::ros_to_zenoh_bridge::ros1_master_ctrl::Ros1MasterCtrl; -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial(ROS1)] -fn start_and_stop_master() { - async_std::task::block_on(async { - Ros1MasterCtrl::with_ros1_master() - .await - .expect("Error starting rosmaster"); +async fn start_and_stop_master() { + Ros1MasterCtrl::with_ros1_master() + .await + .expect("Error starting rosmaster"); - Ros1MasterCtrl::without_ros1_master().await; - }); + Ros1MasterCtrl::without_ros1_master().await; } -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial(ROS1)] -fn start_and_stop_master_and_check_connectivity() { +async fn start_and_stop_master_and_check_connectivity() { // start rosmaster - async_std::task::block_on(async { - Ros1MasterCtrl::with_ros1_master() - .await - .expect("Error starting rosmaster"); - }); + Ros1MasterCtrl::with_ros1_master() + .await + .expect("Error starting rosmaster"); // start ros1 client let ros1_client = rosrust::api::Ros::new_raw( @@ -59,9 +55,7 @@ fn start_and_stop_master_and_check_connectivity() { } // stop rosmaster - async_std::task::block_on(async { - Ros1MasterCtrl::without_ros1_master().await; - }); + Ros1MasterCtrl::without_ros1_master().await; // check if there was a status from rosmaster... if !has_rosmaster {