Skip to content

Commit

Permalink
Move to updated version of roslibrust_serde_rosmsg so that we can avo…
Browse files Browse the repository at this point in the history
…id a lot of byte shuffling
  • Loading branch information
carter committed Dec 4, 2024
1 parent db0b43d commit 13f9ecd
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 47 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion roslibrust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ roslibrust_codegen_macro = { path = "../roslibrust_codegen_macro", version = "0.
roslibrust_codegen = { path = "../roslibrust_codegen", version = "0.11.1" }
reqwest = { version = "0.11", optional = true } # Only used with native ros1
serde_xmlrpc = { version = "0.2", optional = true } # Only used with native ros1
roslibrust_serde_rosmsg = { version = "0.3", optional = true } # Only used with native ros1
roslibrust_serde_rosmsg = { version = "0.4", optional = true } # Only used with native ros1
hyper = { version = "0.14", features = [
"server",
], optional = true } # Only used with native ros1
Expand Down
2 changes: 1 addition & 1 deletion roslibrust_zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ roslibrust_codegen = { path = "../roslibrust_codegen" }
zenoh = "1.0"
hex = "0.4"
anyhow = "1.0"
roslibrust_serde_rosmsg = "0.3"
roslibrust_serde_rosmsg = "0.4"
log = "0.4"

[dev-dependencies]
Expand Down
71 changes: 29 additions & 42 deletions roslibrust_zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ pub struct ZenohPublisher<T> {

impl<T: RosMessageType> Publish<T> for ZenohPublisher<T> {
async fn publish(&self, data: &T) -> RosLibRustResult<()> {
let bytes = roslibrust_serde_rosmsg::to_vec(data).map_err(|e| {
let bytes = roslibrust_serde_rosmsg::to_vec_skip_length(data).map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to serialize message: {e:?}"))
})?;

// Note: serde_rosmsg places the length of the message as the first four bytes
// Which zenoh ros1 bridge does not expect, so we need to strip it off.
match self.publisher.put(&bytes[4..]).await {
match self.publisher.put(&bytes).await {
Ok(()) => Ok(()),
Err(e) => Err(RosLibRustError::Unexpected(anyhow::anyhow!(
"Failed to publish message to zenoh: {e:?}"
Expand Down Expand Up @@ -63,16 +61,12 @@ impl<T: RosMessageType> Subscribe<T> for ZenohSubscriber<T> {
};

let bytes = sample.payload().to_bytes();

// This is messy roslibrust expects the starting bytes to be total message size
// which Zenoh or the bridge is stripping somewhere, so I'm just manually sticking them back for now
// This is very inefficient, but it works for now.
let starting_bytes = (bytes.len() as u32).to_le_bytes();
let bytes = [&starting_bytes, &bytes[..]].concat();

let msg = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
// Note: Zenoh decided to not make the 4 byte length header part of the payload
// So we use the known length version of the deserialization
let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
Ok(msg)
}
}
Expand Down Expand Up @@ -157,16 +151,16 @@ pub struct ZenohServiceClient<T: RosServiceType> {

impl<T: RosServiceType> Service<T> for ZenohServiceClient<T> {
async fn call(&self, request: &T::Request) -> RosLibRustResult<T::Response> {
let request_bytes = roslibrust_serde_rosmsg::to_vec(request).map_err(|e| {
// Note: Zenoh decided the 4 byte length header is not part of the payload
let request_bytes = roslibrust_serde_rosmsg::to_vec_skip_length(request).map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to serialize message: {e:?}"))
})?;
debug!("request bytes: {request_bytes:?}");

let query = match self
.session
.get(&self.zenoh_query)
.payload(&request_bytes[4..])
// .timeout(tokio::time::Duration::from_secs(1))
.payload(&request_bytes)
.await
{
Ok(query) => query,
Expand Down Expand Up @@ -199,15 +193,11 @@ impl<T: RosServiceType> Service<T> for ZenohServiceClient<T> {

let bytes = sample.payload().to_bytes();

// This is messy roslibrust expects the starting bytes to be total message size
// which Zenoh or the bridge is stripping somewhere, so I'm just manually sticking them back for now
// This is very inefficient, but it works for now.
let starting_bytes = (bytes.len() as u32).to_le_bytes();
let bytes = [&starting_bytes, &bytes[..]].concat();

let msg = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
// Note: Zenoh decided to not make the 4 byte length header part of the payload
let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
Ok(msg)
}
}
Expand Down Expand Up @@ -275,15 +265,14 @@ impl ServiceProvider for ZenohClient {
};
let bytes = payload.to_bytes();
debug!("Got bytes: {bytes:?}");
// TODO MAJOR HACK HERE STILL
// Our deserialization still expects the first four bytes to be the total message size
// So we're just going to manually add the bytes back in
let starting_bytes = (bytes.len() as u32).to_le_bytes();
let bytes = [&starting_bytes, &bytes[..]].concat();

let Ok(request) = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| {
error!("Failed to deserialize request: {e:?}");
}) else {

// Note: Zenoh decided the 4 byte length header is not part of the payload
let Ok(request) =
roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
error!("Failed to deserialize request: {e:?}");
})
else {
continue;
};

Expand All @@ -293,16 +282,14 @@ impl ServiceProvider for ZenohClient {
continue;
};

let Ok(response_bytes) = roslibrust_serde_rosmsg::to_vec(&response).map_err(|e| {
error!("Failed to serialize response: {e:?}");
}) else {
let Ok(response_bytes) = roslibrust_serde_rosmsg::to_vec_skip_length(&response)
.map_err(|e| {
error!("Failed to serialize response: {e:?}");
})
else {
continue;
};

// TODO HACK HERE STILL
// Zenoh doesn't want the first four bytes that are the overall message size:
let response_bytes = &response_bytes[4..];

let _ = query
.reply(query.key_expr(), response_bytes)
.await
Expand Down

0 comments on commit 13f9ecd

Please sign in to comment.