Skip to content

Commit

Permalink
Migrate to tokio.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
evshary committed Jul 30, 2024
1 parent 57651e2 commit 8ff45ce
Show file tree
Hide file tree
Showing 24 changed files with 443 additions and 611 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,11 @@ jobs:
run: cargo test --verbose
env:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4

- name: Run doctests
run: cargo test --doc
env:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4

# NOTE: In GitHub repository settings, the "Require status checks to pass
# before merging" branch protection rule ensures that commits are only merged
Expand Down
39 changes: 37 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ categories = ["network-programming"]

[workspace.dependencies]
atoi = "2.0.0"
async-std = "=1.12.0"
async-trait = "0.1"
clap = "3.2.23"
ctrlc = "3.2.5"
Expand All @@ -49,8 +48,11 @@ flume = "0.11"
hex = "0.4.3"
xml-rpc = "0.0.12"
rustc_version = "0.4"
test-case = { version = "3.3" }
tokio = { version = "1.35.1", features = ["process"] }
tracing = "0.1"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
"internal",
"unstable",
"plugins",
] }
Expand Down
2 changes: 1 addition & 1 deletion zenoh-bridge-ros1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ description = "Zenoh bridge for ROS1"
publish = false

[dependencies]
async-std = { workspace = true, features = ["unstable", "attributes"] }
clap = { workspace = true }
ctrlc = { workspace = true }
tracing = { workspace = true }
lazy_static = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
zenoh = { workspace = true }
zenoh-plugin-trait = { workspace = true }
zenoh-plugin-ros1 = { workspace = true }
Expand Down
14 changes: 5 additions & 9 deletions zenoh-bridge-ros1/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//
use std::str::FromStr;

use async_std::channel::unbounded;
use clap::{App, Arg};
use tokio::sync::mpsc::unbounded_channel;
use zenoh::{
config::{Config, ZenohId},
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
Expand Down Expand Up @@ -252,15 +252,11 @@ The string format is [0-9]+(ns|us|ms|[smhdwy])"#
config
}

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
ctrlc::set_handler(move || {
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || sender.send(()).expect("Error handling Ctrl+C signal"))
.expect("Error setting Ctrl+C handler");

zenoh::init_log_from_env_or("z=info");
tracing::info!(
Expand Down
6 changes: 2 additions & 4 deletions zenoh-plugin-ros1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
lazy_static = { workspace = true }
test-case = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -63,10 +65,6 @@ ctrlc = { workspace = true }
# TODO: https://zettascale.atlassian.net/browse/ZEN-291
# zenoh-plugin-ros1 = { path = ".", features = ["test"]}

[dependencies.async-std]
version = "=1.12.0"
features = ["unstable", "attributes"]

[build-dependencies]
rustc_version = { workspace = true }

Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -80,5 +80,5 @@ async fn main() {
std::thread::sleep(core::time::Duration::from_secs(1));
}
};
async_std::task::spawn_blocking(working_loop).await;
tokio::task::spawn_blocking(working_loop).await.unwrap();
}
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -86,6 +86,6 @@ async fn main() {
println!("Zenoh got error: {}", e);
}
}
async_std::task::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
12 changes: 5 additions & 7 deletions zenoh-plugin-ros1/examples/ros1_standalone_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use async_std::channel::unbounded;
use tokio::sync::mpsc::unbounded_channel;
use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment;

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || {
tracing::info!("Catching Ctrl+C...");
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
sender.send(()).expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");

Expand Down Expand Up @@ -59,5 +57,5 @@ async fn main() {
}
tracing::info!("Caught Ctrl+C, stopping...");
};
async_std::task::spawn_blocking(working_loop).await;
tokio::task::spawn_blocking(working_loop).await.unwrap();
}
10 changes: 4 additions & 6 deletions zenoh-plugin-ros1/examples/ros1_standalone_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use async_std::channel::unbounded;
use tokio::sync::mpsc::unbounded_channel;
use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment;

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || {
tracing::info!("Catching Ctrl+C...");
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
sender.send(()).expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");

Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -73,6 +73,6 @@ async fn main() {
loop {
println!("Zenoh Publisher: publishing data...");
zenoh_publisher.put(data.clone()).await.unwrap();
async_std::task::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
36 changes: 26 additions & 10 deletions zenoh-plugin-ros1/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont

pub mod ros_to_zenoh_bridge;

const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
pub(crate) static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create runtime");
}

// The struct implementing the ZenohPlugin and ZenohPlugin traits
pub struct Ros1Plugin {}

Expand Down Expand Up @@ -87,22 +98,27 @@ impl RunningPluginTrait for Ros1PluginInstance {}
impl Drop for Ros1PluginInstance {
fn drop(&mut self) {
if Environment::with_rosmaster().get() {
async_std::task::block_on(Ros1MasterCtrl::without_ros1_master());
tokio::task::block_in_place(|| {
TOKIO_RUNTIME.block_on(Ros1MasterCtrl::without_ros1_master())
});
}
}
}
impl Ros1PluginInstance {
fn new(runtime: &Runtime) -> ZResult<Self> {
let bridge: ZResult<Ros1ToZenohBridge> = async_std::task::block_on(async {
if Environment::with_rosmaster().get() {
Ros1MasterCtrl::with_ros1_master().await?;
async_std::task::sleep(Duration::from_secs(1)).await;
}
let bridge: ZResult<Ros1ToZenohBridge> = tokio::task::block_in_place(|| {
TOKIO_RUNTIME.block_on(async {
if Environment::with_rosmaster().get() {
Ros1MasterCtrl::with_ros1_master().await?;
tokio::time::sleep(Duration::from_secs(1)).await;
}

// create a zenoh Session that shares the same Runtime as zenohd
let session = zenoh::session::init(runtime.clone()).await?.into_arc();
let bridge = ros_to_zenoh_bridge::Ros1ToZenohBridge::new_with_external_session(session);
Ok(bridge)
// create a zenoh Session that shares the same Runtime as zenohd
let session = zenoh::session::init(runtime.clone()).await?.into_arc();
let bridge =
ros_to_zenoh_bridge::Ros1ToZenohBridge::new_with_external_session(session);
Ok(bridge)
})
});

Ok(Self { _bridge: bridge? })
Expand Down
10 changes: 7 additions & 3 deletions zenoh-plugin-ros1/src/ros_to_zenoh_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use std::sync::{
Arc,
};

use async_std::task::JoinHandle;
use tokio::task::JoinHandle;
use tracing::error;
use zenoh::{self, Result as ZResult, Session};

use self::{environment::Environment, ros1_to_zenoh_bridge_impl::work_cycle};
use crate::TOKIO_RUNTIME;

#[cfg(feature = "test")]
pub mod aloha_declaration;
Expand Down Expand Up @@ -95,7 +96,7 @@ impl Ros1ToZenohBridge {
let flag = Arc::new(AtomicBool::new(true));
Self {
flag: flag.clone(),
task_handle: Box::new(async_std::task::spawn(Self::run(session, flag))),
task_handle: Box::new(TOKIO_RUNTIME.spawn(Self::run(session, flag))),
}
}

Expand All @@ -120,7 +121,10 @@ impl Ros1ToZenohBridge {
}

async fn async_await(&mut self) {
self.task_handle.as_mut().await;
self.task_handle
.as_mut()
.await
.expect("Unable to complete the task");
}
}
impl Drop for Ros1ToZenohBridge {
Expand Down
Loading

0 comments on commit 8ff45ce

Please sign in to comment.