Skip to content

Commit

Permalink
Merge pull request #168 from Carter12s/fix-service-server-shutdown
Browse files Browse the repository at this point in the history
Fix Service Servers Shutdown Cleanly
  • Loading branch information
Carter12s authored Jul 5, 2024
2 parents 2b339fb + e8b6835 commit 913dcb1
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 89 deletions.
2 changes: 2 additions & 0 deletions docker/noetic_compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
services:
rosbridge:
image: carter12s/roslibrust-ci-noetic:rust-1-72
# network_mode host required for ros1 testing
network_mode: host
ports:
- "9090:9090"
# Pass through the ros master port for native ros1 testing
Expand Down
130 changes: 80 additions & 50 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::{
names::Name,
node::{XmlRpcServer, XmlRpcServerHandle},
publisher::Publication,
service_client::{CallServiceRequest, ServiceClientLink},
service_client::ServiceClientLink,
service_server::ServiceServerLink,
subscriber::Subscription,
MasterClient, NodeError, ProtocolParams,
MasterClient, NodeError, ProtocolParams, ServiceClient,
},
RosLibRustError, RosLibRustResult,
RosLibRustError,
};
use abort_on_drop::ChildTask;
use log::warn;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub enum NodeMsg {
md5sum: String,
},
RegisterServiceClient {
reply: oneshot::Sender<Result<mpsc::UnboundedSender<CallServiceRequest>, String>>,
reply: oneshot::Sender<Result<ServiceClientLink, String>>,
service: Name,
service_type: String,
srv_definition: String,
Expand All @@ -75,6 +75,10 @@ pub enum NodeMsg {
>,
md5sum: String,
},
UnregisterServiceServer {
reply: oneshot::Sender<Result<(), String>>,
service_name: String,
},
RequestTopic {
reply: oneshot::Sender<Result<ProtocolParams, String>>,
caller_id: String,
Expand Down Expand Up @@ -176,7 +180,7 @@ impl NodeServerHandle {
pub async fn register_service_client<T: RosServiceType>(
&self,
service_name: &Name,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, NodeError> {
) -> Result<ServiceClient<T>, NodeError> {
// Create a channel for hooking into the node server
let (sender, receiver) = oneshot::channel();

Expand All @@ -193,10 +197,13 @@ impl NodeServerHandle {
})?;
// Get a channel back from the node server for pushing requests into
let received = receiver.await?;
Ok(received.map_err(|err| {
let link = received.map_err(|err| {
log::error!("Failed to register service client: {err}");
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
})?;
let sender = link.get_sender();

Ok(ServiceClient::new(service_name, sender, link))
}

pub async fn register_service_server<T, F>(
Expand Down Expand Up @@ -244,6 +251,22 @@ impl NodeServerHandle {
})?)
}

/// Called to remove a service server
/// Delegates to the NodeServer via channel
pub async fn unadvertise_service(&self, service_name: &str) -> Result<(), NodeError> {
let (tx, rx) = oneshot::channel();
log::debug!("Queuing unregister service server command for: {service_name:?}");
self.node_server_sender
.send(NodeMsg::UnregisterServiceServer {
reply: tx,
service_name: service_name.to_string(),
})?;
rx.await?.map_err(|e| {
log::error!("Failed to unadvertise service server {service_name:?}: {e:?}");
NodeError::IoError(io::Error::from(io::ErrorKind::InvalidData))
})
}

/// Registers a subscription with the underlying node server
/// If this is the first time the given topic has been subscribed to (by this node)
/// rosmaster will be informed.
Expand Down Expand Up @@ -312,7 +335,12 @@ pub(crate) struct Node {
// Record of subscriptions this node has
subscriptions: HashMap<String, Subscription>,
// Map of topic names to the service client handles for each topic
service_clients: HashMap<String, ServiceClientLink>,
// Note: decision made to not hold a list of service clients here, instead each call
// to register_service_client will create a new service client and return a sender to it
// Okay to have multiple service clients for the same service.
// Eventually, if we can also make ServiceClient clone()
// This should give better control of how disconnection and lifetimes work for a given client
// service_clients: HashMap<String, ServiceClientLink>,
// Map of topic names to service server handles for each topic
service_servers: HashMap<String, ServiceServerLink>,
// TODO need signal to shutdown xmlrpc server when node is dropped
Expand Down Expand Up @@ -346,7 +374,6 @@ impl Node {
node_msg_rx: node_receiver,
publishers: std::collections::HashMap::new(),
subscriptions: std::collections::HashMap::new(),
service_clients: std::collections::HashMap::new(),
service_servers: std::collections::HashMap::new(),
host_addr: addr,
hostname: hostname.to_owned(),
Expand Down Expand Up @@ -491,6 +518,16 @@ impl Node {
.map_err(|err| err.to_string()),
);
}
NodeMsg::UnregisterServiceServer {
reply,
service_name,
} => {
let _ = reply.send(
self.unregister_service_server(&service_name)
.await
.map_err(|err| err.to_string()),
);
}
NodeMsg::RequestTopic {
reply,
topic,
Expand Down Expand Up @@ -621,50 +658,25 @@ impl Node {
service_type: &str,
srv_definition: &str,
md5sum: &str,
) -> Result<mpsc::UnboundedSender<CallServiceRequest>, Box<dyn std::error::Error>> {
) -> Result<ServiceClientLink, Box<dyn std::error::Error>> {
log::debug!("Registering service client for {service}");
let service_name = service.resolve_to_global(&self.node_name).to_string();

let existing_entry = {
self.service_clients.iter().find_map(|(key, value)| {
if key.as_str() == &service_name {
if value.service_type() == service_type {
Some(Ok(value.get_sender()))
} else {
// TODO: Why is this AddrInUse?
// Is it in-use because we're double registering?
// Need better error message here
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
None
}
})
};

if let Some(handle) = existing_entry {
log::debug!("Found existing service client for {service}, returning existing handle");
Ok(handle?)
} else {
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;
log::debug!("Found service at {service_uri}");
let server_link = ServiceClientLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;
log::debug!("Creating new service client for {service}");
let service_uri = self.client.lookup_service(&service_name).await?;

log::debug!("Found service at {service_uri}");
let server_link = ServiceClientLink::new(
&self.node_name,
&service_name,
service_type,
&service_uri,
srv_definition,
md5sum,
)
.await?;

let handle = server_link.get_sender();
self.service_clients.insert(service_name, server_link);
Ok(handle)
}
Ok(server_link)
}

/// Registers a type-erased server function with the NodeServer
Expand Down Expand Up @@ -698,7 +710,7 @@ impl Node {
warn!("Existing service implementation for {service_type} found while registering service server. Previous implementation will be ejected");
*server_in_map = link;
} else {
self.service_servers.insert(service_type.to_owned(), link);
self.service_servers.insert(service.to_string(), link);
// This is the address that ros will find this specific service server link
let service_uri = format!("rosrpc://{}:{}", self.host_addr, port);

Expand All @@ -710,4 +722,22 @@ impl Node {

Ok(())
}

/// Drops the local service server registered under `service_name` and informs
/// rosmaster that this node no longer provides it.
///
/// The entry is removed from `service_servers` before rosmaster is contacted, so it
/// stays removed even if the call to the master fails.
///
/// # Errors
/// Returns an `InvalidData` I/O error when no server is registered under
/// `service_name`, and forwards any error from the master client's
/// `unregister_service` call.
async fn unregister_service_server(
    &mut self,
    service_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Guard clause: nothing to do if we never hosted this service.
    let service_link = match self.service_servers.remove(service_name) {
        Some(link) => link,
        None => {
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Attempt to unregister service that is not currently registered",
            )));
        }
    };

    log::debug!("Removing service_link for: {service_name:?}");
    // Inform rosmaster that we no longer provide this service; the URI is rebuilt
    // from this node's host address and the port the link reports.
    let uri = format!("rosrpc://{}:{}", self.host_addr, service_link.port());
    self.client.unregister_service(service_name, uri).await?;
    Ok(())
}
}
31 changes: 24 additions & 7 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError, ServiceServer,
},
RosLibRustResult,
use crate::ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError, ServiceServer,
};

/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc.
Expand Down Expand Up @@ -90,7 +87,7 @@ impl NodeHandle {
.inner
.register_service_client::<T>(&service_name)
.await?;
Ok(ServiceClient::new(&service_name, sender))
Ok(sender)
}

pub async fn advertise_service<T, F>(
Expand All @@ -112,4 +109,24 @@ impl NodeHandle {
.await?;
Ok(ServiceServer::new(service_name, self.clone()))
}

/// Not intended to be called manually
/// Stops hosting the specified server.
/// This is automatically called when dropping the ServiceServer returned by [advertise_service]
pub(crate) fn unadvertise_service_server(&self, service_name: &str) -> Result<(), NodeError> {
// TODO should we be using Name as the type of service_name here?
// I don't love Name's API at the moment
// This function is intended to be called in a "Drop impl" which is non-async
// so we're wrapping in a task here.
// This should be fine due to the "cmd dispatch" that is the current communication mechanism with NodeServer
let copy = self.clone();
let name_copy = service_name.to_string();
tokio::spawn(async move {
let result = copy.inner.unadvertise_service(&name_copy).await;
if let Err(e) = result {
log::error!("Failed to undvertise service: {e:?}");
}
});
Ok(())
}
}
5 changes: 1 addition & 4 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
ros1::{names::Name, tcpros::ConnectionHeader},
RosLibRustError,
};
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use std::{
Expand Down
24 changes: 12 additions & 12 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosServiceType;
use std::marker::PhantomData;
use std::{marker::PhantomData, sync::Arc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
Expand All @@ -20,21 +20,30 @@ use tokio::{
pub type CallServiceRequest = (Vec<u8>, oneshot::Sender<CallServiceResponse>);
pub type CallServiceResponse = RosLibRustResult<Vec<u8>>;

// Note: ServiceClient is Clone, and this is expressly different behavior than calling .service_client() twice on NodeHandle.
// Cloning a ServiceClient does not create a new connection to the service, but instead creates a second handle to the
// same underlying service client.
// NOTE(review): #[derive(Clone)] only implements Clone for ServiceClient<T> when T: Clone — confirm every
// RosServiceType implementor is Clone, or hand-write the Clone impl to drop that bound.
/// Typed handle used to call a service whose request/response types are described by `T`.
#[derive(Clone)]
pub struct ServiceClient<T: RosServiceType> {
// Name of the service this client was created for
service_name: Name,
// Channel for pushing call requests (request bytes plus a oneshot reply sender) to the underlying link
sender: mpsc::UnboundedSender<CallServiceRequest>,
// Ties the service type T to this handle without storing a value of it
_phantom: PhantomData<T>,
// A given copy of a service client is actually just a handle to an underlying actor.
// When the last ServiceClient is dropped this will shut down the underlying actor and TCP connection.
_link: Arc<ServiceClientLink>,
}

impl<T: RosServiceType> ServiceClient<T> {
pub fn new(
pub(crate) fn new(
service_name: &Name,
sender: mpsc::UnboundedSender<CallServiceRequest>,
link: ServiceClientLink,
) -> ServiceClient<T> {
Self {
service_name: service_name.to_owned(),
sender,
_phantom: PhantomData,
_link: Arc::new(link),
}
}

Expand Down Expand Up @@ -79,7 +88,6 @@ impl<T: RosServiceType> ServiceClient<T> {
}

pub struct ServiceClientLink {
service_type: String,
call_sender: mpsc::UnboundedSender<CallServiceRequest>,
_actor_task: ChildTask<()>,
}
Expand Down Expand Up @@ -118,7 +126,6 @@ impl ServiceClientLink {
let handle = tokio::spawn(actor_context);

Ok(Self {
service_type: service_type.to_owned(),
call_sender: call_tx,
_actor_task: handle.into(),
})
Expand All @@ -128,10 +135,6 @@ impl ServiceClientLink {
self.call_sender.clone()
}

pub fn service_type(&self) -> &str {
&self.service_type
}

async fn actor_context(
mut stream: TcpStream,
service_name: String,
Expand Down Expand Up @@ -225,10 +228,7 @@ impl ServiceClientLink {
mod test {
use log::info;

use crate::{
ros1::{NodeError, NodeHandle},
RosLibRustError,
};
use crate::ros1::{NodeError, NodeHandle};

roslibrust_codegen_macro::find_and_generate_ros_messages!(
"assets/ros1_test_msgs",
Expand Down
Loading

0 comments on commit 913dcb1

Please sign in to comment.