Skip to content

Commit

Permalink
Merge pull request #170 from Carter12s/fix-service-servers-with-rosse…
Browse files Browse the repository at this point in the history
…rvice

Fix service servers with rosservice
  • Loading branch information
Carter12s authored Jul 5, 2024
2 parents 410e56f + fd1f875 commit 8a03431
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
3 changes: 3 additions & 0 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,9 @@ impl Node {
self.host_addr,
service.clone(),
self.node_name.clone(),
service_type.to_string(),
md5sum.to_string(),
srv_definition.to_string(),
)
.await?;
let port = link.port();
Expand Down
37 changes: 30 additions & 7 deletions roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl ServiceServerLink {
host_addr: Ipv4Addr,
service_name: Name,
node_name: Name,
service_type: String, // name of the message type e.g. "std_srvs/Trigger"
md5sum: String, // md5sum of the service message type
srv_definition: String, // Full text of the service message type definition
) -> Result<Self, std::io::Error> {
// TODO A lot of this is duplicated with publisher
// We could probably move chunks into tcpros.rs and re-use
Expand All @@ -83,7 +86,15 @@ impl ServiceServerLink {
.port();
let service_name_copy = service_name.to_string();

let task = tokio::spawn(Self::actor(tcp_listener, service_name, node_name, method));
let task = tokio::spawn(Self::actor(
tcp_listener,
service_name,
node_name,
method,
service_type,
md5sum,
srv_definition,
));

Ok(Self {
_child_task: task.into(),
Expand All @@ -105,6 +116,9 @@ impl ServiceServerLink {
service_name: Name, // Service path of the this service
node_name: Name, // Name of node we're running on
method: Box<TypeErasedCallback>,
service_type: String,
md5sum: String,
srv_definition: String,
) {
// We have to move our callback into an Arc so the separately spawned tasks for each service connection
// can access it in parrallel and not worry about the lifetime.
Expand All @@ -122,6 +136,9 @@ impl ServiceServerLink {
service_name.clone(),
node_name.clone(),
arc_method.clone(),
service_type.clone(),
md5sum.clone(),
srv_definition.clone(),
));
// Add spawned task to child task list to ensure dropping shuts down server
tasks.push(task.into());
Expand All @@ -143,6 +160,9 @@ impl ServiceServerLink {
service_name: Name,
node_name: Name,
method: Arc<Box<TypeErasedCallback>>,
service_type: String,
md5sum: String,
srv_definition: String,
) {
// TODO for a bunch of the error branches in this handling
// it is unclear whether we should respond over the socket
Expand Down Expand Up @@ -176,15 +196,14 @@ impl ServiceServerLink {
trace!("Got connection header: {connection_header:#?} for service {service_name}");

// Respond with our header
// TODO this is pretty cursed, may want a better version
let response_header = ConnectionHeader {
caller_id: node_name.to_string(),
latching: false,
msg_definition: "".to_string(),
md5sum: None,
service: None,
msg_definition: srv_definition,
md5sum: Some(md5sum),
service: Some(service_name.to_string()),
topic: None,
topic_type: "".to_string(),
topic_type: service_type.to_string(),
tcp_nodelay: false,
};
let bytes = response_header.to_bytes(false).unwrap();
Expand All @@ -201,7 +220,8 @@ impl ServiceServerLink {
loop {
let mut body_len_bytes = [0u8; 4];
if let Err(e) = stream.read_exact(&mut body_len_bytes).await {
warn!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}");
// Note: this was lowered to debug! from warn! because this is intentionally done by tools like `rosservice` to discover service type
debug!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}");
// TODO returning here simply closes the socket? Should we respond with an error instead?
return;
}
Expand Down Expand Up @@ -410,6 +430,9 @@ mod test {
let call = client
.call(&test_msgs::AddTwoIntsRequest { a: 1, b: 2 })
.await;
// Okay so this is logging the error message correctly, but the contents currently suck:
// "Got call: Err(IoError(Custom { kind: Other, error: "Failure response from service server: Custom { kind: NotFound, error: \"test message\" }" }))"
// We should someday clean up error types here, but frankly errors throughout the entire crate need an overhaul
debug!("Got call: {call:?}");
assert!(call.is_err());
}
Expand Down
15 changes: 13 additions & 2 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,20 @@ impl ConnectionHeader {
let mut tcp_nodelay_str = String::new();
field[equals_pos + 1..].clone_into(&mut tcp_nodelay_str);
tcp_nodelay = &tcp_nodelay_str != "0";
} else if field.starts_with("probe=") {
// probe is apprantly an undocumented header field that is sent
// by certain ros tools when they initiate a service_client connection to a service server
// for the purpose of discovering the service type
// If you do `rosservice call /my_service` and hit TAB you'll see this field in the connection header
// we can ignore it
} else if field.starts_with("error=") {
log::error!("Error reported in TCPROS connection header: {field}");
} else {
log::warn!("Encountered unhandled field in connection header: {field}");
}
}

Ok(ConnectionHeader {
let header = ConnectionHeader {
caller_id,
latching,
msg_definition,
Expand All @@ -96,7 +102,12 @@ impl ConnectionHeader {
service,
topic_type,
tcp_nodelay,
})
};
trace!(
"Got connection header: {header:?} for topic {:?}",
header.topic
);
Ok(header)
}

/// Serializes the connection header to a byte array
Expand Down

0 comments on commit 8a03431

Please sign in to comment.