
Subscribers sometimes not getting messages from topics using durability transient_local #263

Closed
nirwester opened this issue Aug 20, 2024 · 12 comments · Fixed by #269

@nirwester

Version

commit b78d6a1
Date: Wed Aug 7 13:51:57 2024 +0200

Platform

Ubuntu 22, Docker

ROS Version

Iron

Description

Hi, I tried running our code base's tests with rmw_zenoh and noticed that many of the tests relying on latched (durability transient_local) topics fail intermittently: the subscribers sometimes fail to receive the data from "recently spawned" publishers.

I created a small package that reproduces the issue. It happens roughly 50% of the time on my setup and never happens when switching to Cyclone DDS:

https://github.com/nirwester/zenoh_pub_bug

The instructions to execute it are in the README.

Let me know if I can provide additional information.

@Yadunund
Member

Thanks for the ticket and the instructions to reproduce. I will try it out and report back.

@Yadunund Yadunund self-assigned this Aug 20, 2024
@Yadunund
Member

I'm able to reproduce the issue.

@JEnoch @imstevenpmwork I'm trying to understand if this is an issue with rmw_zenoh or Zenoh itself. I suspect it's the former but I'd like to get your confirmation that it's not a Zenoh issue.

I ran the test case with RUST_LOG=debug and dumped the outputs to this gist.

Looking specifically at the entries relevant to /test_topic, here are the logs from the successful run:

1: [subscription_test_exe-1] 2024-08-20T22:08:44.194632Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [subscription_test_exe-1] 2024-08-20T22:08:44.194643Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{0, b81aa26d75dea93cbb49550b382cabd5}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.194683Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::queries: Route query Face{0, b81aa26d75dea93cbb49550b382cabd5}:1 for res 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [subscription_test_exe-1] 2024-08-20T22:08:44.194757Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/b81aa26d75dea93cbb4955b382cabd5/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [subscription_test_exe-1] 2024-08-20T22:08:44.194765Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/b81aa26d75dea93cbb4955b382cabd5/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, b81aa26d75dea93cbb49550b382cabd5}
1: [test_subscription-2] 2024-08-20T22:08:44.198516Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::dispatcher::resource: Register resource 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [test_subscription-2] 2024-08-20T22:08:44.198536Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{2, b81aa26d75dea93cbb49550b382cabd5}
1: [test_subscription-2] 2024-08-20T22:08:44.198555Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/b81aa26d75dea93cbb4955b382cabd5/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [test_subscription-2] 2024-08-20T22:08:44.198586Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/b81aa26d75dea93cbb4955b382cabd5/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, b81aa26d75dea93cbb49550b382cabd5}
1: [test_subscription-2] 2024-08-20T22:08:44.198766Z DEBUG ThreadId(02) zenoh_ext::publication_cache: Create PublicationCache on 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 with history=10 resource_limit=None
1: [test_subscription-2] 2024-08-20T22:08:44.198801Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::queries: Register queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 (face: Face{0, 95ef0865fe38fafdf35b2ec6074881b4})
1: [test_subscription-2] 2024-08-20T22:08:44.198868Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [test_subscription-2] 2024-08-20T22:08:44.198874Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, 95ef0865fe38fafdf35b2ec6074881b4}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.203671Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::queries: Register queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 (face: Face{2, 95ef0865fe38fafdf35b2ec6074881b4})
1: [subscription_test_exe-1] 2024-08-20T22:08:44.203709Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [subscription_test_exe-1] 2024-08-20T22:08:44.203749Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, 95ef0865fe38fafdf35b2ec6074881b4}
1: [test_subscription-2] 2024-08-20T22:08:44.299312Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, 95ef0865fe38fafdf35b2ec6074881b4}
1: [test_subscription-2] 2024-08-20T22:08:44.299365Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::queries: Unregister client queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{0, 95ef0865fe38fafdf35b2ec6074881b4}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.303319Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription @/liveliness/@ros2_lv/1/95ef865fe38fafdf35b2ec674881b4/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, 95ef0865fe38fafdf35b2ec6074881b4}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.303411Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::queries: Unregister client queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{2, 95ef0865fe38fafdf35b2ec6074881b4}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.307465Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription @/liveliness/@ros2_lv/1/b81aa26d75dea93cbb4955b382cabd5/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, b81aa26d75dea93cbb49550b382cabd5}
1: [subscription_test_exe-1] 2024-08-20T22:08:44.307495Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{0, b81aa26d75dea93cbb49550b382cabd5}

and here are the logs from the unsuccessful run

1: [subscription_test_exe-1] 2024-08-20T23:00:19.901979Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [subscription_test_exe-1] 2024-08-20T23:00:19.901985Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{0, 8a16db271ccc9aa37061e48019223be6}
1: [subscription_test_exe-1] 2024-08-20T23:00:19.902027Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::queries: Route query Face{0, 8a16db271ccc9aa37061e48019223be6}:1 for res 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [subscription_test_exe-1] 2024-08-20T23:00:19.902083Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/8a16db271ccc9aa37061e48019223be6/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [subscription_test_exe-1] 2024-08-20T23:00:19.902088Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/8a16db271ccc9aa37061e48019223be6/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, 8a16db271ccc9aa37061e48019223be6}
1: [test_subscription-2] 2024-08-20T23:00:19.902143Z DEBUG ThreadId(02) zenoh_ext::publication_cache: Create PublicationCache on 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 with history=10 resource_limit=None
1: [test_subscription-2] 2024-08-20T23:00:19.902176Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18
1: [test_subscription-2] 2024-08-20T23:00:19.902182Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::queries: Register queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 (face: Face{0, 1162aa0ba8efc262a95fb48041748cb})
1: [test_subscription-2] 2024-08-20T23:00:19.902246Z DEBUG ThreadId(02) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [test_subscription-2] 2024-08-20T23:00:19.902254Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, 1162aa0ba8efc262a95fb48041748cb}
1: [test_subscription-2] 2024-08-20T23:00:19.904628Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{2, 8a16db271ccc9aa37061e48019223be6}
1: [test_subscription-2] 2024-08-20T23:00:19.904651Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/8a16db271ccc9aa37061e48019223be6/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [test_subscription-2] 2024-08-20T23:00:19.904676Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/8a16db271ccc9aa37061e48019223be6/0/11/MS/%/%/subscription_test/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, 8a16db271ccc9aa37061e48019223be6}
1: [subscription_test_exe-1] 2024-08-20T23:00:19.905822Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::hat::p2p_peer::queries: Register queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 (face: Face{2, 1162aa0ba8efc262a95fb48041748cb})
1: [subscription_test_exe-1] 2024-08-20T23:00:19.905846Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::dispatcher::resource: Register resource @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807
1: [subscription_test_exe-1] 2024-08-20T23:00:19.905873Z DEBUG  rx-0 ThreadId(08) zenoh::net::routing::hat::p2p_peer::pubsub: Register subscription @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, 1162aa0ba8efc262a95fb48041748cb}
1: [test_subscription-2] 2024-08-20T23:00:24.903523Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{0, 1162aa0ba8efc262a95fb48041748cb}
1: [test_subscription-2] 2024-08-20T23:00:24.903580Z DEBUG ThreadId(02) zenoh::net::routing::hat::p2p_peer::queries: Unregister client queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{0, 1162aa0ba8efc262a95fb48041748cb}
1: [subscription_test_exe-1] 2024-08-20T23:00:24.907669Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::pubsub: Unregister client subscription @/liveliness/@ros2_lv/1/1162aa0ba8efc262a95fb4841748cb/0/11/MP/%/%/test_node/%test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18/1:1:1,10:9223372036,854775807:9223372036,854775807:1,9223372036,854775807 for Face{2, 1162aa0ba8efc262a95fb48041748cb}
1: [subscription_test_exe-1] 2024-08-20T23:00:24.907929Z DEBUG  rx-1 ThreadId(09) zenoh::net::routing::hat::p2p_peer::queries: Unregister client queryable 1/test_topic/std_msgs::msg::dds_::String_/RIHS01_df668c740482bbd48fb39d76a70dfd4bd59db1288021743503259e948f6b1a18 for Face{2, 1162aa0ba8efc262a95fb48041748cb}

Does anything stand out to you?

I'm now checking whether the subscription on the rmw side receives the payload but fails to notify the guard_condition, or whether rmw_take misses the notification.
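To make the two hypotheses concrete, here is a minimal sketch of the hand-off between a data handler and a take call. `SubscriptionData`, `on_sample()` and `take()` are hypothetical stand-ins, not rmw_zenoh's actual types: a mutex-protected queue plays the message queue, and a boolean flag plays the guard condition.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <string>

// Hypothetical stand-in for the rmw-side subscription state.
class SubscriptionData {
 public:
  // Transport thread: plays the role of sub_data_handler.
  void on_sample(const std::string& payload) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++received_;              // stage 1: payload reached the rmw layer
    queue_.push_back(payload);
    guard_triggered_ = true;  // stage 2: wake up the executor
  }

  // Executor thread: plays the role of rmw_take.
  std::optional<std::string> take() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      return std::nullopt;  // nothing to take
    }
    std::string msg = queue_.front();
    queue_.pop_front();
    if (queue_.empty()) {
      guard_triggered_ = false;  // no more pending data
    }
    return msg;
  }

  bool guard_triggered() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return guard_triggered_;
  }

  int received() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return received_;
  }

 private:
  mutable std::mutex mutex_;
  std::deque<std::string> queue_;
  bool guard_triggered_ = false;
  int received_ = 0;
};
```

Counting samples in the handler separates the two cases: if the counter stays at zero, the payload never reached the rmw layer, so neither the guard condition nor rmw_take is at fault.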

@Yadunund
Member

I added a line to sub_data_handler to print a statement when a message is received.

When the tests pass, I do see this line printed

1: [subscription_test_exe-1] [INFO] [1724286701.114562488] [rmw_zenoh_cpp]: Sub /test_topic received a message

However, there is no printout when the test fails.

Could this suggest that the issue lies in Zenoh? Or perhaps the options we define for the querying subscriber?

@JEnoch
Contributor

JEnoch commented Aug 22, 2024

I think it's an issue of "publication before subscriber discovery":

  • in the successful log, I see the Publisher side registering the discovered Subscriber BEFORE creating its PublicationCache, and hence before the publication
  • in the failing log, the Subscriber is discovered and registered AFTER the creation of the PublicationCache, and thus possibly after the only publication. Hence that publication won't reach the Subscriber.
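The race can be reduced to a toy model: a publisher delivers a sample live only to subscribers whose declaration it has already received, while its cache keeps the last `history` samples (the logs show `history=10`). `ToyPublisher` and `ToySubscriber` are illustrative names; this is not Zenoh's API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy model, not Zenoh code.
struct ToySubscriber {
  std::vector<std::string> received;
};

class ToyPublisher {
 public:
  explicit ToyPublisher(std::size_t history) : history_(history) {}

  // The subscriber's declaration has arrived on the publisher side.
  void register_subscriber(ToySubscriber* sub) { subscribers_.push_back(sub); }

  void publish(const std::string& msg) {
    cache_.push_back(msg);  // kept for late joiners
    if (cache_.size() > history_) {
      cache_.pop_front();   // bounded history
    }
    // Live delivery reaches only the subscribers known right now.
    for (ToySubscriber* s : subscribers_) {
      s->received.push_back(msg);
    }
  }

  const std::deque<std::string>& cache() const { return cache_; }

 private:
  std::size_t history_;
  std::deque<std::string> cache_;
  std::vector<ToySubscriber*> subscribers_;
};
```

A subscriber registered after the single publication gets nothing live; the sample survives only in the cache, so something has to query it.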

In the PublicationCache / QueryingSubscriber pattern, in addition to its first query at creation, a QueryingSubscriber shall re-issue a Query to any newly discovered PublicationCache. This way it gets the historical publications it might have missed before mutual discovery, or during a network partition.
Such an extra query shall be done by calling ze_querying_subscriber_get(). I don't see any call to this function in rmw_zenoh, so I think this part is missing.
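A minimal sketch of the missing step, assuming a toy model in which a cache answers a query with everything it stores. `ToyCache`, `ToyQueryingSub`, `get()` and `on_cache_discovered()` are hypothetical. `get()` only plays the role that ze_querying_subscriber_get() has in the real code, and its signature is not the zenoh-c one.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Plays the PublicationCache: stores samples and answers queries.
class ToyCache {
 public:
  void store(const std::string& msg) { history_.push_back(msg); }
  std::vector<std::string> reply() const { return history_; }

 private:
  std::vector<std::string> history_;
};

// Plays the QueryingSubscriber.
class ToyQueryingSub {
 public:
  // Re-issue a query against one cache and merge the replies.
  void get(const ToyCache& cache) {
    for (const std::string& msg : cache.reply()) {
      received.push_back(msg);
    }
  }

  // Discovery hook: a new PublicationCache has become known.
  void on_cache_discovered(const ToyCache& cache, bool requery) {
    if (requery) {
      get(cache);
    }
  }

  std::vector<std::string> received;
};
```

The query issued at creation time found no cache, so without the re-query on discovery the latched sample is never fetched.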

@Yadunund
Member

Such extra query shall be done calling ze_querying_subscriber_get(). I don't see any call to this function in rmw_zenoh.

I had assumed this call is implicitly made when the QueryingSubscriber is created. Is that not the case?

@JEnoch
Contributor

JEnoch commented Aug 22, 2024

I had assumed this call is implicitly made when the QueryingSubscriber is created. Is that not the case?

No, it isn't. The PublicationCache and QueryingSubscriber were implemented before LivelinessTokens existed.
It was up to the developer to detect, in some way, that a new PublicationCache had joined and should be queried for historical publications.

For a future version of Zenoh, we could consider improving the PublicationCache to automatically declare a LivelinessToken, and the QueryingSubscriber to automatically query a PublicationCache when it discovers its LivelinessToken.
The drawback would be 1 extra LivelinessToken per transient_local publisher, in addition to the one already declared for the ROS graph.

@Yadunund
Member

Thanks Julien for the additional clarification. I now realize that the issue here is publisher discovery after first publication.

I will update the implementation in rmw_zenoh to call ze_querying_subscriber_get() when such a change is detected in the graph cache.
I think the changes you proposed would be good to have in upstream Zenoh, but perhaps only as an option set when constructing these objects, disabled by default.
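One way to wire this up is sketched below under stated assumptions: the graph cache is told about each newly discovered publisher, and every transient_local subscription on the same topic registers a callback that re-issues its query. `ToyGraphCache`, `PublisherInfo` and `Durability` are hypothetical names. The rule that only transient_local publishers keep a cache worth querying is an assumption of the sketch.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

enum class Durability { Volatile, TransientLocal };

struct PublisherInfo {
  std::string topic;
  Durability durability;
};

class ToyGraphCache {
 public:
  using Requery = std::function<void()>;

  // A transient_local subscription registers how to re-issue its query
  // (in rmw_zenoh this would wrap ze_querying_subscriber_get()).
  void add_transient_local_sub(const std::string& topic, Requery requery) {
    subs_[topic].push_back(std::move(requery));
  }

  // Called when a publisher's liveliness token is discovered.
  void on_publisher_discovered(const PublisherInfo& pub) {
    if (pub.durability != Durability::TransientLocal) {
      return;  // assumption: no cache to query
    }
    auto it = subs_.find(pub.topic);
    if (it == subs_.end()) {
      return;  // no interested subscription
    }
    for (const Requery& requery : it->second) {
      requery();
    }
  }

 private:
  std::map<std::string, std::vector<Requery>> subs_;
};
```

Filtering on topic and durability keeps unrelated publishers from triggering useless queries; the actual fix in #269 may filter differently.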

@YuanYuYuan
Contributor

YuanYuYuan commented Aug 23, 2024

  1. It succeeds as long as the subscription is established before the message is sent.
sequenceDiagram
  participant Pub as Pub
  participant Cache as Cache
  participant Get as Get
  participant Sub as Sub

  Sub ->> Pub: subscription
  Pub ->> Cache: msg
  Pub ->> Sub: msg

  box Gray Pub Node
  participant Pub
  participant Cache
  end
  box Gray Sub Node
  participant Get
  participant Sub
  end
  2. Or it can obtain the historical message if the query arrives after the message has been stored in the cache.
sequenceDiagram
  participant Pub as Pub
  participant Cache as Cache
  participant Get as Get
  participant Sub as Sub

  Pub ->> Cache: msg
  Pub --x Sub: msg
  Sub ->> Pub: subscription
  Get ->> Cache: query
  Cache ->> Get: msg

  box Gray Pub Node
  participant Pub
  participant Cache
  end
  box Gray Sub Node
  participant Get
  participant Sub
  end

  3. It is possible to fail even with liveliness tokens, for instance when the subscription is established slowly.
sequenceDiagram
  participant PL as Liveliness
  participant Pub
  participant Cache

  participant Get
  participant Sub
  participant SL as Liveliness

  Get ->> Cache: query
  Cache ->> Get: nothing
  PL ->> SL: liveliness
  SL ->> Get: trigger
  Get ->> Cache: query
  Cache ->> Get: nothing
  Pub ->> Cache: msg
  Pub --x Sub: msg
  Sub ->> Pub: subscription

  box Gray Pub Node
  participant Pub
  participant Cache
  participant PL
  end

  box Gray Sub Node
  participant Get
  participant Sub
  participant SL
  end


My questions are:

  1. Does receiving a liveliness token imply that the subscription (of the data subscriber) is established? (Note that there are two kinds of subscribers here: one for the normal data and one for the liveliness.) If the answer is no, we will fail in case 3 above.
  2. If we can guarantee 1., why not just confirm this when we get the reply from querying the pub cache?

@Yadunund
Member

@nirwester could you check out this branch #269, do a clean build (rm -rf build/ install/ log/ && colcon build) of your workspace and run your test again?

The changes in the PR seem to have solved the issue at my end.

@JEnoch
Contributor

JEnoch commented Aug 27, 2024

  3. It is possible to fail even with liveliness tokens. For instance, a slow subscription establishment.

This scenario cannot happen since the QueryingSubscriber declares its subscription before sending any query, and the PublicationCache declares its queryable before declaring its LivelinessToken.
And Zenoh guarantees the ordering of messages on each link.
Thus at publication time:

  • either the subscription message has been received, and the publication goes directly to the QueryingSubscriber,
  • or the subscription message has not been received yet, but a query will eventually arrive, sent by the QueryingSubscriber when it receives the LivelinessToken.

My questions are:

  1. Does receiving a liveliness token imply that the subscription (of the data subscriber) is established? (Note that there are two kinds of subscribers here: one for the normal data and one for the liveliness.) If the answer is no, we will fail in case 3 above.
  • A QueryingSubscriber receiving the LivelinessToken from the PublicationCache implies it previously received the queryable declaration. Hence, the query will hit the cache, and anything stored there will be sent as a reply.
  • A PublicationCache receiving a query from a QueryingSubscriber implies it already received its subscription declaration. Hence, a publication made after the query arrives cannot miss the QueryingSubscriber: it goes to it directly.
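The ordering argument can be checked exhaustively on a toy model. The sketch assumes two FIFO links, a subscription declared before any query, and a queryable declared before the LivelinessToken, then enumerates every interleaving of local steps and deliveries. It illustrates the reasoning only; it is not Zenoh code, and all names are hypothetical.

```cpp
#include <cassert>
#include <deque>

enum class Msg { SubDecl, Query, QueryableDecl, Token, Data, ReplyWithMsg, ReplyEmpty };

struct State {
  std::deque<Msg> sub_to_pub{Msg::SubDecl};  // subscription declared first
  std::deque<Msg> pub_to_sub;
  int pub_step = 0;  // 0 none, 1 queryable, 2 token, 3 published
  bool pub_knows_sub = false;
  bool cache_has_msg = false;
  bool sub_knows_queryable = false;
  bool sub_got_msg = false;
};

struct Result {
  int schedules = 0;
  int failures = 0;
};

// Depth-first search over all interleavings. requery_on_token models
// re-issuing a query when the cache's LivelinessToken is discovered.
void explore(const State& s, bool requery_on_token, Result& r) {
  bool progressed = false;
  if (s.pub_step < 3) {  // publisher node takes its next local step
    State n = s;
    if (n.pub_step == 0) n.pub_to_sub.push_back(Msg::QueryableDecl);
    if (n.pub_step == 1) n.pub_to_sub.push_back(Msg::Token);
    if (n.pub_step == 2) {  // the single transient_local publication
      n.cache_has_msg = true;
      if (n.pub_knows_sub) n.pub_to_sub.push_back(Msg::Data);
    }
    ++n.pub_step;
    explore(n, requery_on_token, r);
    progressed = true;
  }
  if (!s.sub_to_pub.empty()) {  // deliver oldest message to publisher
    State n = s;
    Msg m = n.sub_to_pub.front();
    n.sub_to_pub.pop_front();
    if (m == Msg::SubDecl) n.pub_knows_sub = true;
    if (m == Msg::Query)
      n.pub_to_sub.push_back(n.cache_has_msg ? Msg::ReplyWithMsg : Msg::ReplyEmpty);
    explore(n, requery_on_token, r);
    progressed = true;
  }
  if (!s.pub_to_sub.empty()) {  // deliver oldest message to subscriber
    State n = s;
    Msg m = n.pub_to_sub.front();
    n.pub_to_sub.pop_front();
    if (m == Msg::QueryableDecl) n.sub_knows_queryable = true;
    if (m == Msg::Token && requery_on_token && n.sub_knows_queryable)
      n.sub_to_pub.push_back(Msg::Query);
    if (m == Msg::Data || m == Msg::ReplyWithMsg) n.sub_got_msg = true;
    explore(n, requery_on_token, r);
    progressed = true;
  }
  if (!progressed) {  // terminal: every event has happened
    ++r.schedules;
    if (!s.sub_got_msg) ++r.failures;
  }
}

Result check(bool requery_on_token) {
  Result r;
  explore(State{}, requery_on_token, r);
  return r;
}
```

With `check(false)` (no re-query on token discovery) some schedules end without the message. With `check(true)` none do, because the query always travels behind the subscription declaration on the same link.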

@MarcoMatteoBassa

I cannot reproduce it anymore with the fix, thank you! :)
(Still @nirwester, but I'm too lazy to switch accounts :P)

@Yadunund
Member

Thanks for testing! Glad to know the issue is resolved.
