Skip to content

Commit

Permalink
Working both ways zenoh services
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed Dec 3, 2024
1 parent 4d5456c commit 7262ad6
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 2 deletions.
47 changes: 47 additions & 0 deletions roslibrust_zenoh/examples/service_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Purpose of this example is to show how to host service visible to ROS1 from a Zenoh client.
// IMPORTANT to bring this trait into scope so we can access the functions it provides
// [ServiceProvider] is what allows us to actually access .service_client()
use roslibrust::topic_provider::ServiceProvider;
use roslibrust_zenoh::ZenohClient;

// IMPORTANT this example will not work with the default zenoh-ros1-bridge settings!
// It requires the flag `--client_bridging_mode auto` to be set when the bridge is started
// Otherwise the service will not be advertised to ROS1 master.

// Generate rust definitions for our messages
roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces");

#[tokio::main]
async fn main() {
// Setup a logger for debugging purposes
// Run this example with RUST_LOG=debug for more information if this doesn't work for you
env_logger::init();

// Create our zenoh client, depending on how you are running zenoh / the bridge you may
// need to pass specific configuration in here.
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let client = ZenohClient::new(session);

// Service will remain alive until handle is dropped
let _handle = client
.advertise_service::<std_srvs::SetBool, _>(
"/my_set_bool",
|request: std_srvs::SetBoolRequest| {
log::info!("Got request to set bool: {request:?}");
Ok(std_srvs::SetBoolResponse {
success: true,
message: "You set my bool!".to_string(),
})
},
)
.await
.unwrap();

// Wait for ctrl_c to kill this process
tokio::signal::ctrl_c().await.unwrap();

// While this example is running:
// `rosservice list` should show /my_set_bool
// `rosservice call /my_set_bool "data: true"` should work
}
16 changes: 16 additions & 0 deletions roslibrust_zenoh/examples/snoop_on_discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Not actually an example for this crate, just peeping on how discovery for zenoh-ros1-bridge works
#[tokio::main]
async fn main() {
let session = zenoh::open(zenoh::Config::default()).await.unwrap();

let sub = session
.declare_subscriber("ros1_discovery_info/**")
.await
.unwrap();

loop {
let sample = sub.recv_async().await.unwrap();
println!("Got sample: {sample:?}");
}
}
138 changes: 136 additions & 2 deletions roslibrust_zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use roslibrust::{RosLibRustError, RosLibRustResult};
use roslibrust_codegen::{RosMessageType, RosServiceType};

use log::*;
use tokio::select;
use zenoh::bytes::ZBytes;

