Skip to content

Commit

Permalink
Merge pull request #167 from Carter12s/fix-generic-subscription
Browse files Browse the repository at this point in the history
Fix rostopic echo not working because it doesn't provide md5sum
  • Loading branch information
Carter12s authored Jun 29, 2024
2 parents 8e56b24 + 9482557 commit 2b339fb
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion roslibrust/examples/ros1_talker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> Result<(), anyhow::Error> {
.map_err(|err| err)?;
let publisher = nh.advertise::<std_msgs::String>("/chatter", 1).await?;

for count in 0..10 {
for count in 0..50 {
publisher
.publish(&std_msgs::String {
data: format!("hello world from rust {count}"),
Expand Down
33 changes: 19 additions & 14 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Publication {
caller_id: node_name.to_string(),
latching,
msg_definition: msg_definition.to_owned(),
md5sum: md5sum.to_owned(),
md5sum: Some(md5sum.to_owned()),
topic: Some(topic_name.to_owned()),
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
Expand All @@ -100,27 +100,32 @@ impl Publication {
ConnectionHeader::from_bytes(&connection_header[..bytes])
{
log::debug!(
"Received subscribe request for {:?} with md5sum {}",
"Received subscribe request for {:?} with md5sum {:?}",
connection_header.topic,
connection_header.md5sum
);
// I can't find documentation for this anywhere, but when using
// `rostopic hz` with one of our publishers I discovered that the rospy code sent "*" as the md5sum
// To indicate a "generic subscription"...
if connection_header.md5sum != "*" {
if connection_header.md5sum != responding_conn_header.md5sum {
log::warn!(
"Got subscribe request for {}, but md5sums do not match. Expected {}, received {}",
// I also discovered that `rostopic echo` does not send a md5sum (even thou ros documentation says its required)
if let Some(connection_md5sum) = connection_header.md5sum {
if connection_md5sum != "*" {
if let Some(local_md5sum) = &responding_conn_header.md5sum {
if connection_md5sum != *local_md5sum {
log::warn!(
"Got subscribe request for {}, but md5sums do not match. Expected {:?}, received {:?}",
topic_name,
responding_conn_header.md5sum,
connection_header.md5sum,
local_md5sum,
connection_md5sum,
);
// Close the TCP connection
stream
.shutdown()
.await
.expect("Unable to shutdown tcpstream");
continue;
// Close the TCP connection
stream
.shutdown()
.await
.expect("Unable to shutdown tcpstream");
continue;
}
}
}
}
// Write our own connection header in response
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ServiceClientLink {
caller_id: node_name.to_string(),
latching: false,
msg_definition: srv_definition.to_owned(),
md5sum: md5sum.to_owned(),
md5sum: Some(md5sum.to_owned()),
// Note: using "topic" indicates a subscription
// using "service" indicates a service client
topic: None,
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ServiceServerLink {
caller_id: node_name.to_string(),
latching: false,
msg_definition: "".to_string(),
md5sum: "".to_string(),
md5sum: None,
service: None,
topic: None,
topic_type: "".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Subscription {
caller_id: node_name.to_string(),
latching: false,
msg_definition,
md5sum,
md5sum: Some(md5sum),
topic: Some(topic_name.to_owned()),
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn establish_publisher_connection(
Ok(stream)
} else {
log::error!(
"Tried to subscribe to {}, but md5sums do not match. Expected {}, received {}",
"Tried to subscribe to {}, but md5sums do not match. Expected {:?}, received {:?}",
topic_name,
conn_header.md5sum,
responded_header.md5sum
Expand Down
16 changes: 10 additions & 6 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct ConnectionHeader {
pub caller_id: String,
pub latching: bool, // TODO this field should be optional and None for service clients and servers
pub msg_definition: String, // TODO this should be optional and None for service clients and servers
pub md5sum: String,
pub md5sum: Option<String>,
// TODO we may want to distinguish between service and topic headers with different types?
pub service: Option<String>,
pub topic: Option<String>,
Expand All @@ -35,7 +35,7 @@ impl ConnectionHeader {
let mut msg_definition = String::new();
let mut caller_id = String::new();
let mut latching = false;
let mut md5sum = String::new();
let mut md5sum = None;
let mut topic = None;
let mut service = None;
let mut topic_type = String::new();
Expand Down Expand Up @@ -63,7 +63,9 @@ impl ConnectionHeader {
field[equals_pos + 1..].clone_into(&mut latching_str);
latching = &latching_str != "0";
} else if field.starts_with("md5sum=") {
field[equals_pos + 1..].clone_into(&mut md5sum);
let mut md5sum_str = String::new();
field[equals_pos + 1..].clone_into(&mut md5sum_str);
md5sum = Some(md5sum_str);
} else if field.starts_with("topic=") {
let mut topic_str = String::new();
field[equals_pos + 1..].clone_into(&mut topic_str);
Expand Down Expand Up @@ -113,9 +115,11 @@ impl ConnectionHeader {
header_data.write_u32::<LittleEndian>(latching_str.len() as u32)?;
header_data.write(latching_str.as_bytes())?;

let md5sum = format!("md5sum={}", self.md5sum);
header_data.write_u32::<LittleEndian>(md5sum.len() as u32)?;
header_data.write(md5sum.as_bytes())?;
if let Some(md5sum) = self.md5sum.as_ref() {
let md5sum = format!("md5sum={}", md5sum);
header_data.write_u32::<LittleEndian>(md5sum.len() as u32)?;
header_data.write(md5sum.as_bytes())?;
}

let msg_definition = format!("message_definition={}", self.msg_definition);
header_data.write_u32::<LittleEndian>(msg_definition.len() as u32)?;
Expand Down

0 comments on commit 2b339fb

Please sign in to comment.