Skip to content

Commit

Permalink
Port from async_std to tokio. (#195)
Browse files Browse the repository at this point in the history
* Port from async_std to tokio.

Signed-off-by: ChenYing Kuo <evshary@gmail.com>

* Support configuring the number of thread for runtime.

Signed-off-by: ChenYing Kuo <evshary@gmail.com>

---------

Signed-off-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
evshary authored Aug 6, 2024
1 parent 8ae68aa commit 04eee11
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 22 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ version = "0.11.0-dev"

[workspace.dependencies]
async-liveliness-monitor = "0.1.1"
async-std = "=1.12.0"
async-trait = "0.1.66"
bincode = "1.3.3"
cdr = "0.2.4"
Expand All @@ -41,6 +40,7 @@ regex = "1.7.1"
rustc_version = "0.4"
serde = "1.0.154"
serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tracing = "0.1"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
"internal", "unstable", "plugins"
Expand Down
3 changes: 2 additions & 1 deletion zenoh-bridge-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ publish = false
dds_shm = ["zenoh-plugin-ros2dds/dds_shm"]

[dependencies]
async-std = { workspace = true, features = ["unstable", "attributes"] }
async-liveliness-monitor = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
futures = { workspace = true }
lazy_static = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-plugin-rest = { workspace = true }
Expand Down
23 changes: 16 additions & 7 deletions zenoh-bridge-ros2dds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn parse_args() -> (Option<f32>, Config) {
(watchdog_opt, config)
}

#[async_std::main]
#[tokio::main]
async fn main() {
zenoh::init_log_from_env_or("z=info");
tracing::info!(
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn main() {
std::process::exit(-1);
}

async_std::future::pending::<()>().await;
futures::future::pending::<()>().await;
}

fn run_watchdog(period: f32) {
Expand All @@ -130,8 +130,8 @@ fn run_watchdog(period: f32) {
report_threshold_2.as_secs_f32()
);

// Start a Liveliness Monitor thread for async_std Runtime
let (_task, monitor) = LivelinessMonitor::start(async_std::task::spawn);
// Start a Liveliness Monitor thread for tokio Runtime
let (_task, monitor) = LivelinessMonitor::start(tokio::task::spawn);
std::thread::spawn(move || {
tracing::debug!(
"Watchdog started with period {} sec",
Expand All @@ -153,11 +153,20 @@ fn run_watchdog(period: f32) {
let report = monitor.latest_report();
if report.elapsed() > report_threshold_1 {
if report.elapsed() > sleep_time {
tracing::error!("Watchdog detecting async_std is stalled! No task scheduling since {} seconds", report.elapsed().as_secs_f32());
tracing::error!(
"Watchdog detecting tokio is stalled! No task scheduling since {} seconds",
report.elapsed().as_secs_f32()
);
} else if report.elapsed() > report_threshold_2 {
tracing::warn!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
tracing::warn!(
"Watchdog detecting tokio was not scheduling tasks during the last {} ms",
report.elapsed().as_micros()
);
} else {
tracing::info!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
tracing::info!(
"Watchdog detecting tokio was not scheduling tasks during the last {} ms",
report.elapsed().as_micros()
);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ dynamic_plugin = []
dds_shm = ["cyclors/iceoryx"]

[dependencies]
async-std = { workspace = true, features = ["unstable", "attributes"] }
async-trait = { workspace = true }
bincode = { workspace = true }
cdr = { workspace = true }
Expand All @@ -47,6 +46,7 @@ lazy_static = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-ext = { workspace = true }
Expand Down
14 changes: 14 additions & 0 deletions zenoh-plugin-ros2dds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub const DEFAULT_RELIABLE_ROUTES_BLOCKING: bool = true;
pub const DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER: usize = 10;
pub const DEFAULT_DDS_LOCALHOST_ONLY: bool = false;
pub const DEFAULT_QUERIES_TIMEOUT: f32 = 5.0;
pub const DEFAULT_WORK_THREAD_NUM: usize = 2;
pub const DEFAULT_MAX_BLOCK_THREAD_NUM: usize = 50;

#[derive(Deserialize, Debug, Serialize)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -59,6 +61,10 @@ pub struct Config {
serialize_with = "serialize_vec_regex_prio"
)]
pub pub_priorities: Vec<(Regex, Priority)>,
#[serde(default = "default_work_thread_num")]
pub work_thread_num: usize,
#[serde(default = "default_max_block_thread_num")]
pub max_block_thread_num: usize,
__required__: Option<bool>,
#[serde(default, deserialize_with = "deserialize_path")]
__path__: Option<Vec<String>>,
Expand Down Expand Up @@ -468,6 +474,14 @@ fn default_transient_local_cache_multiplier() -> usize {
DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER
}

fn default_work_thread_num() -> usize {
DEFAULT_WORK_THREAD_NUM
}

fn default_max_block_thread_num() -> usize {
DEFAULT_MAX_BLOCK_THREAD_NUM
}

fn serialize_regex<S>(r: &Option<Regex>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::{
time::Duration,
};

use async_std::task;
use cyclors::{
qos::{History, HistoryKind, Qos},
*,
};
use serde::Serializer;
use tokio::task;

use crate::{
dds_types::{DDSRawSample, TypeInfo},
Expand Down Expand Up @@ -332,7 +332,7 @@ where
break;
}

async_std::task::sleep(period).await;
tokio::time::sleep(period).await;
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
Expand Down
8 changes: 3 additions & 5 deletions zenoh-plugin-ros2dds/src/discovery_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::{
time::Duration,
};

use async_std::task;
use cyclors::dds_entity_t;
use flume::{unbounded, Receiver, Sender};
use futures::select;
use futures::{executor::block_on, select};
use tokio::task;
use zenoh::{
internal::{zread, zwrite, TimedEvent, Timer},
key_expr::keyexpr,
Expand Down Expand Up @@ -141,8 +141,6 @@ impl DiscoveryMgr {
// pass query to discovered_entities
let discovered_entities = zread!(self.discovered_entities);
// TODO: find a better solution than block_on()
async_std::task::block_on(
discovered_entities.treat_admin_query(query, admin_keyexpr_prefix),
);
block_on(discovered_entities.treat_admin_query(query, admin_keyexpr_prefix))
}
}
48 changes: 46 additions & 2 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{collections::HashMap, env, mem::ManuallyDrop, sync::Arc};
use std::{
collections::HashMap,
env,
future::Future,
mem::ManuallyDrop,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use async_trait::async_trait;
use cyclors::*;
use events::ROS2AnnouncementEvent;
use flume::{unbounded, Receiver, Sender};
use futures::select;
use serde::Serializer;
use tokio::task::JoinHandle;
use zenoh::{
bytes::{Encoding, ZBytes},
internal::{
Expand Down Expand Up @@ -66,6 +76,36 @@ use crate::{
liveliness_mgt::*, ros_discovery::RosDiscoveryInfoMgr, routes_mgr::RoutesMgr,
};

lazy_static::lazy_static! {
static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
.max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
.enable_all()
.build()
.expect("Unable to create runtime");
}
#[inline(always)]
pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), use the current runtime
rt.spawn(task)
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), reuse the global runtime
TOKIO_RUNTIME.spawn(task)
}
}
}

lazy_static::lazy_static!(


Expand Down Expand Up @@ -126,7 +166,11 @@ impl Plugin for ROS2Plugin {
.ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
let config: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
async_std::task::spawn(run(runtime.clone(), config));
WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);

spawn_runtime(run(runtime.clone(), config));

Ok(Box::new(ROS2Plugin))
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
time::Duration,
};

use async_std::task;
use cdr::{CdrLe, Infinite};
use cyclors::{
qos::{Durability, History, IgnoreLocal, IgnoreLocalKind, Qos, Reliability, DDS_INFINITE_TIME},
Expand All @@ -16,6 +15,7 @@ use cyclors::{
use flume::{unbounded, Receiver, Sender};
use futures::select;
use serde::{ser::SerializeSeq, Deserialize, Deserializer, Serialize, Serializer};
use tokio::task;
use zenoh::{
bytes::ZBytes,
internal::{zwrite, TimedEvent, Timer},
Expand Down

0 comments on commit 04eee11

Please sign in to comment.