pub struct ZenohClient {
session: zenoh::Session,
Expand Down Expand Up @@ -137,6 +139,17 @@ fn mangle_topic(topic: &str, type_str: &str, md5sum: &str) -> String {
format!("{type_str}/{md5sum}/{topic}")
}

/// Identical to mangle_topic, but for services we want to separate the datatype stuff from the service name
fn mangle_service(service: &str, type_str: &str, md5sum: &str) -> (String, String) {
let service = service.trim_start_matches('/').trim_end_matches("/");

let type_str = hex::encode(type_str.as_bytes());
(
format!("{type_str}/{md5sum}"),
format!("{service}").to_string(),
)
}

pub struct ZenohServiceClient<T: RosServiceType> {
session: zenoh::Session,
zenoh_query: String,
Expand Down Expand Up @@ -200,9 +213,18 @@ impl<T: RosServiceType> Service<T> for ZenohServiceClient<T> {
}
}

/// The "holder type" returned when advertising a service
/// Dropping this will stop the service server
pub struct ZenohServiceServer {
// Dropping this will stop zenoh's declaration of the queryable
_queryable: zenoh::query::Queryable<()>,
// Dropping this will stop the advertising of the service
_shutdown_channel: tokio::sync::oneshot::Sender<()>,
}

impl ServiceProvider for ZenohClient {
type ServiceClient<T: RosServiceType> = ZenohServiceClient<T>;
type ServiceServer = ();
type ServiceServer = ZenohServiceServer;

async fn service_client<T: roslibrust_codegen::RosServiceType + 'static>(
&self,
Expand All @@ -225,7 +247,119 @@ impl ServiceProvider for ZenohClient {
topic: &str,
server: F,
) -> RosLibRustResult<Self::ServiceServer> {
todo!()
let mangled_topic = mangle_topic(topic, T::ROS_SERVICE_NAME, T::MD5SUM);

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();

let x = self
.session
.declare_queryable(mangled_topic)
.callback(move |query| {
let _ = tx.send(query).map_err(|e| {
error!("Failed to send query: {e:?}");
});
})
.await
.map_err(|e| {
RosLibRustError::Unexpected(anyhow::anyhow!("Failed to declare queryable: {e:?}"))
})?;

// Spawn a task to handle the queries
// This task will shut down when queryable is dropped
tokio::spawn(async move {
while let Some(query) = rx.recv().await {
debug!("Got query: {query:?}");
let Some(payload) = query.payload() else {
error!("Received a query with no payload for a ros0 service {query:?}");
continue;
};
let bytes = payload.to_bytes();
debug!("Got bytes: {bytes:?}");
// TODO MAJOR HACK HERE STILL
// Our deserialization still expects the first four bytes to be the total message size
// So we're just going to manually add the bytes back in
let starting_bytes = (bytes.len() as u32).to_le_bytes();
let bytes = [&starting_bytes, &bytes[..]].concat();

let Ok(request) = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| {
error!("Failed to deserialize request: {e:?}");
}) else {
continue;
};

let Ok(response) = server(request).map_err(|e| {
error!("Failed to handle request: {e:?}");
}) else {
continue;
};

let Ok(response_bytes) = roslibrust_serde_rosmsg::to_vec(&response).map_err(|e| {
error!("Failed to serialize response: {e:?}");
}) else {
continue;
};

// TODO HACK HERE STILL
// Zenoh doesn't want the first four bytes that are the overall message size:
let response_bytes = &response_bytes[4..];

let _ = query
.reply(query.key_expr(), response_bytes)
.await
.map_err(|e| {
error!("Failed to reply to query: {e:?}");
});
}
});
// zenoh-ros1-bridge won't serve our service without us publishing info on 'ros1_discovery_info'
// We don't have to worry about this for publishers, because zenoh will initiate the bridge whenever someone subscribes on the ros side.
// For service, zenoh-ros1-bridge has to create the service before anyone can call it so it has to know that it needs to do that.
// This is a bit of a brittle implementation as it relies on internal implementation details for zenoh-ros1-bridge, but it works for now.
// See: https://github.com/eclipse-zenoh/zenoh-plugin-ros1/blob/main/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/discovery.rs

// Note: I'm uncertain about "discovery_namespace" and just using * for now
// Note: I'm uncertain about "bridge_namespace" and just using * for now
let (type_mangle, service_name) = mangle_service(topic, T::ROS_SERVICE_NAME, T::MD5SUM);
let zenoh_info_topic = format!("ros1_discovery_info/*/srv/{type_mangle}/*/{service_name}");

let q2 = self
.session
.declare_publisher(zenoh_info_topic)
.await
.map_err(|e| {
RosLibRustError::Unexpected(anyhow::anyhow!(
"Failed to declare queryable for service discovery: {e:?}"
))
})?;
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
let shutdown = shutdown_rx.try_recv();
match shutdown {
Ok(_) => {
break;
}
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
// Continue no shutdown yet
}
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
break;
}
}
// Send an empty message to the discovery topic
let res = q2.put(ZBytes::default()).await;
if let Err(e) = res {
error!("Failed to publish service discovery info: {e:?}");
}
interval.tick().await;
}
});

Ok(ZenohServiceServer {
_queryable: x,
_shutdown_channel: shutdown_tx,
})
}
}

Expand Down

0 comments on commit 7262ad6

Please sign in to comment.