Skip to content

Commit

Permalink
Working service server dropping, working service client dropping
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed Jul 5, 2024
1 parent c9f48f5 commit e8b6835
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 119 deletions.
2 changes: 2 additions & 0 deletions docker/noetic_compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
services:
rosbridge:
image: carter12s/roslibrust-ci-noetic:rust-1-72
# network_mode host required for ros1 testing
network_mode: host
ports:
- "9090:9090"
# Pass through the ros master port for native ros1 testing
Expand Down
90 changes: 31 additions & 59 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::{CallServiceRequest, ServiceClientLink},
service_client::ServiceClientLink,
service_server::ServiceServerLink,
subscriber::Subscription,
MasterClient, NodeError, ProtocolParams,
MasterClient, NodeError, ProtocolParams, ServiceClient,
},
RosLibRustError, RosLibRustResult,
RosLibRustError,
};
use abort_on_drop::ChildTask;
use log::warn;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub enum NodeMsg {
md5sum: String,
},
RegisterServiceClient {
reply: oneshot::Sender<Result<mpsc::UnboundedSender<CallServiceRequest>, String>>,
reply: oneshot::Sender<Result<ServiceClientLink, String>>,
service: Name,
service_type: String,
srv_definition: String,
Expand Down Expand Up @@ -180,7 +180,7 @@ impl NodeServerHandle {
pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, NodeError> {
) -> Result<ServiceClient<T>, NodeError> {
// Create a channel for hooking into the node server
let (sender, receiver) = oneshot::channel();

Expand All @@ -197,10 +197,13 @@ impl NodeServerHandle {
})?;
// Get a channel back from the node server for pushing requests into
let received = receiver.await?;
Ok(received.map_err(|err| {
let link = received.map_err(|err| {
log::error!("Failed to register service client: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
})?;
let sender = link.get_sender();

Ok(ServiceClient::new(service_name, sender, link))
}

pub async fn register_service_server<T, F>(
Expand Down Expand Up @@ -332,7 +335,12 @@ pub(crate) struct Node {
// Record of subscriptions this node has
subscriptions: HashMap<String, Subscription>,
// Map of topic names to the service client handles for each topic
service_clients: HashMap<String, ServiceClientLink>,
// Note: decision made to not hold a list of service clients here, instead each call
// to register_service_client will create a new service client and return a sender to it
// Okay to have multiple service clients for the same service.
// Eventually, if we can also make ServiceClient clone()
// This should give better control of how disconnection and lifetimes work for a given client
// service_clients: HashMap<String, ServiceClientLink>,
// Map of topic names to service server handles for each topic
service_servers: HashMap<String, ServiceServerLink>,
// TODO need signal to shutdown xmlrpc server when node is dropped
Expand Down Expand Up @@ -366,7 +374,6 @@ impl Node {
node_msg_rx: node_receiver,
publishers: std::collections::HashMap::new(),
subscriptions: std::collections::HashMap::new(),
service_clients: std::collections::HashMap::new(),
service_servers: std::collections::HashMap::new(),
host_addr: addr,
hostname: hostname.to_owned(),
Expand Down Expand Up @@ -651,60 +658,25 @@ impl Node {
service_type: &str,
srv_definition: &str,
md5sum: &str,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, Box<dyn std::error::Error>> {
) -> Result<ServiceClientLink, Box<dyn std::error::Error>> {
log::debug!("Registering service client for {service}");
let service_name = service.resolve_to_global(&self.node_name).to_string();

//TODO MAJOR:
// Okay design problem here
// Shane decided to hand out multiple copies of a tx to the same underlying ServiceLink
// when duplicate service clients are created.
// 1. That is just a design choice and its equally valid to A) make the ServiceClient Clone instead
// 2. We're not counting the # of handles that exist for a given link now, which means we don't know
// when we should shut down the service link
// What to do...

let existing_entry = {
self.service_clients.iter().find_map(|(key, value)| {
if key.as_str() == &service_name {
if value.service_type() == service_type {
Some(Ok(value.get_sender()))
} else {
// TODO: Why is this AddrInUse?
// Is it in-use because we're double registering?
// Need better error message here
log::error!("Attempt to create two service clients to same service with two different types");
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
None
}
})
};

if let Some(handle) = existing_entry {
log::debug!("Found existing service client for {service}, returning existing handle");
Ok(handle?)
} else {
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;
log::debug!("Found service at {service_uri}");
let server_link = ServiceClientLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;

log::debug!("Found service at {service_uri}");
let server_link = ServiceClientLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;

let handle = server_link.get_sender();
self.service_clients.insert(service_name, server_link);
Ok(handle)
}
Ok(server_link)
}

/// Registers a type-erased server function with the NodeServer
Expand Down
21 changes: 8 additions & 13 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError, ServiceServer,
},
RosLibRustResult,
use crate::ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError, ServiceServer,
};

/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc.
Expand Down Expand Up @@ -90,7 +87,7 @@ impl NodeHandle {
.inner
.register_service_client::<T>(&service_name)
.await?;
Ok(ServiceClient::new(&service_name, sender))
Ok(sender)
}

pub async fn advertise_service<T, F>(
Expand Down Expand Up @@ -125,13 +122,11 @@ impl NodeHandle {
let copy = self.clone();
let name_copy = service_name.to_string();
tokio::spawn(async move {
let result = copy.inner.unadvertise_service(&name_copy).await;
if let Err(e) = result{
log::error!("Failed to undvertise service: {e:?}");
}
let result = copy.inner.unadvertise_service(&name_copy).await;
if let Err(e) = result {
log::error!("Failed to undvertise service: {e:?}");
}
});
Ok(())
}

pub(crate) fn unadvertise_service_client(&self, )
}
5 changes: 1 addition & 4 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
ros1::{names::Name, tcpros::ConnectionHeader},
RosLibRustError,
};
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use std::{
Expand Down
38 changes: 12 additions & 26 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosServiceType;
use std::marker::PhantomData;
use std::{marker::PhantomData, sync::Arc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
Expand All @@ -17,31 +17,33 @@ use tokio::{
},
};

use super::NodeHandle;

pub type CallServiceRequest = (Vec<u8>, oneshot::Sender<CallServiceResponse>);
pub type CallServiceResponse = RosLibRustResult<Vec<u8>>;

// Note: ServiceClient is clone, and this is expressly different behavior than calling .service_client() twice on NodeHandle
// clonning a ServiceClient does not create a new connection to the service, but instead creates a second handle to the
// same underlying service client.
#[derive(Clone)]
pub struct ServiceClient<T: RosServiceType> {
service_name: Name,
sender: mpsc::UnboundedSender<CallServiceRequest>,
_phantom: PhantomData<T>,
node_handle: NodeHandle,
// A given copy of a service client is actually just a handle to an underlying actor
// When the last ServiceClient is dropped this will shut down the underlying actor and TCP connection
_link: Arc<ServiceClientLink>,
}

impl<T: RosServiceType> ServiceClient<T> {
pub fn new(
pub(crate) fn new(
service_name: &Name,
sender: mpsc::UnboundedSender<CallServiceRequest>,
// Need a node handle to underlying server so we can clean up
// after ourselves when dropped
node_handle: NodeHandle
link: ServiceClientLink,
) -> ServiceClient<T> {
Self {
service_name: service_name.to_owned(),
sender,
_phantom: PhantomData,
node_handle,
_link: Arc::new(link),
}
}

Expand Down Expand Up @@ -85,15 +87,7 @@ impl<T: RosServiceType> ServiceClient<T> {
}
}

impl<T: RosServiceType> Drop for ServiceClient<T> {
fn drop(&mut self) {
// TODO Major, dropping ServiceClient needs to clean up link from NodeServer
self.node_handle.unadvertise_service_client();
}
}

pub struct ServiceClientLink {
service_type: String,
call_sender: mpsc::UnboundedSender<CallServiceRequest>,
_actor_task: ChildTask<()>,
}
Expand Down Expand Up @@ -132,7 +126,6 @@ impl ServiceClientLink {
let handle = tokio::spawn(actor_context);

Ok(Self {
service_type: service_type.to_owned(),
call_sender: call_tx,
_actor_task: handle.into(),
})
Expand All @@ -142,10 +135,6 @@ impl ServiceClientLink {
self.call_sender.clone()
}

pub fn service_type(&self) -> &str {
&self.service_type
}

async fn actor_context(
mut stream: TcpStream,
service_name: String,
Expand Down Expand Up @@ -239,10 +228,7 @@ impl ServiceClientLink {
mod test {
use log::info;

use crate::{
ros1::{NodeError, NodeHandle},
RosLibRustError,
};
use crate::ros1::{NodeError, NodeHandle};

roslibrust_codegen_macro::find_and_generate_ros_messages!(
"assets/ros1_test_msgs",
Expand Down
32 changes: 22 additions & 10 deletions roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod test {
async fn basic_service_server() {
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
debug!("Getting node handle");
let nh = NodeHandle::new("http://localhost:11311", "basic_service_server")
let nh = NodeHandle::new("http://localhost:11311", "/basic_service_server")
.await
.unwrap();

Expand All @@ -276,7 +276,10 @@ mod test {
// Create the server
debug!("Creating server");
let _handle = nh
.advertise_service::<test_msgs::AddTwoInts, _>("basic_service_server", server_fn)
.advertise_service::<test_msgs::AddTwoInts, _>(
"/basic_service_server/add_two",
server_fn,
)
.await
.unwrap();

Expand All @@ -286,7 +289,7 @@ mod test {
TIMEOUT,
timeout(
TIMEOUT,
nh.service_client::<test_msgs::AddTwoInts>("basic_service_server"),
nh.service_client::<test_msgs::AddTwoInts>("/basic_service_server/add_two"),
)
.await
.unwrap()
Expand All @@ -304,7 +307,7 @@ mod test {
#[test_log::test(tokio::test)]
async fn dropping_service_server_kill_correctly() {
debug!("Getting node handle");
let nh = NodeHandle::new("http://localhost:11311", "dropping_service_node")
let nh = NodeHandle::new("http://localhost:11311", "/dropping_service_node")
.await
.unwrap();

Expand Down Expand Up @@ -333,8 +336,8 @@ mod test {

// Shut down the server
std::mem::drop(handle);
// Wait for shut down to process
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
// Wait a little bit for server shut down to process
tokio::time::sleep(std::time::Duration::from_millis(250)).await;

// Make the request again (should fail)
let call_2 = client
Expand All @@ -346,18 +349,27 @@ mod test {
"Shouldn't be able to call after server is shut down"
);

// Drop our client (should expunge the storage of the client from the NodeServer)
std::mem::drop(client);
// Create a new clinet
let client = nh
.service_client::<test_msgs::AddTwoInts>("~/add_two")
.await;

// Client should fail to create as there should be no provider of the service
assert!(
client.is_err(),
"Shouldn't be able to connect again (no provider of service)"
);

// TODO as an extra test here we could make a rosapi call
// Confirm ros master no longer reports our service as provided (via rosapi for fun)
let rosapi_client = nh
.service_client::<rosapi::Services>("/rosapi/services")
.await
.unwrap();
let service_list: rosapi::ServicesResponse = rosapi_client
.call(&rosapi::ServicesRequest {})
.await
.unwrap();
assert!(!service_list
.services
.contains(&"/dropping_service_node/add_two".to_string()));
}
}
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ pub async fn establish_connection(

let mut responded_header_bytes = Vec::with_capacity(header_len);
let bytes = stream.read_buf(&mut responded_header_bytes).await?;
if let Ok(responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) {
if let Ok(_responded_header) = ConnectionHeader::from_bytes(&responded_header_bytes[..bytes]) {
// TODO we should really examine this md5sum logic...
// according to the ROS documentation, the service isn't required to respond
// with anything other than caller_id
Expand Down
Loading

0 comments on commit e8b6835

Please sign in to comment.