Skip to content

Commit

Permalink
Migrate to Zenoh 1.0
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
evshary committed Jun 17, 2024
1 parent de80c93 commit 6539741
Show file tree
Hide file tree
Showing 8 changed files with 369 additions and 263 deletions.
438 changes: 267 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ lazy_static = "1.4.0"
regex = "1.7.1"
rustc_version = "0.4"
serde = "1.0.154"
serde_json = "1.0.94"
serde_json = "1.0.114"
tracing = "0.1"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
"plugins",
"unstable",
] }
zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
"unstable",
] }
zenoh-plugin-dds = { version = "0.11.0-dev", path = "zenoh-plugin-dds/", default-features = false }
zenoh-plugin-rest = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-plugin-trait = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-plugin-rest = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false }
zenoh-plugin-trait = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false }

[profile.release]
codegen-units = 1
Expand Down
10 changes: 6 additions & 4 deletions zenoh-bridge-dds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ use async_liveliness_monitor::LivelinessMonitor;
use clap::{App, Arg};
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use zenoh::config::{Config, ModeDependentValue};
use zenoh::plugins::PluginsManager;
use zenoh::prelude::r#async::*;
use zenoh::runtime::RuntimeBuilder;
use zenoh::{
config::{Config, ModeDependentValue},
info::ZenohId,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
prelude::*,
};
use zenoh_plugin_dds::DDSPlugin;
use zenoh_plugin_trait::Plugin;

Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-dds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use serde::{de, Deserialize, Deserializer};
use std::env;
use std::fmt;
use std::time::Duration;
use zenoh::prelude::*;
use zenoh::key_expr::OwnedKeyExpr;

pub const DEFAULT_DOMAIN: u32 = 0;
pub const DEFAULT_FORWARD_DISCOVERY: bool = false;
Expand Down
17 changes: 10 additions & 7 deletions zenoh-plugin-dds/src/dds_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ use std::time::Duration;
use tracing::{debug, error, warn};
#[cfg(feature = "dds_shm")]
use zenoh::buffers::{ZBuf, ZSlice};
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::Session;
use zenoh_core::SyncResolve;
use zenoh::{
bytes::ZBytes,
key_expr::{KeyExpr, OwnedKeyExpr},
prelude::*,
publisher::CongestionControl,
Session,
};

const MAX_SAMPLES: usize = 32;

Expand Down Expand Up @@ -264,7 +267,7 @@ impl fmt::Debug for DDSRawSample {
}
}

