diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index adfb4208..608f6157 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -132,9 +133,13 @@ struct builtin_readers dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; }; -struct CddsNode +struct CddsEntity +{ + dds_entity_t enth; +}; + +struct CddsNode : CddsEntity { - dds_entity_t pp; dds_entity_t pub; dds_entity_t sub; rmw_guard_condition_t * graph_guard_condition; @@ -146,16 +151,14 @@ struct CddsNode dds_domainid_t domain_id; }; -struct CddsPublisher +struct CddsPublisher : CddsEntity { - dds_entity_t pubh; dds_instance_handle_t pubiid; struct ddsi_sertopic * sertopic; }; -struct CddsSubscription +struct CddsSubscription : CddsEntity { - dds_entity_t subh; dds_entity_t rdcondh; struct ddsi_sertopic * sertopic; }; @@ -187,6 +190,11 @@ struct CddsGuardCondition dds_entity_t gcondh; }; +struct CddsEvent : CddsEntity +{ + rmw_event_type_t event_type; +}; + struct CddsWaitset { dds_entity_t waitseth; @@ -200,6 +208,7 @@ struct CddsWaitset std::vector gcs; std::vector cls; std::vector srvs; + std::vector evs; }; #if SUPPORT_LOCALHOST @@ -723,7 +732,7 @@ extern "C" rmw_node_t * rmw_create_node( if (!(graph_guard_condition = rmw_create_guard_condition(context))) { goto fail_ggc; } - node_impl->pp = pp; + node_impl->enth = pp; node_impl->pub = pub; node_impl->sub = sub; node_impl->graph_guard_condition = graph_guard_condition; @@ -809,7 +818,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) /* prevent race with rmw_create_node (see there) */ std::lock_guard lock(gcdds.domains_lock); #endif - if (dds_delete(node_impl->pp) < 0) { + if (dds_delete(node_impl->enth) < 0) { RMW_SET_ERROR_MSG("failed to destroy DDS participant"); result_ret = RMW_RET_ERROR; } @@ -927,7 +936,7 @@ extern "C" rmw_ret_t rmw_publish( RET_NULL(ros_message); auto pub = static_cast(publisher->data); assert(pub); - if (dds_write(pub->pubh, ros_message) >= 0) { + if (dds_write(pub->enth, ros_message) >= 0) { return RMW_RET_OK; } else { RMW_SET_ERROR_MSG("failed to publish data"); @@ -946,7 +955,7 @@ extern "C" rmw_ret_t rmw_publish_serialized_message( struct ddsi_serdata * d = serdata_rmw_from_serialized_message(pub->sertopic, serialized_message->buffer, serialized_message->buffer_length); - const bool ok = (dds_writecdr(pub->pubh, d) >= 0); + const bool ok = (dds_writecdr(pub->enth, d) >= 0); return ok ? RMW_RET_OK : RMW_RET_ERROR; } @@ -1007,7 +1016,9 @@ static dds_qos_t * create_readwrite_qos( const rmw_qos_profile_t * qos_policies, bool ignore_local_publications) { + dds_duration_t ldur; dds_qos_t * qos = dds_create_qos(); + dds_qset_writer_data_lifecycle (qos, false); /* disable autodispose */ switch (qos_policies->history) { case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_HISTORY_UNKNOWN: @@ -1062,7 +1073,34 @@ static dds_qos_t * create_readwrite_qos( default: rmw_cyclonedds_cpp::unreachable(); } - /* deadline, lifespan, liveliness are not yet supported */ + if (qos_policies->lifespan.sec > 0 || qos_policies->lifespan.nsec > 0) { + dds_qset_lifespan(qos, DDS_SECS(qos_policies->lifespan.sec) + qos_policies->lifespan.nsec); + } + if (qos_policies->deadline.sec > 0 || qos_policies->deadline.nsec > 0) { + dds_qset_deadline(qos, DDS_SECS(qos_policies->deadline.sec) + qos_policies->deadline.nsec); + } + + if (qos_policies->liveliness_lease_duration.sec == 0 && + qos_policies->liveliness_lease_duration.nsec == 0) + { + ldur = DDS_INFINITY; + } else { + ldur = DDS_SECS(qos_policies->liveliness_lease_duration.sec) + + qos_policies->liveliness_lease_duration.nsec; + } + switch (qos_policies->liveliness) { + case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + case RMW_QOS_POLICY_LIVELINESS_UNKNOWN: + dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, ldur); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur); + break; + } if (ignore_local_publications) { dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT); } @@ -1145,12 +1183,8 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set"); goto error; } - if (deadline == DDS_INFINITY) { - qos_policies->deadline.sec = qos_policies->deadline.nsec = 0; - } else { - qos_policies->deadline.sec = (uint64_t) deadline / 1000000000; - qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000; - } + qos_policies->deadline.sec = (uint64_t) deadline / 1000000000; + qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000; } { @@ -1158,12 +1192,8 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic if (!dds_qget_lifespan(qos, &lifespan)) { lifespan = DDS_INFINITY; } - if (lifespan == DDS_INFINITY) { - qos_policies->lifespan.sec = qos_policies->lifespan.nsec = 0; - } else { - qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000; - qos_policies->lifespan.nsec = (uint64_t) lifespan % 1000000000; - } + qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000; + qos_policies->lifespan.nsec = (uint64_t) lifespan % 1000000000; } { @@ -1186,13 +1216,8 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic default: rmw_cyclonedds_cpp::unreachable(); } - if (lease_duration == DDS_INFINITY) { - qos_policies->liveliness_lease_duration.sec = qos_policies->liveliness_lease_duration.nsec = - 0; - } else { - qos_policies->liveliness_lease_duration.sec = (uint64_t) lease_duration / 1000000000; - qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000; - } + qos_policies->liveliness_lease_duration.sec = (uint64_t) lease_duration / 1000000000; + qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000; } dds_delete_qos(qos); @@ -1226,7 +1251,7 @@ static CddsPublisher * create_cdds_publisher( create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); if ((topic = - dds_create_topic_arbitrary(node_impl->pp, sertopic, nullptr, nullptr, nullptr)) < 0) + dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_topic; @@ -1234,11 +1259,11 @@ static CddsPublisher * create_cdds_publisher( if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { goto fail_qos; } - if ((pub->pubh = dds_create_writer(node_impl->pub, topic, qos, nullptr)) < 0) { + if ((pub->enth = dds_create_writer(node_impl->pub, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { + if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) { RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } @@ -1248,7 +1273,7 @@ static CddsPublisher * create_cdds_publisher( return pub; fail_instance_handle: - if (dds_delete(pub->pubh) < 0) { + if (dds_delete(pub->enth) < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); } fail_writer: @@ -1309,7 +1334,7 @@ extern "C" rmw_publisher_t * rmw_create_publisher( fail_topic_name: rmw_publisher_free(rmw_publisher); fail_publisher: - if (dds_delete(pub->pubh) < 0) { + if (dds_delete(pub->enth) < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); } delete pub; @@ -1350,7 +1375,7 @@ extern "C" rmw_ret_t rmw_publisher_count_matched_subscriptions( RET_WRONG_IMPLID(publisher); auto pub = static_cast(publisher->data); dds_publication_matched_status_t status; - if (dds_get_publication_matched_status(pub->pubh, &status) < 0) { + if (dds_get_publication_matched_status(pub->enth, &status) < 0) { return RMW_RET_ERROR; } else { *subscription_count = status.current_count; @@ -1369,7 +1394,7 @@ rmw_ret_t rmw_publisher_get_actual_qos(const rmw_publisher_t * publisher, rmw_qo RET_NULL(qos); RET_WRONG_IMPLID(publisher); auto pub = static_cast(publisher->data); - if (get_readwrite_qos(pub->pubh, qos)) { + if (get_readwrite_qos(pub->enth, qos)) { return RMW_RET_OK; } else { return RMW_RET_ERROR; @@ -1407,7 +1432,7 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * RET_WRONG_IMPLID(publisher); auto pub = static_cast(publisher->data); if (pub != nullptr) { - if (dds_delete(pub->pubh) < 0) { + if (dds_delete(pub->enth) < 0) { RMW_SET_ERROR_MSG("failed to delete writer"); } ddsi_sertopic_unref(pub->sertopic); @@ -1448,7 +1473,7 @@ static CddsSubscription * create_cdds_subscription( create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); if ((topic = - dds_create_topic_arbitrary(node_impl->pp, sertopic, nullptr, nullptr, nullptr)) < 0) + dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_topic; @@ -1456,11 +1481,11 @@ static CddsSubscription * create_cdds_subscription( if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { goto fail_qos; } - if ((sub->subh = dds_create_reader(node_impl->sub, topic, qos, nullptr)) < 0) { + if ((sub->enth = dds_create_reader(node_impl->sub, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { + if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; } @@ -1469,7 +1494,7 @@ static CddsSubscription * create_cdds_subscription( dds_delete(topic); return sub; fail_readcond: - if (dds_delete(sub->subh) < 0) { + if (dds_delete(sub->enth) < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } fail_reader: @@ -1546,7 +1571,7 @@ extern "C" rmw_subscription_t * rmw_create_subscription( RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); } - if (dds_delete(sub->subh) < 0) { + if (dds_delete(sub->enth) < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } delete sub; @@ -1560,7 +1585,7 @@ extern "C" rmw_ret_t rmw_subscription_count_matched_publishers( RET_WRONG_IMPLID(subscription); auto sub = static_cast(subscription->data); dds_subscription_matched_status_t status; - if (dds_get_subscription_matched_status(sub->subh, &status) < 0) { + if (dds_get_subscription_matched_status(sub->enth, &status) < 0) { return RMW_RET_ERROR; } else { *publisher_count = status.current_count; @@ -1575,7 +1600,7 @@ extern "C" rmw_ret_t rmw_subscription_get_actual_qos( RET_NULL(qos); RET_WRONG_IMPLID(subscription); auto sub = static_cast(subscription->data); - if (get_readwrite_qos(sub->subh, qos)) { + if (get_readwrite_qos(sub->enth, qos)) { return RMW_RET_OK; } else { return RMW_RET_ERROR; @@ -1592,7 +1617,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio if (dds_delete(sub->rdcondh) < 0) { RMW_SET_ERROR_MSG("failed to delete readcondition"); } - if (dds_delete(sub->subh) < 0) { + if (dds_delete(sub->enth) < 0) { RMW_SET_ERROR_MSG("failed to delete reader"); } ddsi_sertopic_unref(sub->sertopic); @@ -1614,7 +1639,7 @@ static rmw_ret_t rmw_take_int( CddsSubscription * sub = static_cast(subscription->data); RET_NULL(sub); dds_sample_info_t info; - while (dds_take(sub->subh, &ros_message, &info, 1, 1) == 1) { + while (dds_take(sub->enth, &ros_message, &info, 1, 1) == 1) { if (info.valid_data) { if (message_info) { message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; @@ -1651,7 +1676,7 @@ static rmw_ret_t rmw_take_ser_int( RET_NULL(sub); dds_sample_info_t info; struct ddsi_serdata * dcmn; - while (dds_takecdr(sub->subh, &dcmn, 1, &info, DDS_ANY_STATE) == 1) { + while (dds_takecdr(sub->enth, &dcmn, 1, &info, DDS_ANY_STATE) == 1) { if (info.valid_data) { if (message_info) { message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; @@ -1771,7 +1796,7 @@ extern "C" rmw_ret_t rmw_take_event( auto ei = static_cast(event_info); auto sub = static_cast(event_handle->data); dds_liveliness_changed_status_t st; - if (dds_get_liveliness_changed_status(sub->subh, &st) < 0) { + if (dds_get_liveliness_changed_status(sub->enth, &st) < 0) { *taken = false; return RMW_RET_ERROR; } else { @@ -1788,7 +1813,7 @@ extern "C" rmw_ret_t rmw_take_event( auto ei = static_cast(event_info); auto sub = static_cast(event_handle->data); dds_requested_deadline_missed_status_t st; - if (dds_get_requested_deadline_missed_status(sub->subh, &st) < 0) { + if (dds_get_requested_deadline_missed_status(sub->enth, &st) < 0) { *taken = false; return RMW_RET_ERROR; } else { @@ -1803,7 +1828,7 @@ extern "C" rmw_ret_t rmw_take_event( auto ei = static_cast(event_info); auto pub = static_cast(event_handle->data); dds_liveliness_lost_status_t st; - if (dds_get_liveliness_lost_status(pub->pubh, &st) < 0) { + if (dds_get_liveliness_lost_status(pub->enth, &st) < 0) { *taken = false; return RMW_RET_ERROR; } else { @@ -1818,7 +1843,7 @@ extern "C" rmw_ret_t rmw_take_event( auto ei = static_cast(event_info); auto pub = static_cast(event_handle->data); dds_offered_deadline_missed_status_t st; - if (dds_get_offered_deadline_missed_status(pub->pubh, &st) < 0) { + if (dds_get_offered_deadline_missed_status(pub->enth, &st) < 0) { *taken = false; return RMW_RET_ERROR; } else { @@ -2011,6 +2036,27 @@ static bool require_reattach(const std::vector & cached, size_t count, void } } +static bool require_reattach( + const std::vector & cached, rmw_events_t * events) +{ + if (events == nullptr || events->event_count == 0) { + return cached.size() != 0; + } else if (events->event_count != cached.size()) { + return true; + } else { + for (size_t i = 0; i < events->event_count; ++i) { + rmw_event_t * current_event = static_cast(events->events[i]); + CddsEvent c = cached.at(i); + if (c.enth != static_cast(current_event->data)->enth || + c.event_type != current_event->event_type) + { + return true; + } + } + return false; + } +} + static void waitset_detach(CddsWaitset * ws) { for (auto && x : ws->subs) { @@ -2047,12 +2093,84 @@ static void clean_waitset_caches() } } +/// mapping of RMW_EVENT to the corresponding DDS status +static const std::unordered_map mask_map{ + {RMW_EVENT_LIVELINESS_CHANGED, DDS_LIVELINESS_CHANGED_STATUS}, + {RMW_EVENT_REQUESTED_DEADLINE_MISSED, DDS_REQUESTED_DEADLINE_MISSED_STATUS}, + {RMW_EVENT_LIVELINESS_LOST, DDS_LIVELINESS_LOST_STATUS}, + {RMW_EVENT_OFFERED_DEADLINE_MISSED, DDS_OFFERED_DEADLINE_MISSED_STATUS}, +}; + +static uint32_t get_status_kind_from_rmw(const rmw_event_type_t event_t) +{ + return mask_map.at(event_t); +} + +static bool is_event_supported(const rmw_event_type_t event_t) +{ + return mask_map.count(event_t) > 0; +} + +static rmw_ret_t gather_event_entities( + const rmw_events_t * events, + std::unordered_set & entities) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(events, RMW_RET_INVALID_ARGUMENT); + + std::unordered_map status_mask_map; + + for (size_t i = 0; i < events->event_count; ++i) { + rmw_event_t * current_event = static_cast(events->events[i]); + dds_entity_t dds_entity = static_cast(current_event->data)->enth; + if (dds_entity <= 0) { + RMW_SET_ERROR_MSG("Event entity handle is invalid"); + return RMW_RET_ERROR; + } + + if (is_event_supported(current_event->event_type)) { + if (status_mask_map.find(dds_entity) == status_mask_map.end()) { + status_mask_map[dds_entity] = 0; + } + status_mask_map[dds_entity] |= get_status_kind_from_rmw(current_event->event_type); + } + } + for (auto & pair : status_mask_map) { + // set the status condition's mask with the supported type + dds_set_status_mask(pair.first, pair.second); + entities.insert(pair.first); + } + + return RMW_RET_OK; +} + +static rmw_ret_t handle_active_events(rmw_events_t * events) +{ + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + rmw_event_t * current_event = static_cast(events->events[i]); + dds_entity_t dds_entity = static_cast(current_event->data)->enth; + if (dds_entity <= 0) { + RMW_SET_ERROR_MSG("Event entity handle is invalid"); + return RMW_RET_ERROR; + } + + uint32_t status_mask; + dds_get_status_changes(dds_entity, &status_mask); + if (!is_event_supported(current_event->event_type) || + !static_cast(status_mask & get_status_kind_from_rmw(current_event->event_type))) + { + events->events[i] = nullptr; + } + } + } + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_wait( rmw_subscriptions_t * subs, rmw_guard_conditions_t * gcs, rmw_services_t * srvs, rmw_clients_t * cls, rmw_events_t * evs, rmw_wait_set_t * wait_set, const rmw_time_t * wait_timeout) { - static_cast(evs); RET_NULL(wait_set); CddsWaitset * ws = static_cast(wait_set->data); RET_NULL(ws); @@ -2071,7 +2189,8 @@ extern "C" rmw_ret_t rmw_wait( require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || - require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) + require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr) || + require_reattach(ws->evs, evs)) { size_t nelems = 0; waitset_detach(ws); @@ -2093,6 +2212,28 @@ extern "C" rmw_ret_t rmw_wait( ATTACH(CddsService, srvs, service, service.sub->rdcondh); ATTACH(CddsClient, cls, client, client.sub->rdcondh); #undef ATTACH + + ws->evs.resize(0); + if (evs) { + std::unordered_set event_entities; + rmw_ret_t ret_code = gather_event_entities(evs, event_entities); + if (ret_code != RMW_RET_OK) { + return ret_code; + } + for (auto e : event_entities) { + dds_waitset_attach(ws->waitseth, e, nelems); + nelems++; + } + ws->evs.reserve(evs->event_count); + for (size_t i = 0; i < evs->event_count; i++) { + auto current_event = static_cast(evs->events[i]); + CddsEvent ev; + ev.enth = static_cast(current_event->data)->enth; + ev.event_type = current_event->event_type; + ws->evs.push_back(ev); + } + } + ws->nelems = nelems; } @@ -2116,7 +2257,6 @@ extern "C" rmw_ret_t rmw_wait( if (var) { \ for (size_t i = 0; i < var->name ## _count; i++) { \ auto x = static_cast(var->name ## s[i]); \ - /*dds_waitset_detach (ws->waitseth, x->cond);*/ \ if (ws->trigs[trig_idx] == static_cast(nelems)) { \ on_triggered; \ trig_idx++; \ @@ -2133,6 +2273,7 @@ extern "C" rmw_ret_t rmw_wait( DETACH(CddsService, srvs, service, service.sub->rdcondh, (void) x); DETACH(CddsClient, cls, client, client.sub->rdcondh, (void) x); #undef DETACH + handle_active_events(evs); } #if REPORT_BLOCKED_REQUESTS @@ -2167,7 +2308,7 @@ static rmw_ret_t rmw_take_response_request( dds_sample_info_t info; wrap.data = ros_data; void * wrap_ptr = static_cast(&wrap); - while (dds_take(cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) { + while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) { if (info.valid_data) { memset(request_header, 0, sizeof(wrap.header)); assert(sizeof(wrap.header.guid) <= sizeof(request_header->writer_guid)); @@ -2248,7 +2389,7 @@ static rmw_ret_t rmw_send_response_request( const void * ros_data) { const cdds_request_wrapper_t wrap = {header, const_cast(ros_data)}; - if (dds_write(cs->pub->pubh, static_cast(&wrap)) >= 0) { + if (dds_write(cs->pub->enth, static_cast(&wrap)) >= 0) { return RMW_RET_OK; } else { RMW_SET_ERROR_MSG("cannot publish data"); @@ -2377,13 +2518,13 @@ static rmw_ret_t rmw_init_cs( dds_qos_t * qos; if ((pubtopic = - dds_create_topic_arbitrary(node_impl->pp, pub_st, nullptr, nullptr, nullptr)) < 0) + dds_create_topic_arbitrary(node_impl->enth, pub_st, nullptr, nullptr, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_pubtopic; } if ((subtopic = - dds_create_topic_arbitrary(node_impl->pp, sub_st, nullptr, nullptr, nullptr)) < 0) + dds_create_topic_arbitrary(node_impl->enth, sub_st, nullptr, nullptr, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create topic"); goto fail_subtopic; @@ -2393,21 +2534,21 @@ static rmw_ret_t rmw_init_cs( } dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); - if ((pub->pubh = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) { + if ((pub->enth = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } pub->sertopic = pub_st; - if ((sub->subh = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { + if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } sub->sertopic = sub_st; - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { + if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; } - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { + if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) { RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } @@ -2422,9 +2563,9 @@ static rmw_ret_t rmw_init_cs( fail_instance_handle: dds_delete(sub->rdcondh); fail_readcond: - dds_delete(sub->subh); + dds_delete(sub->enth); fail_reader: - dds_delete(pub->pubh); + dds_delete(pub->enth); fail_writer: dds_delete_qos(qos); fail_qos: @@ -2440,8 +2581,8 @@ static void rmw_fini_cs(CddsCS * cs) ddsi_sertopic_unref(cs->sub->sertopic); ddsi_sertopic_unref(cs->pub->sertopic); dds_delete(cs->sub->rdcondh); - dds_delete(cs->sub->subh); - dds_delete(cs->pub->pubh); + dds_delete(cs->sub->enth); + dds_delete(cs->pub->enth); } extern "C" rmw_client_t * rmw_create_client( @@ -2539,7 +2680,9 @@ static rmw_ret_t do_for_node( std::function oper) { dds_entity_t rd; - if ((rd = dds_create_reader(node_impl->pp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { + if ((rd = dds_create_reader(node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, + NULL, NULL)) < 0) + { RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader"); return RMW_RET_ERROR; } @@ -2663,7 +2806,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind( builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); dds_entity_t rd; - if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { + if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); return RMW_RET_ERROR; } @@ -2956,8 +3099,8 @@ extern "C" rmw_ret_t rmw_service_server_is_available( auto info = static_cast(client->data); dds_publication_matched_status_t ps; dds_subscription_matched_status_t cs; - if (dds_get_publication_matched_status(info->client.pub->pubh, &ps) < 0 || - dds_get_subscription_matched_status(info->client.sub->subh, &cs) < 0) + if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 || + dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0) { RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed"); return RMW_RET_ERROR; @@ -2980,7 +3123,7 @@ static rmw_ret_t rmw_count_pubs_or_subs( std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); dds_entity_t rd; - if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { + if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader"); return RMW_RET_ERROR; }