Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to tokio #118

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,11 @@ jobs:
run: cargo test --verbose
env:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4

- name: Run doctests
run: cargo test --doc
env:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4

# NOTE: In GitHub repository settings, the "Require status checks to pass
# before merging" branch protection rule ensures that commits are only merged
Expand Down
44 changes: 39 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ categories = ["network-programming"]

[workspace.dependencies]
atoi = "2.0.0"
async-std = "=1.12.0"
async-trait = "0.1"
clap = "3.2.23"
ctrlc = "3.2.5"
Expand All @@ -49,8 +48,11 @@ flume = "0.11"
hex = "0.4.3"
xml-rpc = "0.0.12"
rustc_version = "0.4"
test-case = { version = "3.3" }
tokio = { version = "1.35.1", features = ["process"] }
tracing = "0.1"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
"internal",
"unstable",
"plugins",
] }
Expand Down
12 changes: 12 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@
//// The string format is [0-9]+(ns|us|ms|[smhdwy])
////
// ros_master_polling_interval: "100ms",

////
//// work_thread_num: The number of worker thread in TOKIO runtime (default: 2)
//// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
////
// work_thread_num: 2,

////
//// max_block_thread_num: The number of blocking thread in TOKIO runtime (default: 50)
//// The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.
////
// max_block_thread_num: 50,
},

////
Expand Down
2 changes: 1 addition & 1 deletion zenoh-bridge-ros1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ description = "Zenoh bridge for ROS1"
publish = false

[dependencies]
async-std = { workspace = true, features = ["unstable", "attributes"] }
clap = { workspace = true }
ctrlc = { workspace = true }
tracing = { workspace = true }
lazy_static = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
zenoh = { workspace = true }
zenoh-plugin-trait = { workspace = true }
zenoh-plugin-ros1 = { workspace = true }
Expand Down
24 changes: 15 additions & 9 deletions zenoh-bridge-ros1/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//
use std::str::FromStr;

use async_std::channel::unbounded;
use clap::{App, Arg};
use tokio::sync::mpsc::unbounded_channel;
use zenoh::{
config::{Config, ZenohId},
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
Expand Down Expand Up @@ -187,6 +187,16 @@ Bridge polls ROS1 master to get information on local topics. This option is the
Accepted value:'
A string such as 100ms, 2s, 5m
The string format is [0-9]+(ns|us|ms|[smhdwy])"#
))
.arg(Arg::from_usage(
r#"--work_thread_num=[usize] \
'The number of worker thread in TOKIO runtime (default: 2)
The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.'"#
))
.arg(Arg::from_usage(
r#"--max_block_thread_num=[usize] \
'The number of blocking thread in TOKIO runtime (default: 50)
The configuration only takes effect if running as a dynamic plugin, which can not reuse the current runtime.'"#
));
let args = app.get_matches();

Expand Down Expand Up @@ -252,15 +262,11 @@ The string format is [0-9]+(ns|us|ms|[smhdwy])"#
config
}

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
ctrlc::set_handler(move || {
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || sender.send(()).expect("Error handling Ctrl+C signal"))
.expect("Error setting Ctrl+C handler");

zenoh::init_log_from_env_or("z=info");
tracing::info!(
Expand Down
6 changes: 2 additions & 4 deletions zenoh-plugin-ros1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
lazy_static = { workspace = true }
test-case = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -63,10 +65,6 @@ ctrlc = { workspace = true }
# TODO: https://zettascale.atlassian.net/browse/ZEN-291
# zenoh-plugin-ros1 = { path = ".", features = ["test"]}

[dependencies.async-std]
version = "=1.12.0"
features = ["unstable", "attributes"]

[build-dependencies]
rustc_version = { workspace = true }

Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -80,5 +80,5 @@ async fn main() {
std::thread::sleep(core::time::Duration::from_secs(1));
}
};
async_std::task::spawn_blocking(working_loop).await;
tokio::task::spawn_blocking(working_loop).await.unwrap();
}
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -86,6 +86,6 @@ async fn main() {
println!("Zenoh got error: {}", e);
}
}
async_std::task::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
12 changes: 5 additions & 7 deletions zenoh-plugin-ros1/examples/ros1_standalone_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use async_std::channel::unbounded;
use tokio::sync::mpsc::unbounded_channel;
use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment;

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || {
tracing::info!("Catching Ctrl+C...");
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
sender.send(()).expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");

Expand Down Expand Up @@ -59,5 +57,5 @@ async fn main() {
}
tracing::info!("Caught Ctrl+C, stopping...");
};
async_std::task::spawn_blocking(working_loop).await;
tokio::task::spawn_blocking(working_loop).await.unwrap();
}
10 changes: 4 additions & 6 deletions zenoh-plugin-ros1/examples/ros1_standalone_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use async_std::channel::unbounded;
use tokio::sync::mpsc::unbounded_channel;
use zenoh_plugin_ros1::ros_to_zenoh_bridge::environment::Environment;

#[async_std::main]
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded();
let (sender, mut receiver) = unbounded_channel();
ctrlc::set_handler(move || {
tracing::info!("Catching Ctrl+C...");
sender
.send_blocking(())
.expect("Error handling Ctrl+C signal")
sender.send(()).expect("Error handling Ctrl+C signal")
})
.expect("Error setting Ctrl+C handler");

Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros1/examples/ros1_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zenoh_plugin_ros1::ros_to_zenoh_bridge::{
environment::Environment, ros1_master_ctrl::Ros1MasterCtrl, Ros1ToZenohBridge,
};

#[async_std::main]
#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();
Expand Down Expand Up @@ -73,6 +73,6 @@ async fn main() {
loop {
println!("Zenoh Publisher: publishing data...");
zenoh_publisher.put(data.clone()).await.unwrap();
async_std::task::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
Loading