Skip to content

Commit

Permalink
Use new Serialization and zenoh-ext API (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles-Schleich authored Oct 1, 2024
1 parent 7400036 commit 5acac6b
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 162 deletions.
250 changes: 129 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,18 @@ serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tracing = "0.1"
zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"internal",
"internal_config",
"unstable",
"plugins",
"unstable",
] }
zenoh-config = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"unstable",
] }
zenoh-plugin-rest = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features=["static_plugin"]}
zenoh-plugin-ros2dds = { version = "1.0.0-dev", path = "zenoh-plugin-ros2dds/", default-features = false }
zenoh-plugin-rest = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features=["static_plugin"]}
zenoh-plugin-trait = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }


[profile.release]
codegen-units = 1
debug = false
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ impl DiscoveredEntities {
match self.get_entity_json_value(entity_ref) {
Ok(Some(v)) => {
let admin_keyexpr = admin_keyexpr_prefix / key_expr;
match ZBytes::try_from(v) {
Ok(payload) => {
match serde_json::to_vec(&v) {
Ok(bytes) => {
if let Err(e) = query
.reply(admin_keyexpr, payload)
.reply(admin_keyexpr, ZBytes::from(bytes))
.encoding(Encoding::APPLICATION_JSON)
.await
{
Expand Down
8 changes: 4 additions & 4 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ impl ROS2PluginRuntime {
async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
let z_bytes: ZBytes = match admin_ref {
AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
Ok(v) => match ZBytes::try_from(v) {
Ok(value) => value,
Ok(v) => match serde_json::to_vec(&v) {
Ok(bytes) => ZBytes::from(bytes),
Err(e) => {
tracing::warn!("Error transforming JSON to ZBytes: {}", e);
return;
Expand All @@ -614,8 +614,8 @@ impl ROS2PluginRuntime {
}
},
AdminRef::Config => match serde_json::to_value(&*self.config) {
Ok(v) => match ZBytes::try_from(v) {
Ok(value) => value,
Ok(v) => match serde_json::to_vec(&v) {
Ok(bytes) => ZBytes::from(bytes),
Err(e) => {
tracing::warn!("Error transforming JSON to ZBytes: {}", e);
return;
Expand Down
52 changes: 31 additions & 21 deletions zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//

use std::{
collections::HashMap,
env::VarError,
sync::atomic::{AtomicU32, Ordering},
};
Expand Down Expand Up @@ -220,39 +219,50 @@ impl CddsRequestHeader {
}

pub fn as_attachment(&self) -> ZBytes {
let mut hashmap = HashMap::new();

// concat header + endianness flag
let mut buf = [0u8; 17];
buf[0..16].copy_from_slice(&self.header);
buf[16] = self.is_little_endian as u8;

hashmap.insert(ATTACHMENT_KEY_REQUEST_HEADER, buf);
ZBytes::from_iter(hashmap.iter())
let mut writer = ZBytes::writer();
writer.append(ZBytes::from(ATTACHMENT_KEY_REQUEST_HEADER));
writer.append(ZBytes::from(buf));
writer.finish()
}
}

impl TryFrom<&ZBytes> for CddsRequestHeader {
type Error = ZError;

fn try_from(value: &ZBytes) -> Result<Self, Self::Error> {
let hashmap: HashMap<[u8; 3], [u8; 17]> =
HashMap::from_iter(value.iter::<([u8; 3], [u8; 17])>().map(Result::unwrap));
match hashmap.get(&ATTACHMENT_KEY_REQUEST_HEADER) {
Some(buf) => {
if buf.len() == 17 {
let header: [u8; 16] = buf[0..16]
.try_into()
.expect("Shouldn't happen: buf is 17 bytes");
Ok(CddsRequestHeader {
header,
is_little_endian: buf[16] != 0,
})
} else {
bail!("Attachment 'header' is not 16 bytes: {buf:02x?}")
}
let bytes = value.to_bytes();

let header = match bytes.get(0..ATTACHMENT_KEY_REQUEST_HEADER.len()) {
Some(header) => header,
None => bail!("No 'key request header' bytes found in attachment"),
};

if header != ATTACHMENT_KEY_REQUEST_HEADER {
bail!(
"Initial {:?} bytes do not match ATTACHMENT_KEY_REQUEST_HEADER",
ATTACHMENT_KEY_REQUEST_HEADER.len()
)
}

if let Some(buf) = bytes.get(ATTACHMENT_KEY_REQUEST_HEADER.len()..) {
if buf.len() == 17 {
let header: [u8; 16] = buf[0..16]
.try_into()
.expect("Shouldn't happen: buf is 17 bytes");
Ok(CddsRequestHeader {
header,
is_little_endian: buf[16] != 0,
})
} else {
bail!("Attachment 'header' is not 16 bytes: {buf:02x?}")
}
None => bail!("No 'header' key found in Attachment"),
} else {
bail!("Could not Read Remaining Attachment Buffer")
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ fn route_dds_request_to_zenoh(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73

let z_bytes: ZBytes = sample.into();
let slice: ZSlice = z_bytes.into();
let slice: ZSlice = ZBuf::from(z_bytes).to_zslice();

// Decompose the slice into 3 sub-slices (4 bytes header, 16 bytes request_id and payload)
let (payload, request_id, header) = match (
Expand All @@ -376,6 +376,7 @@ fn route_dds_request_to_zenoh(

// route request buffer stripped from request_id
let mut zenoh_req_buf = ZBuf::empty();

zenoh_req_buf.push_zslice(header);
zenoh_req_buf.push_zslice(payload);

Expand Down Expand Up @@ -431,7 +432,7 @@ fn route_zenoh_reply_to_dds(
) {
match reply.result() {
Ok(sample) => {
let zenoh_rep_buf = sample.payload().into::<Vec<u8>>();
let zenoh_rep_buf = sample.payload().to_bytes();
if zenoh_rep_buf.len() < 4 || zenoh_rep_buf[1] > 1 {
tracing::warn!(
"{route_id}: received invalid reply from Zenoh for {request_id}: {zenoh_rep_buf:0x?}"
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ fn route_zenoh_request_to_dds(
// if any and if long enoough it shall be the Request type encoded as CDR (including 4 bytes header)
let is_little_endian = match query.payload() {
Some(value) if value.len() > 4 => {
is_cdr_little_endian(value.into::<ZSlice>().as_ref()).unwrap_or(true)
is_cdr_little_endian(value.to_bytes().as_ref()).unwrap_or(true)
}
_ => true,
};
Expand All @@ -383,7 +383,7 @@ fn route_zenoh_request_to_dds(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
let dds_req_buf = if let Some(value) = query.payload() {
// The query comes with some payload. It's expected to be the Request type encoded as CDR (including 4 bytes header)
let zenoh_req_buf = value.into::<Vec<u8>>();
let zenoh_req_buf = value.to_bytes();
if zenoh_req_buf.len() < 4 || zenoh_req_buf[1] > 1 {
tracing::warn!("{route_id}: received invalid request: {zenoh_req_buf:0x?}");
return;
Expand Down Expand Up @@ -442,7 +442,7 @@ fn route_dds_reply_to_zenoh(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73

let z_bytes: ZBytes = sample.into();
let slice: ZSlice = z_bytes.into();
let slice: ZSlice = ZBuf::from(z_bytes).to_zslice();

// Decompose the slice into 3 sub-slices (4 bytes header, 16 bytes request_id and payload)
let (payload, request_id, header) = match (
Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/src/route_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entit
}

unsafe {
let bs = s.payload().into();
let bs = s.payload().to_bytes().to_vec();
// As per the Vec documentation (see https://doc.rust-lang.org/std/vec/struct.Vec.html#method.into_raw_parts)
// the only way to correctly releasing it is to create a vec using from_raw_parts
// and then have its destructor do the cleanup.
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/routes_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,10 @@ impl<'a> RoutesMgr {
match self.get_entity_json_value(route_ref) {
Ok(Some(v)) => {
let admin_keyexpr = &self.admin_prefix / key_expr;
match ZBytes::try_from(v) {
Ok(payload) => {
match serde_json::to_vec(&v) {
Ok(bytes) => {
if let Err(e) = query
.reply(admin_keyexpr, payload)
.reply(admin_keyexpr, ZBytes::from(bytes))
.encoding(Encoding::APPLICATION_JSON)
.await
{
Expand Down

0 comments on commit 5acac6b

Please sign in to comment.