diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 382c00b4..3a005d50 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -291,6 +291,7 @@ std::shared_ptr SubscriptionData::make( return nullptr; } } + sub_data->graph_cache_ = graph_cache; auto undeclare_z_sub = rcpputils::make_scope_exit( [data = sub_data]() { @@ -612,4 +613,12 @@ void SubscriptionData::set_on_new_message_callback( std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, callback); } + +//============================================================================== +std::shared_ptr SubscriptionData::graph_cache() const +{ + std::lock_guard lock(mutex_); + return graph_cache_; +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index e20720c7..166349ef 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -123,6 +123,8 @@ class SubscriptionData final rmw_event_callback_t callback, const void * user_data); + std::shared_ptr graph_cache() const; + // Destructor. ~SubscriptionData(); @@ -153,6 +155,8 @@ class SubscriptionData final std::shared_ptr events_mgr_; // Shutdown flag. bool is_shutdown_; + // The graph cache. + std::shared_ptr graph_cache_; }; using SubscriptionDataPtr = std::shared_ptr; using SubscriptionDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 1de4ff8b..ac792f8f 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -101,15 +101,8 @@ rmw_subscription_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->implementation_identifier, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { RMW_SET_ERROR_MSG( @@ -135,14 +128,14 @@ rmw_subscription_event_init( return RMW_RET_OK; } - std::weak_ptr data_wp = sub_data; - context_impl->graph_cache()->set_qos_event_callback( + // std::weak_ptr data_wp = sub_data; + sub_data->graph_cache()->set_qos_event_callback( sub_data->guid(), zenoh_event_type, - [data_wp, + [sub_data, zenoh_event_type](std::unique_ptr zenoh_event) { - auto sub_data = data_wp.lock(); + // auto sub_data = data_wp.lock(); if (sub_data == nullptr) { return; } diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index e5ca235e..de617a41 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -972,7 +972,11 @@ rmw_create_subscription( // Store type erased node in rmw_subscription->data so that the // Subscription can be safely accessed. - rmw_subscription->data = reinterpret_cast(const_cast(node)); + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // subscription handle will be returned in the rmw_subscriptions_t in rmw_wait + // from which we cannot obtain SubscriptionData. + // rmw_subscription->data = reinterpret_cast(const_cast(node)); + rmw_subscription->data = static_cast(node_data->get_sub_data(rmw_subscription).get()); rmw_subscription->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_subscription->options = *subscription_options; rmw_subscription->can_loan_messages = false; @@ -1058,18 +1062,11 @@ rmw_subscription_count_matched_publishers( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache()->subscription_count_matched_publishers( + return sub_data->graph_cache()->subscription_count_matched_publishers( sub_data->topic_info(), publisher_count); } @@ -1087,15 +1084,8 @@ rmw_subscription_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); *qos = sub_data->adapted_qos_profile(); @@ -1152,15 +1142,8 @@ rmw_take( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); return sub_data->take_one_message(ros_message, nullptr, taken); @@ -1188,15 +1171,8 @@ rmw_take_with_info( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); return sub_data->take_one_message(ros_message, message_info, taken); @@ -1225,15 +1201,8 @@ rmw_take_sequence( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); if (0u == count) { @@ -1308,15 +1277,8 @@ __rmw_take_serialized( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); return sub_data->take_serialized_message( @@ -2795,24 +2757,8 @@ check_and_attach_condition( if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { - rmw_node_t * node = - static_cast(subscriptions->subscribers[i]); - if (node == nullptr) { - continue; - } - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - if (context_impl == nullptr) { - continue; - } - auto node_data = context_impl->get_node_data(node); - if (node_data == nullptr) { - continue; - } - auto sub_data = node_data->get_sub_data(subscriptions->subscribers[i]); - if (sub_data == nullptr) { - continue; - } + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscriptions->subscribers[i]); if (sub_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } @@ -2962,24 +2908,8 @@ rmw_wait( if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { - rmw_node_t * node = - static_cast(subscriptions->subscribers[i]); - if (node == nullptr) { - continue; - } - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - if (context_impl == nullptr) { - continue; - } - auto node_data = context_impl->get_node_data(node); - if (node_data == nullptr) { - continue; - } - auto sub_data = node_data->get_sub_data(subscriptions->subscribers[i]); - if (sub_data == nullptr) { - continue; - } + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscriptions->subscribers[i]); if (sub_data == nullptr) { continue; } @@ -3335,15 +3265,8 @@ rmw_subscription_set_on_new_message_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); - rmw_node_t * node = - static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_s * context_impl = - static_cast(node->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); - auto sub_data = node_data->get_sub_data(subscription); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); sub_data->set_on_new_message_callback(callback, user_data);