diff --git a/docker/noetic_compose.yaml b/docker/noetic_compose.yaml index f768796b..73197cbd 100644 --- a/docker/noetic_compose.yaml +++ b/docker/noetic_compose.yaml @@ -1,6 +1,8 @@ services: rosbridge: image: carter12s/roslibrust-ci-noetic:rust-1-72 + # network_mode host required for ros1 testing + network_mode: host ports: - "9090:9090" # Pass through the ros master port for native ros1 testing diff --git a/roslibrust/src/ros1/node/actor.rs b/roslibrust/src/ros1/node/actor.rs index 64b0839e..8cbbf48c 100644 --- a/roslibrust/src/ros1/node/actor.rs +++ b/roslibrust/src/ros1/node/actor.rs @@ -3,12 +3,12 @@ use crate::{ names::Name, node::{XmlRpcServer, XmlRpcServerHandle}, publisher::Publication, - service_client::{CallServiceRequest, ServiceClientLink}, + service_client::ServiceClientLink, service_server::ServiceServerLink, subscriber::Subscription, - MasterClient, NodeError, ProtocolParams, + MasterClient, NodeError, ProtocolParams, ServiceClient, }, - RosLibRustError, RosLibRustResult, + RosLibRustError, }; use abort_on_drop::ChildTask; use log::warn; @@ -57,7 +57,7 @@ pub enum NodeMsg { md5sum: String, }, RegisterServiceClient { - reply: oneshot::Sender, String>>, + reply: oneshot::Sender>, service: Name, service_type: String, srv_definition: String, @@ -180,7 +180,7 @@ impl NodeServerHandle { pub async fn register_service_client( &self, service_name: &Name, - ) -> Result, NodeError> { + ) -> Result, NodeError> { // Create a channel for hooking into the node server let (sender, receiver) = oneshot::channel(); @@ -197,10 +197,13 @@ impl NodeServerHandle { })?; // Get a channel back from the node server for pushing requests into let received = receiver.await?; - Ok(received.map_err(|err| { + let link = received.map_err(|err| { log::error!("Failed to register service client: {err}"); NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) - })?) + })?; + let sender = link.get_sender(); + + Ok(ServiceClient::new(service_name, sender, link)) } pub async fn register_service_server( @@ -332,7 +335,12 @@ pub(crate) struct Node { // Record of subscriptions this node has subscriptions: HashMap, // Map of topic names to the service client handles for each topic - service_clients: HashMap, + // Note: decision made to not hold a list of service clients here, instead each call + // to register_service_client will create a new service client and return a sender to it + // Okay to have multiple service clients for the same service. + // Eventually, if we can also make ServiceClient clone() + // This should give better control of how disconnection and lifetimes work for a given client + // service_clients: HashMap, // Map of topic names to service server handles for each topic service_servers: HashMap, // TODO need signal to shutdown xmlrpc server when node is dropped @@ -366,7 +374,6 @@ impl Node { node_msg_rx: node_receiver, publishers: std::collections::HashMap::new(), subscriptions: std::collections::HashMap::new(), - service_clients: std::collections::HashMap::new(), service_servers: std::collections::HashMap::new(), host_addr: addr, hostname: hostname.to_owned(), @@ -651,60 +658,25 @@ impl Node { service_type: &str, srv_definition: &str, md5sum: &str, - ) -> Result, Box> { + ) -> Result> { log::debug!("Registering service client for {service}"); let service_name = service.resolve_to_global(&self.node_name).to_string(); - //TODO MAJOR: - // Okay design problem here - // Shane decided to hand out multiple copies of a tx to the same underlying ServiceLink - // when duplicate service clients are created. - // 1. That is just a design choice and its equally valid to A) make the ServiceClient Clone instead - // 2. We're not counting the # of handles that exist for a given link now, which means we don't know - // when we should shut down the service link - // What to do... - - let existing_entry = { - self.service_clients.iter().find_map(|(key, value)| { - if key.as_str() == &service_name { - if value.service_type() == service_type { - Some(Ok(value.get_sender())) - } else { - // TODO: Why is this AddrInUse? - // Is it in-use because we're double registering? - // Need better error message here - log::error!("Attempt to create two service clients to same service with two different types"); - Some(Err(Box::new(std::io::Error::from( - std::io::ErrorKind::AddrInUse, - )))) - } - } else { - None - } - }) - }; - - if let Some(handle) = existing_entry { - log::debug!("Found existing service client for {service}, returning existing handle"); - Ok(handle?) - } else { - log::debug!("Creating new service client for {service}"); - let service_uri = self.client.lookup_service(&service_name).await?; - log::debug!("Found service at {service_uri}"); - let server_link = ServiceClientLink::new( - &self.node_name, - &service_name, - service_type, - &service_uri, - srv_definition, - md5sum, - ) - .await?; + log::debug!("Creating new service client for {service}"); + let service_uri = self.client.lookup_service(&service_name).await?; + + log::debug!("Found service at {service_uri}"); + let server_link = ServiceClientLink::new( + &self.node_name, + &service_name, + service_type, + &service_uri, + srv_definition, + md5sum, + ) + .await?; - let handle = server_link.get_sender(); - self.service_clients.insert(service_name, server_link); - Ok(handle) - } + Ok(server_link) } /// Registers a type-erased server function with the NodeServer diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index 605af7dc..86f36201 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -1,10 +1,7 @@ use super::actor::{Node, NodeServerHandle}; -use crate::{ - ros1::{ - names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, - NodeError, ServiceServer, - }, - RosLibRustResult, +use crate::ros1::{ + names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, + NodeError, ServiceServer, }; /// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc. @@ -90,7 +87,7 @@ impl NodeHandle { .inner .register_service_client::(&service_name) .await?; - Ok(ServiceClient::new(&service_name, sender)) + Ok(sender) } pub async fn advertise_service( @@ -125,13 +122,11 @@ impl NodeHandle { let copy = self.clone(); let name_copy = service_name.to_string(); tokio::spawn(async move { - let result = copy.inner.unadvertise_service(&name_copy).await; - if let Err(e) = result{ - log::error!("Failed to undvertise service: {e:?}"); - } + let result = copy.inner.unadvertise_service(&name_copy).await; + if let Err(e) = result { + log::error!("Failed to undvertise service: {e:?}"); + } }); Ok(()) } - - pub(crate) fn unadvertise_service_client(&self, ) } diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index 213350ec..6432584f 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -1,7 +1,4 @@ -use crate::{ - ros1::{names::Name, tcpros::ConnectionHeader}, - RosLibRustError, -}; +use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; use roslibrust_codegen::RosMessageType; use std::{ diff --git a/roslibrust/src/ros1/service_client.rs b/roslibrust/src/ros1/service_client.rs index 2a26f9e3..4a04dac1 100644 --- a/roslibrust/src/ros1/service_client.rs +++ b/roslibrust/src/ros1/service_client.rs @@ -7,7 +7,7 @@ use crate::{ }; use abort_on_drop::ChildTask; use roslibrust_codegen::RosServiceType; -use std::marker::PhantomData; +use std::{marker::PhantomData, sync::Arc}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, @@ -17,31 +17,33 @@ use tokio::{ }, }; -use super::NodeHandle; - pub type CallServiceRequest = (Vec, oneshot::Sender); pub type CallServiceResponse = RosLibRustResult>; +// Note: ServiceClient is clone, and this is expressly different behavior than calling .service_client() twice on NodeHandle +// clonning a ServiceClient does not create a new connection to the service, but instead creates a second handle to the +// same underlying service client. +#[derive(Clone)] pub struct ServiceClient { service_name: Name, sender: mpsc::UnboundedSender, _phantom: PhantomData, - node_handle: NodeHandle, + // A given copy of a service client is actually just a handle to an underlying actor + // When the last ServiceClient is dropped this will shut down the underlying actor and TCP connection + _link: Arc, } impl ServiceClient { - pub fn new( + pub(crate) fn new( service_name: &Name, sender: mpsc::UnboundedSender, - // Need a node handle to underlying server so we can clean up - // after ourselves when dropped - node_handle: NodeHandle + link: ServiceClientLink, ) -> ServiceClient { Self { service_name: service_name.to_owned(), sender, _phantom: PhantomData, - node_handle, + _link: Arc::new(link), } } @@ -85,15 +87,7 @@ impl ServiceClient { } } -impl Drop for ServiceClient { - fn drop(&mut self) { - // TODO Major, dropping ServiceClient needs to clean up link from NodeServer - self.node_handle.unadvertise_service_client(); - } -} - pub struct ServiceClientLink { - service_type: String, call_sender: mpsc::UnboundedSender, _actor_task: ChildTask<()>, } @@ -132,7 +126,6 @@ impl ServiceClientLink { let handle = tokio::spawn(actor_context); Ok(Self { - service_type: service_type.to_owned(), call_sender: call_tx, _actor_task: handle.into(), }) @@ -142,10 +135,6 @@ impl ServiceClientLink { self.call_sender.clone() } - pub fn service_type(&self) -> &str { - &self.service_type - } - async fn actor_context( mut stream: TcpStream, service_name: String, @@ -239,10 +228,7 @@ impl ServiceClientLink { mod test { use log::info; - use crate::{ - ros1::{NodeError, NodeHandle}, - RosLibRustError, - }; + use crate::ros1::{NodeError, NodeHandle}; roslibrust_codegen_macro::find_and_generate_ros_messages!( "assets/ros1_test_msgs", diff --git a/roslibrust/src/ros1/service_server.rs b/roslibrust/src/ros1/service_server.rs index f04d9643..37399c46 100644 --- a/roslibrust/src/ros1/service_server.rs +++ b/roslibrust/src/ros1/service_server.rs @@ -262,7 +262,7 @@ mod test { async fn basic_service_server() { const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); debug!("Getting node handle"); - let nh = NodeHandle::new("http://localhost:11311", "basic_service_server") + let nh = NodeHandle::new("http://localhost:11311", "/basic_service_server") .await .unwrap(); @@ -276,7 +276,10 @@ mod test { // Create the server debug!("Creating server"); let _handle = nh - .advertise_service::("basic_service_server", server_fn) + .advertise_service::( + "/basic_service_server/add_two", + server_fn, + ) .await .unwrap(); @@ -286,7 +289,7 @@ mod test { TIMEOUT, timeout( TIMEOUT, - nh.service_client::("basic_service_server"), + nh.service_client::("/basic_service_server/add_two"), ) .await .unwrap() @@ -304,7 +307,7 @@ mod test { #[test_log::test(tokio::test)] async fn dropping_service_server_kill_correctly() { debug!("Getting node handle"); - let nh = NodeHandle::new("http://localhost:11311", "dropping_service_node") + let nh = NodeHandle::new("http://localhost:11311", "/dropping_service_node") .await .unwrap(); @@ -333,8 +336,8 @@ mod test { // Shut down the server std::mem::drop(handle); - // Wait for shut down to process - tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; + // Wait a little bit for server shut down to process + tokio::time::sleep(std::time::Duration::from_millis(250)).await; // Make the request again (should fail) let call_2 = client @@ -346,18 +349,27 @@ mod test { "Shouldn't be able to call after server is shut down" ); - // Drop our client (should expunge the storage of the client from the NodeServer) - std::mem::drop(client); + // Create a new clinet let client = nh .service_client::("~/add_two") .await; - // Client should fail to create as there should be no provider of the service assert!( client.is_err(), "Shouldn't be able to connect again (no provider of service)" ); - // TODO as an extra test here we could make a rosapi call + // Confirm ros master no longer reports our service as provided (via rosapi for fun) + let rosapi_client = nh + .service_client::("/rosapi/services") + .await + .unwrap(); + let service_list: rosapi::ServicesResponse = rosapi_client + .call(&rosapi::ServicesRequest {}) + .await + .unwrap(); + assert!(!service_list + .services + .contains(&"/dropping_service_node/add_two".to_string())); } } diff --git a/roslibrust/src/ros1/tcpros.rs b/roslibrust/src/ros1/tcpros.rs index ce5b8103..f274a631 100644 --- a/roslibrust/src/ros1/tcpros.rs +++ b/roslibrust/src/ros1/tcpros.rs @@ -192,7 +192,7 @@ pub async fn establish_connection( let mut responded_header_bytes = Vec::with_capacity(header_len); let bytes = stream.read_buf(&mut responded_header_bytes).await?; - if let Ok(responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) { + if let Ok(_responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) { // TODO we should really examine this md5sum logic... // according to the ROS documentation, the service isn't required to respond // with anything other than caller_id diff --git a/roslibrust/src/rosbridge/client.rs b/roslibrust/src/rosbridge/client.rs index 8e7963d7..7643695f 100644 --- a/roslibrust/src/rosbridge/client.rs +++ b/roslibrust/src/rosbridge/client.rs @@ -130,7 +130,6 @@ impl ClientHandle { .or_insert(Subscription { handles: HashMap::new(), topic_type: Msg::ROS_TYPE_NAME.to_string(), - known_publishers: vec![], }); // TODO Possible bug here? We send a subscribe message each time even if already subscribed diff --git a/roslibrust/src/rosbridge/mod.rs b/roslibrust/src/rosbridge/mod.rs index 62fb90e1..b21edea4 100644 --- a/roslibrust/src/rosbridge/mod.rs +++ b/roslibrust/src/rosbridge/mod.rs @@ -103,11 +103,6 @@ pub(crate) struct Subscription { pub(crate) handles: HashMap, /// Name of ros type (package_name/message_name), used for re-subscribes pub(crate) topic_type: String, - - // TODO consider specializing this type for ros1_native - // Will contain the list of publishers of this topic as told to us by rosmaster - // Currently only used / populated with ros1 native - pub(crate) known_publishers: Vec, } // TODO move out of rosbridge and into common