Skip to content

Commit

Permalink
Merge pull request #208 from RosLibRust/publisher-lags-correctly
Browse files Browse the repository at this point in the history
Working publisher using broadcast
  • Loading branch information
Carter12s authored Dec 2, 2024
2 parents 46f1b67 + 8fe15e4 commit 63a282b
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 180 deletions.
45 changes: 25 additions & 20 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub enum NodeMsg {
// This results in the node's task ending and the node being dropped.
Shutdown,
RegisterPublisher {
reply: oneshot::Sender<Result<mpsc::Sender<Vec<u8>>, String>>,
reply: oneshot::Sender<Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), String>>,
topic: String,
topic_type: String,
queue_size: usize,
Expand Down Expand Up @@ -166,7 +166,7 @@ impl NodeServerHandle {
topic: &str,
queue_size: usize,
latching: bool,
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::RegisterPublisher {
reply: sender,
Expand All @@ -192,7 +192,7 @@ impl NodeServerHandle {
msg_definition: &str,
queue_size: usize,
latching: bool,
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
let (sender, receiver) = oneshot::channel();

let md5sum;
Expand Down Expand Up @@ -693,38 +693,43 @@ impl Node {
msg_definition: String,
md5sum: String,
latching: bool,
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
) -> Result<(broadcast::Sender<Vec<u8>>, mpsc::Sender<()>), NodeError> {
// Return handle to existing Publication if it exists
let existing_entry = {
self.publishers.iter().find_map(|(key, value)| {
if key.as_str() == &topic {
if value.topic_type() == topic_type {
if let Some(sender) = value.get_sender() {
return Some(Ok(sender));
}else{
// Edge case here
// The channel for the publication is closed, but publication hasn't been cleaned up yet
None
}
} else {
warn!("Attempted to register publisher with different topic type than existing publisher: existing_type={}, new_type={}", value.topic_type(), topic_type);
if key.as_str() != &topic {
return None;
}
if value.topic_type() != topic_type {
warn!("Attempted to register publisher with different topic type than existing publisher: existing_type={}, new_type={}", value.topic_type(), topic_type);
// TODO MAJOR: this is a terrible error type to return...
return Some(Err(NodeError::IoError(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))));
}
let (sender, shutdown) = value.get_senders();
match shutdown.upgrade() {
Some(shutdown) => {
Some(Ok((sender, shutdown)))
}
None => {
error!("We still have an entry for a publication, but it has been shutdown");
// TODO MAJOR: this is a terrible error type to return...
Some(Err(NodeError::IoError(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
None
}
})
};
// If we found an existing publication return the handle to it
if let Some(handle) = existing_entry {
return Ok(handle?);
let (sender, shutdown) = handle?;
return Ok((sender, shutdown));
}

// Otherwise create a new Publication and advertise
let (channel, sender) = Publication::new(
let (channel, sender, shutdown) = Publication::new(
&self.node_name,
latching,
&topic,
Expand All @@ -742,7 +747,7 @@ impl Node {
})?;
self.publishers.insert(topic.clone(), channel);
let _ = self.client.register_publisher(&topic, topic_type).await?;
Ok(sender)
Ok((sender, shutdown))
}

async fn unregister_publisher(&mut self, topic: &str) -> Result<(), NodeError> {
Expand Down
8 changes: 4 additions & 4 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ impl NodeHandle {
queue_size: usize,
latching: bool,
) -> Result<PublisherAny, NodeError> {
let sender = self
let (sender, shutdown) = self
.inner
.register_publisher_any(topic_name, topic_type, msg_definition, queue_size, latching)
.await?;
Ok(PublisherAny::new(topic_name, sender))
Ok(PublisherAny::new(topic_name, sender, shutdown))
}

/// Create a new publisher for the given type.
Expand All @@ -103,11 +103,11 @@ impl NodeHandle {
queue_size: usize,
latching: bool,
) -> Result<Publisher<T>, NodeError> {
let sender = self
let (sender, shutdown) = self
.inner
.register_publisher::<T>(topic_name, queue_size, latching)
.await?;
Ok(Publisher::new(topic_name, sender))
Ok(Publisher::new(topic_name, sender, shutdown))
}

pub async fn subscribe_any(
Expand Down
Loading

0 comments on commit 63a282b

Please sign in to comment.