impl From<DDSRawSample> for Value {
impl From<DDSRawSample> for ZBytes {
fn from(buf: DDSRawSample) -> Self {
#[cfg(feature = "dds_shm")]
{
Expand Down Expand Up @@ -506,7 +509,7 @@ unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os
.2
.put(&(*pa).1, raw_sample)
.congestion_control((*pa).3)
.res_sync();
.wait();
}
ddsi_serdata_unref(zp);
}
Expand Down Expand Up @@ -603,7 +606,7 @@ pub(crate) fn create_forwarding_dds_reader(
let _ = z
.put(&z_key, raw_sample)
.congestion_control(congestion_ctrl)
.res_sync();
.wait();
}
ddsi_serdata_unref(zp);
}
Expand Down
101 changes: 54 additions & 47 deletions zenoh-plugin-dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
use zenoh::liveliness::LivelinessToken;
use zenoh::plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::prelude::r#sync::SyncResolve;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::queryable::{Query, Queryable};
use zenoh::runtime::Runtime;
use zenoh::Result as ZResult;
use zenoh::Session;
use zenoh::{
bytes::ZBytes,
internal::{
plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
runtime::Runtime,
},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
liveliness::LivelinessToken,
prelude::*,
publisher::CongestionControl,
query::{ConsolidationMode, Query, QueryTarget},
queryable::Queryable,
sample::{Locality, Sample, SampleKind},
selector::Selector,
Result as ZResult, Session,
};
use zenoh_core::zerror;
use zenoh_ext::{SessionExt, SubscriberBuilderExt};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
Expand Down Expand Up @@ -70,7 +75,7 @@ macro_rules! ke_for_sure {

macro_rules! member_id {
($val:expr) => {
$val.key_expr.as_str().split('/').last().unwrap()
$val.key_expr().as_str().split('/').last().unwrap()
};
}

Expand Down Expand Up @@ -156,10 +161,9 @@ pub async fn run(runtime: Runtime, config: Config) {
debug!("DDS plugin {:?}", config);

// open zenoh-net Session
let zsession = match zenoh::init(runtime)
let zsession = match zenoh::session::init(runtime)
.aggregated_subscribers(config.generalise_subs.clone())
.aggregated_publishers(config.generalise_pubs.clone())
.res_async()
.await
{
Ok(session) => Arc::new(session),
Expand All @@ -170,14 +174,14 @@ pub async fn run(runtime: Runtime, config: Config) {
};

// create group member using the group_member_id if configured, or the Session ID otherwise
// TODO: Wait for the PR: https://github.com/eclipse-zenoh/zenoh/pull/1149
let member_id = match config.group_member_id {
Some(ref id) => id.clone(),
None => zsession.zid().into_keyexpr(),
None => zsession.zid().to_string().try_into().unwrap(),
};
let member = match zsession
.liveliness()
.declare_token(*KE_PREFIX_LIVELINESS_GROUP / &member_id)
.res_async()
.await
{
Ok(member) => member,
Expand Down Expand Up @@ -667,12 +671,15 @@ impl<'a> DdsPluginRuntime<'a> {
// send replies
for (ke, v) in kvs.drain(..) {
let admin_keyexpr = admin_keyexpr_prefix / &ke;
if let Err(e) = query
.reply(Ok(Sample::new(admin_keyexpr, v)))
.res_async()
.await
{
warn!("Error replying to admin query {:?}: {}", query, e);
match TryInto::<ZBytes>::try_into(v) {
Ok(payload) => {
if let Err(e) = query.reply(admin_keyexpr, payload).await {
warn!("Error replying to admin query {:?}: {}", query, e);
}
}
Err(e) => {
warn!("Error transforming JSON to admin query {:?}: {}", query, e);
}
}
}
}
Expand All @@ -684,7 +691,6 @@ impl<'a> DdsPluginRuntime<'a> {
.declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
.querying()
.with(flume::unbounded())
.res_async()
.await
.expect("Failed to create Liveliness Subscriber");

Expand All @@ -693,13 +699,14 @@ impl<'a> DdsPluginRuntime<'a> {
run_discovery(self.dp, tx);

// declare admin space queryable
let admin_keyexpr_prefix = *KE_PREFIX_ADMIN_SPACE / &self.zsession.zid().into_keyexpr();
// TODO: Wait for the PR: https://github.com/eclipse-zenoh/zenoh/pull/1149
let admin_keyexpr_prefix =
*KE_PREFIX_ADMIN_SPACE / keyexpr::new(&self.zsession.zid().to_string()).unwrap();
let admin_keyexpr_expr = (&admin_keyexpr_prefix) / *KE_ANY_N_SEGMENT;
debug!("Declare admin space on {}", admin_keyexpr_expr);
let admin_queryable = self
.zsession
.declare_queryable(admin_keyexpr_expr)
.res_async()
.await
.expect("Failed to create AdminSpace queryable");

Expand Down Expand Up @@ -907,7 +914,7 @@ impl<'a> DdsPluginRuntime<'a> {
},

group_event = group_subscriber.recv_async() => {
match group_event.as_ref().map(|s|s.kind) {
match group_event.as_ref().map(|s|s.kind()) {
Ok(SampleKind::Put) => {
let mid = member_id!(group_event.as_ref().unwrap());
debug!("New zenoh_dds_plugin detected: {}", mid);
Expand Down Expand Up @@ -951,7 +958,13 @@ impl<'a> DdsPluginRuntime<'a> {
// - ros_discovery_info on <KE_PREFIX_FWD_DISCO>/<uuid>/[<scope>]/ros_disco/<gid>
// The PublicationCache is declared on <KE_PREFIX_FWD_DISCO>/<uuid>/[<scope>]/**
// The QuerySubscriber is declared on <KE_PREFIX_FWD_DISCO>/*/[<scope>]/**
let uuid: OwnedKeyExpr = self.zsession.zid().into();
// TODO: Wait for the PR: https://github.com/eclipse-zenoh/zenoh/pull/1149
let uuid: OwnedKeyExpr = self
.zsession
.zid()
.to_string()
.try_into()
.expect("The zsession id can't be transformed into key expr");
let fwd_key_prefix = if let Some(scope) = &self.config.scope {
*KE_PREFIX_FWD_DISCO / &uuid / scope
} else {
Expand All @@ -971,19 +984,16 @@ impl<'a> DdsPluginRuntime<'a> {
let fwd_writers_key_prefix_key = self
.zsession
.declare_keyexpr(fwd_writers_key_prefix)
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of writers");
let fwd_readers_key_prefix_key = self
.zsession
.declare_keyexpr(fwd_readers_key_prefix)
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of readers");
let fwd_ros_discovery_key_declared = self
.zsession
.declare_keyexpr(&fwd_ros_discovery_key)
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of ros_discovery");

Expand All @@ -992,7 +1002,6 @@ impl<'a> DdsPluginRuntime<'a> {
.zsession
.declare_publication_cache(fwd_declare_publication_cache_key)
.queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers
.res_async()
.await
.expect("Failed to declare PublicationCache for Fwd Discovery");

Expand All @@ -1003,7 +1012,6 @@ impl<'a> DdsPluginRuntime<'a> {
.querying()
.allowed_origin(Locality::Remote) // Note: ignore my own publications
.query_timeout(self.config.queries_timeout)
.res_async()
.await
.expect("Failed to declare QueryingSubscriber for Fwd Discovery");

Expand Down Expand Up @@ -1040,7 +1048,7 @@ impl<'a> DdsPluginRuntime<'a> {
Ok(s) => s,
Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
};
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res_async().await {
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
}

Expand All @@ -1055,7 +1063,7 @@ impl<'a> DdsPluginRuntime<'a> {
if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
// publish its deletion from admin space
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res_async().await {
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
}
}
Expand Down Expand Up @@ -1100,7 +1108,7 @@ impl<'a> DdsPluginRuntime<'a> {
Ok(s) => s,
Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
};
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res_async().await {
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
}

Expand All @@ -1115,7 +1123,7 @@ impl<'a> DdsPluginRuntime<'a> {
if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
// publish its deletion from admin space
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res_async().await {
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
}
}
Expand Down Expand Up @@ -1161,7 +1169,7 @@ impl<'a> DdsPluginRuntime<'a> {

sample = fwd_disco_sub.recv_async() => {
let sample = sample.expect("Fwd Discovery subscriber was closed!");
let fwd_ke = &sample.key_expr;
let fwd_ke = &sample.key_expr();
debug!("Received forwarded discovery message on {}", fwd_ke);

// parse fwd_ke and extract the remote uuid, the discovery kind (reader|writer|ros_disco) and the remaining of the keyexpr
Expand All @@ -1171,9 +1179,9 @@ impl<'a> DdsPluginRuntime<'a> {
"writer" => {
// reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / remaining_ke;
if sample.kind != SampleKind::Delete {
if sample.kind() != SampleKind::Delete {
// deserialize payload
let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload.contiguous()) {
let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().into::<Vec<u8>>()) {
Ok(x) => x,
Err(e) => {
warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
Expand Down Expand Up @@ -1251,9 +1259,9 @@ impl<'a> DdsPluginRuntime<'a> {
"reader" => {
// reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / remaining_ke;
if sample.kind != SampleKind::Delete {
if sample.kind() != SampleKind::Delete {
// deserialize payload
let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload.contiguous()) {
let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().into::<Vec<u8>>()) {
Ok(x) => x,
Err(e) => {
warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
Expand Down Expand Up @@ -1328,7 +1336,7 @@ impl<'a> DdsPluginRuntime<'a> {
// it's a ros_discovery_info message
"ros_disco" => {
match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>(
&*sample.payload.contiguous(),
sample.payload().reader(),
cdr::size::Infinite,
) {
Ok(mut info) => {
Expand Down Expand Up @@ -1356,7 +1364,7 @@ impl<'a> DdsPluginRuntime<'a> {
},

group_event = group_subscriber.recv_async() => {
match group_event.as_ref().map(|s|s.kind) {
match group_event.as_ref().map(|s|s.kind()) {
Ok(SampleKind::Put) => {
let mid = member_id!(group_event.as_ref().unwrap());
debug!("New zenoh_dds_plugin detected: {}", mid);
Expand All @@ -1368,14 +1376,13 @@ impl<'a> DdsPluginRuntime<'a> {
};
debug!("Query past discovery messages from {} on {}", mid, key);
if let Err(e) = fwd_disco_sub.fetch( |cb| {
use zenoh_core::SyncResolve;
self.zsession.get(Selector::from(&key))
.callback(cb)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.timeout(self.config.queries_timeout)
.res_sync()
}).res_async().await
.wait()
}).await
{
warn!("Query on {} for discovery messages failed: {}", key, e);
}
Expand Down Expand Up @@ -1458,7 +1465,7 @@ impl<'a> DdsPluginRuntime<'a> {
trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode());
// forward the payload on zenoh
let ke = &fwd_ros_discovery_key_declared / ke_for_sure!(&gid);
if let Err(e) = self.zsession.put(ke, buf).res_sync() {
if let Err(e) = self.zsession.put(ke, buf).wait() {
error!("Forward ROS discovery info failed: {}", e);
}
}
Expand Down
Loading

0 comments on commit 6539741

Please sign in to comment.