Skip to content

Commit

Permalink
Fix ros2#279 by removing querying sub callback after subscription is …
Browse files Browse the repository at this point in the history
…destroyed (ros2#282)

Signed-off-by: Yadunund <yadunund@intrinsic.ai>
Signed-off-by: Alejandro Hernández Cordero <ahcorde@gmail.com>
Co-authored-by: Alejandro Hernández Cordero <ahcorde@gmail.com>
  • Loading branch information
2 people authored and imstevenpmwork committed Sep 23, 2024
1 parent 3122fa5 commit 6812738
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
24 changes: 19 additions & 5 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ void GraphCache::parse_put(
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) {
sub_it->second(entity->zid());
}
}
}
Expand Down Expand Up @@ -1331,15 +1331,29 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
querying_subs_cbs_[keyexpr] = std::move(
std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data)
{
auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_data);
}

} // namespace rmw_zenoh_cpp
13 changes: 11 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@

namespace rmw_zenoh_cpp
{
// Forward declare to prevent circular dependency.
// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of
// rmw_data_types.hpp.
class rmw_subscription_data_t;

///=============================================================================
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
Expand Down Expand Up @@ -184,9 +189,12 @@ class GraphCache final
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -286,7 +294,8 @@ class GraphCache final
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
std::unordered_map<std::string, std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
8 changes: 7 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ rmw_create_subscription(
// Register the querying subscriber with the graph cache to get latest
// messages from publishers that were discovered after their first publication.
context_impl->graph_cache->set_querying_subscriber_callback(
sub_data->entity->topic_info()->topic_keyexpr_,
sub_data,
[sub_data](const std::string & queryable_prefix) -> void
{
if (sub_data == nullptr) {
Expand Down Expand Up @@ -1537,6 +1537,10 @@ rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
Expand Down Expand Up @@ -1574,6 +1578,8 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
RMW_SET_ERROR_MSG("failed to undeclare sub");
ret = RMW_RET_ERROR;
}
// Also remove the registered callback from the GraphCache.
context_impl->graph_cache->remove_querying_subscriber_callback(sub_data);
}

RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
Expand Down

0 comments on commit 6812738

Please sign in to comment.