From 7dbf4bd1a141a7faf3e2b7f2158d42ae9bda8c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Dom=C3=ADnguez=20L=C3=B3pez?= <116071334+Mario-DL@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:30:55 +0100 Subject: [PATCH 1/5] Fix destruction data-race on participant removal in intra-process (#5034) * Refs #21293: Add BB test Signed-off-by: Mario Dominguez * Refs #21293: Reinforce test to fail more frequently Signed-off-by: Mario Dominguez * Refs #21293: Add RefCountedPointer.hpp to utils Signed-off-by: Mario Dominguez * Refs #21293: Add unittests for RefCountedPointer Signed-off-by: Mario Dominguez * Refs #21293: LocalReaderPointer.hpp Signed-off-by: Mario Dominguez * Refs #21293: BaseReader aggregates LocalReaderPointer Signed-off-by: Mario Dominguez * Refs #21293: ReaderLocator aggregates LocalReaderPointer Signed-off-by: Mario Dominguez * Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr and properly calls local_actions_on_reader_removed() Signed-off-by: Mario Dominguez * Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader Signed-off-by: Mario Dominguez * Refs #21293: Linter Signed-off-by: Mario Dominguez * Refs #21293: Fix windows warnings Signed-off-by: Mario Dominguez * Refs #21293: Address Miguel's review Signed-off-by: Mario Dominguez * Refs #21293: Apply last comment Signed-off-by: Mario Dominguez * Refs #21293: NIT Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez (cherry picked from commit 456e45f25b14cdeeac8ddde7b3627323a9b0f759) # Conflicts: # include/fastdds/rtps/writer/ReaderLocator.h # include/fastdds/rtps/writer/ReaderProxy.h # src/cpp/rtps/RTPSDomain.cpp # src/cpp/rtps/RTPSDomainImpl.hpp # src/cpp/rtps/participant/RTPSParticipantImpl.cpp # src/cpp/rtps/participant/RTPSParticipantImpl.h # src/cpp/rtps/reader/BaseReader.cpp # src/cpp/rtps/reader/BaseReader.hpp # src/cpp/rtps/writer/ReaderLocator.cpp # src/cpp/rtps/writer/StatefulWriter.cpp # src/cpp/rtps/writer/StatelessWriter.cpp # test/blackbox/common/DDSBlackboxTestsBasic.cpp # test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h # test/unittest/utils/CMakeLists.txt --- include/fastdds/rtps/writer/ReaderLocator.h | 13 + include/fastdds/rtps/writer/ReaderProxy.h | 4 + src/cpp/rtps/RTPSDomain.cpp | 19 +- src/cpp/rtps/RTPSDomainImpl.hpp | 11 + .../rtps/participant/RTPSParticipantImpl.cpp | 28 +- .../rtps/participant/RTPSParticipantImpl.h | 9 + src/cpp/rtps/reader/BaseReader.cpp | 548 ++++++++++++++++++ src/cpp/rtps/reader/BaseReader.hpp | 506 ++++++++++++++++ src/cpp/rtps/reader/LocalReaderPointer.hpp | 36 ++ src/cpp/rtps/writer/ReaderLocator.cpp | 21 +- src/cpp/rtps/writer/StatefulWriter.cpp | 41 +- src/cpp/rtps/writer/StatelessWriter.cpp | 13 +- src/cpp/utils/RefCountedPointer.hpp | 218 +++++++ .../blackbox/common/DDSBlackboxTestsBasic.cpp | 111 ++++ .../fastdds/rtps/writer/ReaderLocator.h | 8 +- test/unittest/utils/CMakeLists.txt | 22 + .../unittest/utils/RefCountedPointerTests.cpp | 183 ++++++ 17 files changed, 1774 insertions(+), 17 deletions(-) create mode 100644 src/cpp/rtps/reader/BaseReader.cpp create mode 100644 src/cpp/rtps/reader/BaseReader.hpp create mode 100644 src/cpp/rtps/reader/LocalReaderPointer.hpp create mode 100644 src/cpp/utils/RefCountedPointer.hpp create mode 100644 test/unittest/utils/RefCountedPointerTests.cpp diff --git a/include/fastdds/rtps/writer/ReaderLocator.h b/include/fastdds/rtps/writer/ReaderLocator.h index 49a697fd9e4..a78ec768789 100644 --- a/include/fastdds/rtps/writer/ReaderLocator.h +++ b/include/fastdds/rtps/writer/ReaderLocator.h @@ -27,6 +27,8 @@ #include #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { @@ -69,10 +71,17 @@ class ReaderLocator : public RTPSMessageSenderInterface return is_local_reader_; } +<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h RTPSReader* local_reader(); void local_reader( RTPSReader* local_reader) +======= + LocalReaderPointer::Instance local_reader(); + + void local_reader( + std::shared_ptr local_reader) +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp { local_reader_ = local_reader; } @@ -260,7 +269,11 @@ class ReaderLocator : public RTPSMessageSenderInterface LocatorSelectorEntry async_locator_info_; bool expects_inline_qos_; bool is_local_reader_; +<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h RTPSReader* local_reader_; +======= + std::shared_ptr local_reader_; +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp std::vector guid_prefix_as_vector_; std::vector guid_as_vector_; IDataSharingNotifier* datasharing_notifier_; diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 2610e988ce2..a53ffb0be76 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -294,7 +294,11 @@ class ReaderProxy * Get the local reader on the same process (if any). * @return The local reader on the same process. */ +<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderProxy.h inline RTPSReader* local_reader() +======= + inline LocalReaderPointer::Instance local_reader() +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderProxy.hpp { return locator_info_.local_reader(); } diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index 20e48e89e5f..967a722bee2 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -32,6 +32,19 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include #include @@ -777,7 +790,11 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant( return nullptr; } +<<<<<<< HEAD RTPSReader* RTPSDomainImpl::find_local_reader( +======= +std::shared_ptr RTPSDomainImpl::find_local_reader( +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid) { auto instance = get_instance(); @@ -791,7 +808,7 @@ RTPSReader* RTPSDomainImpl::find_local_reader( } } - return nullptr; + return std::shared_ptr(nullptr); } RTPSWriter* RTPSDomainImpl::find_local_writer( diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index 1f736cd66f2..9b76dd45c70 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -29,6 +29,13 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +#include +#include +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include @@ -173,7 +180,11 @@ class RTPSDomainImpl * * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ +<<<<<<< HEAD static RTPSReader* find_local_reader( +======= + static std::shared_ptr find_local_reader( +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid); /** diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 921257b3926..f93b7d7a736 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1452,7 +1452,11 @@ bool RTPSParticipantImpl::createReader( return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); } +<<<<<<< HEAD RTPSReader* RTPSParticipantImpl::find_local_reader( +======= +std::shared_ptr RTPSParticipantImpl::find_local_reader( +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid) { shared_lock _(endpoints_list_mutex); @@ -1461,11 +1465,11 @@ RTPSReader* RTPSParticipantImpl::find_local_reader( { if (reader->getGuid() == reader_guid) { - return reader; + return reader->get_local_pointer(); } } - return nullptr; + return std::shared_ptr(); } RTPSWriter* RTPSParticipantImpl::find_local_writer( @@ -2099,6 +2103,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( bool found = false, found_in_users = false; Endpoint* p_endpoint = nullptr; + BaseReader* reader = nullptr; if (endpoint.entityId.is_writer()) { @@ -2133,6 +2138,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; m_userReaderList.erase(rit); found_in_users = true; break; @@ -2143,6 +2149,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( { if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it { + reader = *rit; p_endpoint = *rit; m_allReaderList.erase(rit); found = true; @@ -2201,6 +2208,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint( #endif // if HAVE_SECURITY } + if (reader) + { + reader->local_actions_on_reader_removed(); + } delete(p_endpoint); return true; } @@ -2288,6 +2299,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints() } #endif // if HAVE_SECURITY + if (kind == READER) + { + static_cast(endpoint)->local_actions_on_reader_removed(); + } + // remove the endpoints delete(endpoint); } @@ -2988,8 +3004,16 @@ bool RTPSParticipantImpl::register_in_reader( } else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId)) { +<<<<<<< HEAD RTPSReader* reader = find_local_reader(reader_guid); res = reader->add_statistics_listener(listener); +======= + LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid)); + if (local_reader) + { + res = local_reader->add_statistics_listener(listener); + } +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return res; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 25059076623..8eb51f1a9da 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -55,6 +55,11 @@ #include #include #include +<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h +======= +#include +#include +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp #include #include #include @@ -479,7 +484,11 @@ class RTPSParticipantImpl /*** * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ +<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h RTPSReader* find_local_reader( +======= + std::shared_ptr find_local_reader( +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp const GUID_t& reader_guid); /*** diff --git a/src/cpp/rtps/reader/BaseReader.cpp b/src/cpp/rtps/reader/BaseReader.cpp new file mode 100644 index 00000000000..4beb9f2e807 --- /dev/null +++ b/src/cpp/rtps/reader/BaseReader.cpp @@ -0,0 +1,548 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * BaseReader.cpp + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +BaseReader::BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen) + : fastdds::rtps::RTPSReader(pimpl, guid, att, hist) + , listener_(listen) + , accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers) + , expects_inline_qos_(att.expects_inline_qos) + , history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial)) + , liveliness_kind_(att.liveliness_kind) + , liveliness_lease_duration_(att.liveliness_lease_duration) +{ + PoolConfig cfg = PoolConfig::from_history_attributes(hist->m_att); + std::shared_ptr change_pool; + std::shared_ptr payload_pool; + payload_pool = BasicPayloadPool::get(cfg, change_pool); + + init(payload_pool, change_pool); + setup_datasharing(att); +} + +BaseReader::BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + const std::shared_ptr& payload_pool, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen) + : BaseReader( + pimpl, guid, att, payload_pool, + std::make_shared(PoolConfig::from_history_attributes(hist->m_att)), + hist, listen) +{ +} + +BaseReader::BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + const std::shared_ptr& payload_pool, + const std::shared_ptr& change_pool, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen) + : fastdds::rtps::RTPSReader(pimpl, guid, att, hist) + , listener_(listen) + , accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers) + , expects_inline_qos_(att.expects_inline_qos) + , history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial)) + , liveliness_kind_(att.liveliness_kind) + , liveliness_lease_duration_(att.liveliness_lease_duration) +{ + init(payload_pool, change_pool); + setup_datasharing(att); +} + +void BaseReader::local_actions_on_reader_removed() +{ + local_ptr_->deactivate(); +} + +BaseReader::~BaseReader() +{ + EPROSIMA_LOG_INFO(RTPS_READER, "Removing reader " << this->getGuid().entityId); + + for (auto it = history_->changesBegin(); it != history_->changesEnd(); ++it) + { + release_cache(*it); + } + + delete history_state_; + + // As releasing the change pool will delete the cache changes it owns, + // the payload pool may be called to release their payloads, so we should + // ensure that the payload pool is destroyed after the change pool. + change_pool_.reset(); + payload_pool_.reset(); +} + +bool BaseReader::matched_writer_add( + const PublicationBuiltinTopicData& info) +{ + const auto& alloc = mp_RTPSParticipant->get_attributes().allocation; + WriterProxyData wdata( + alloc.locators.max_unicast_locators, + alloc.locators.max_multicast_locators, + alloc.data_limits); + + from_builtin_to_proxy(info, wdata); + return matched_writer_add_edp(wdata); +} + +ReaderListener* BaseReader::get_listener() const +{ + std::lock_guard lock(mp_mutex); + return listener_; +} + +void BaseReader::set_listener( + ReaderListener* target) +{ + std::lock_guard lock(mp_mutex); + listener_ = target; +} + +bool BaseReader::expects_inline_qos() const +{ + return expects_inline_qos_; +} + +ReaderHistory* BaseReader::get_history() const +{ + return history_; +} + +//! @return The content filter associated to this reader. +IReaderDataFilter* BaseReader::get_content_filter() const +{ + std::lock_guard lock(mp_mutex); + return data_filter_; +} + +//! Set the content filter associated to this reader. +//! @param filter Pointer to the content filter to associate to this reader. +void BaseReader::set_content_filter( + IReaderDataFilter* filter) +{ + std::lock_guard lock(mp_mutex); + data_filter_ = filter; +} + +uint64_t BaseReader::get_unread_count() const +{ + std::lock_guard lock(mp_mutex); + return total_unread_; +} + +uint64_t BaseReader::get_unread_count( + bool mark_as_read) +{ + std::lock_guard lock(mp_mutex); + uint64_t ret_val = total_unread_; + + if (mark_as_read) + { + for (auto it = history_->changesBegin(); 0 < total_unread_ && it != history_->changesEnd(); ++it) + { + fastdds::rtps::CacheChange_t* change = *it; + if (!change->isRead && get_last_notified(change->writerGUID) >= change->sequenceNumber) + { + change->isRead = true; + assert(0 < total_unread_); + --total_unread_; + } + } + assert(0 == total_unread_); + } + return ret_val; +} + +bool BaseReader::wait_for_unread_cache( + const eprosima::fastdds::dds::Duration_t& timeout) +{ + auto time_out = std::chrono::steady_clock::now() + std::chrono::seconds(timeout.seconds) + + std::chrono::nanoseconds(timeout.nanosec); + +#if HAVE_STRICT_REALTIME + std::unique_lock lock(mp_mutex, std::defer_lock); + if (lock.try_lock_until(time_out)) +#else + std::unique_lock lock(mp_mutex); +#endif // HAVE_STRICT_REALTIME + { + if (new_notification_cv_.wait_until( + lock, time_out, + [&]() + { + return total_unread_ > 0; + })) + { + return true; + } + } + + return false; +} + +bool BaseReader::is_sample_valid( + const void* data, + const fastdds::rtps::GUID_t& writer, + const fastdds::rtps::SequenceNumber_t& sn) const +{ + if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writer)) + { + // Check if the payload is dirty + // Note the Payloads used in loans include a mandatory RTPS 2.3 extra header + auto payload = static_cast(data); + payload -= fastdds::rtps::SerializedPayload_t::representation_header_size; + if (!fastdds::rtps::DataSharingPayloadPool::check_sequence_number(payload, sn)) + { + return false; + } + } + return true; +} + +BaseReader* BaseReader::downcast( + fastdds::rtps::RTPSReader* reader) +{ + assert(nullptr != dynamic_cast(reader)); + return static_cast(reader); +} + +BaseReader* BaseReader::downcast( + fastdds::rtps::Endpoint* endpoint) +{ + assert(nullptr != dynamic_cast(endpoint)); + return static_cast(endpoint); +} + +void BaseReader::allow_unknown_writers() +{ + assert(fastdds::rtps::EntityId_t::unknown() != trusted_writer_entity_id_); + accept_messages_from_unkown_writers_ = true; +} + +std::shared_ptr BaseReader::get_local_pointer() +{ + return local_ptr_; +} + +bool BaseReader::reserve_cache( + uint32_t cdr_payload_size, + fastdds::rtps::CacheChange_t*& change) +{ + std::lock_guard guard(mp_mutex); + + change = nullptr; + + fastdds::rtps::CacheChange_t* reserved_change = nullptr; + if (!change_pool_->reserve_cache(reserved_change)) + { + EPROSIMA_LOG_WARNING(RTPS_READER, "Problem reserving cache from pool"); + return false; + } + + uint32_t payload_size = fixed_payload_size_ ? fixed_payload_size_ : cdr_payload_size; + if (!payload_pool_->get_payload(payload_size, reserved_change->serializedPayload)) + { + change_pool_->release_cache(reserved_change); + EPROSIMA_LOG_WARNING(RTPS_READER, "Problem reserving payload from pool"); + return false; + } + + change = reserved_change; + return true; +} + +void BaseReader::release_cache( + fastdds::rtps::CacheChange_t* change) +{ + std::lock_guard guard(mp_mutex); + + fastdds::rtps::IPayloadPool* pool = change->serializedPayload.payload_owner; + if (pool) + { + pool->release_payload(change->serializedPayload); + } + change_pool_->release_cache(change); +} + +void BaseReader::update_liveliness_changed_status( + const fastdds::rtps::GUID_t& writer, + int32_t alive_change, + int32_t not_alive_change) +{ + std::lock_guard lock(mp_mutex); + + liveliness_changed_status_.alive_count += alive_change; + liveliness_changed_status_.alive_count_change += alive_change; + liveliness_changed_status_.not_alive_count += not_alive_change; + liveliness_changed_status_.not_alive_count_change += not_alive_change; + liveliness_changed_status_.last_publication_handle = writer; + + if (nullptr != listener_) + { + listener_->on_liveliness_changed(this, liveliness_changed_status_); + + liveliness_changed_status_.alive_count_change = 0; + liveliness_changed_status_.not_alive_count_change = 0; + } +} + +#ifdef FASTDDS_STATISTICS + +bool BaseReader::add_statistics_listener( + std::shared_ptr listener) +{ + return add_statistics_listener_impl(listener); +} + +bool BaseReader::remove_statistics_listener( + std::shared_ptr listener) +{ + return remove_statistics_listener_impl(listener); +} + +void BaseReader::set_enabled_statistics_writers_mask( + uint32_t enabled_writers) +{ + set_enabled_statistics_writers_mask_impl(enabled_writers); +} + +#endif // FASTDDS_STATISTICS + +bool BaseReader::may_remove_history_record( + bool removed_by_lease) +{ + return !removed_by_lease; +} + +void BaseReader::add_persistence_guid( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::GUID_t& persistence_guid) +{ + if (fastdds::rtps::c_Guid_Unknown == persistence_guid || persistence_guid == guid) + { + std::lock_guard guard(mp_mutex); + history_state_->persistence_guid_map[guid] = guid; + history_state_->persistence_guid_count[guid]++; + } + else + { + std::lock_guard guard(mp_mutex); + history_state_->persistence_guid_map[guid] = persistence_guid; + history_state_->persistence_guid_count[persistence_guid]++; + + // Could happen that a value has already been stored in the record with the guid and not the persistence guid + // This is because received_change is called before Proxy is created + // In this case, we substitute the guid for the persistence (in case they are not equal) + auto spourious_record = history_state_->history_record.find(guid); + if (spourious_record != history_state_->history_record.end()) + { + EPROSIMA_LOG_INFO(RTPS_READER, "Sporious record found, changing guid " + << guid << " for persistence guid " << persistence_guid); + update_last_notified(guid, spourious_record->second); + history_state_->history_record.erase(spourious_record); + } + } +} + +void BaseReader::remove_persistence_guid( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::GUID_t& persistence_guid, + bool removed_by_lease) +{ + std::lock_guard guard(mp_mutex); + auto persistence_guid_stored = (fastdds::rtps::c_Guid_Unknown == persistence_guid) ? guid : persistence_guid; + history_state_->persistence_guid_map.erase(guid); + auto count = --history_state_->persistence_guid_count[persistence_guid_stored]; + if (count <= 0 && may_remove_history_record(removed_by_lease)) + { + history_state_->history_record.erase(persistence_guid_stored); + history_state_->persistence_guid_count.erase(persistence_guid_stored); + } +} + +fastdds::rtps::SequenceNumber_t BaseReader::get_last_notified( + const fastdds::rtps::GUID_t& guid) +{ + fastdds::rtps::SequenceNumber_t ret_val; + std::lock_guard guard(mp_mutex); + fastdds::rtps::GUID_t guid_to_look = guid; + auto p_guid = history_state_->persistence_guid_map.find(guid); + if (p_guid != history_state_->persistence_guid_map.end()) + { + guid_to_look = p_guid->second; + } + + auto p_seq = history_state_->history_record.find(guid_to_look); + if (p_seq != history_state_->history_record.end()) + { + ret_val = p_seq->second; + } + + return ret_val; +} + +fastdds::rtps::SequenceNumber_t BaseReader::update_last_notified( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::SequenceNumber_t& seq) +{ + fastdds::rtps::SequenceNumber_t ret_val; + std::lock_guard guard(mp_mutex); + fastdds::rtps::GUID_t guid_to_look = guid; + auto p_guid = history_state_->persistence_guid_map.find(guid); + if (p_guid != history_state_->persistence_guid_map.end()) + { + guid_to_look = p_guid->second; + } + + auto p_seq = history_state_->history_record.find(guid_to_look); + if (p_seq != history_state_->history_record.end()) + { + ret_val = p_seq->second; + } + + if (ret_val < seq) + { + history_state_->history_record[guid_to_look] = seq; + persist_last_notified_nts(guid_to_look, seq); + new_notification_cv_.notify_all(); + } + + return ret_val; +} + +void BaseReader::persist_last_notified_nts( + const fastdds::rtps::GUID_t& peristence_guid, + const fastdds::rtps::SequenceNumber_t& seq) +{ + // Empty base implementation since base behavior is to not persist data + static_cast(peristence_guid); + static_cast(seq); +} + +bool BaseReader::is_datasharing_compatible_with( + const fastdds::rtps::WriterProxyData& wdata) +{ + if (!is_datasharing_compatible_ || + wdata.m_qos.data_sharing.kind() == fastdds::dds::DataSharingKind::OFF) + { + return false; + } + + for (auto id : wdata.m_qos.data_sharing.domain_ids()) + { + if (std::find(m_att.data_sharing_configuration().domain_ids().begin(), + m_att.data_sharing_configuration().domain_ids().end(), id) + != m_att.data_sharing_configuration().domain_ids().end()) + { + return true; + } + } + + return false; +} + +void BaseReader::init( + const std::shared_ptr& payload_pool, + const std::shared_ptr& change_pool) +{ + payload_pool_ = payload_pool; + change_pool_ = change_pool; + fixed_payload_size_ = 0; + if (history_->m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE) + { + fixed_payload_size_ = history_->m_att.payloadMaxSize; + } + + local_ptr_ = std::make_shared(this); + + EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly"); +} + +void BaseReader::setup_datasharing( + const fastdds::rtps::ReaderAttributes& att) +{ + + if (att.endpoint.data_sharing_configuration().kind() != fastdds::dds::DataSharingKind::OFF) + { + using std::placeholders::_1; + std::shared_ptr notification = DataSharingNotification::create_notification( + getGuid(), att.endpoint.data_sharing_configuration().shm_directory()); + if (notification) + { + is_datasharing_compatible_ = true; + datasharing_listener_.reset(new DataSharingListener( + notification, + att.endpoint.data_sharing_configuration().shm_directory(), + att.data_sharing_listener_thread, + att.matched_writers_allocation, + this)); + + // We can start the listener here, as no writer can be matched already, + // so no notification will occur until the non-virtual instance is constructed. + // But we need to stop the listener in the non-virtual instance destructor. + datasharing_listener_->start(); + } + } +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/rtps/reader/BaseReader.hpp b/src/cpp/rtps/reader/BaseReader.hpp new file mode 100644 index 00000000000..596e335ce53 --- /dev/null +++ b/src/cpp/rtps/reader/BaseReader.hpp @@ -0,0 +1,506 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file BaseReader.hpp + */ + +#ifndef FASTDDS_RTPS_READER__BASEREADER_HPP +#define FASTDDS_RTPS_READER__BASEREADER_HPP + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace eprosima { + +namespace fastdds { +namespace rtps { + +struct CacheChange_t; +class IDataSharingListener; +struct ReaderHistoryState; +class ReaderListener; +class RTPSParticipantImpl; +class WriterProxy; +class WriterProxyData; + +class BaseReader + : public fastdds::rtps::RTPSReader + , public fastdds::statistics::StatisticsReaderImpl +{ + +public: + + bool matched_writer_add( + const PublicationBuiltinTopicData& info) final; + + /** + * @brief Add a matched writer represented by its attributes. + * + * @param wdata Discovery information regarding the writer to add. + * + * @return True if correctly added. + */ + virtual bool matched_writer_add_edp( + const WriterProxyData& wdata) = 0; + + fastdds::rtps::ReaderListener* get_listener() const override; + + void set_listener( + fastdds::rtps::ReaderListener* listener) override; + + bool expects_inline_qos() const override; + + fastdds::rtps::ReaderHistory* get_history() const override; + + IReaderDataFilter* get_content_filter() const override; + + void set_content_filter( + IReaderDataFilter* filter) override; + + uint64_t get_unread_count() const override; + + uint64_t get_unread_count( + bool mark_as_read) override; + + bool wait_for_unread_cache( + const fastdds::dds::Duration_t& timeout) override; + + bool is_sample_valid( + const void* data, + const fastdds::rtps::GUID_t& writer, + const fastdds::rtps::SequenceNumber_t& sn) const override; + + /** + * @brief Get a pointer to a BaseReader object from a RTPSReader pointer. + * + * @param reader Pointer to the RTPSReader object. + * + * @return Pointer to the BaseReader object. + */ + static BaseReader* downcast( + fastdds::rtps::RTPSReader* reader); + + /** + * @brief Get a pointer to a BaseReader object from a Endpoint pointer. + * + * @param endpoint Pointer to the Endpoint object. + * + * @return Pointer to the BaseReader object. + */ + static BaseReader* downcast( + fastdds::rtps::Endpoint* endpoint); + + /** + * @brief Set the entity ID of the trusted writer. + * + * @param writer Entity ID of the trusted writer. + */ + void set_trusted_writer( + const fastdds::rtps::EntityId_t& writer) + { + accept_messages_from_unkown_writers_ = false; + trusted_writer_entity_id_ = writer; + } + + /** + * @brief Allow reception ALIVE changes from non-matched writers. + */ + void allow_unknown_writers(); + + /** + * @return The liveliness kind of this reader + */ + fastdds::dds::LivelinessQosPolicyKind liveliness_kind() const + { + return liveliness_kind_; + } + + /** + * @return The liveliness lease duration of this reader + */ + fastdds::dds::Duration_t liveliness_lease_duration() const + { + return liveliness_lease_duration_; + } + + /** + * @return the datasharing listener associated with this reader. + */ + const std::unique_ptr& datasharing_listener() const + { + return datasharing_listener_; + } + + /** + * @brief Retrieves the local pointer to this reader + * to be used by other local entities. + * + * @return Local pointer to this reader. + */ + std::shared_ptr get_local_pointer(); + + /** + * @brief Reserve a CacheChange_t. + * + * @param [in] cdr_payload_size Size of the received payload. + * @param [out] change Pointer to the reserved change. + * + * @return True if correctly reserved. + */ + bool reserve_cache( + uint32_t cdr_payload_size, + fastdds::rtps::CacheChange_t*& change); + + /** + * @brief Release a CacheChange_t. + * + * @param change Pointer to the change to release. + */ + void release_cache( + fastdds::rtps::CacheChange_t* change); + + /** + * @brief Method to notify the reader that a change has been removed from its history. + * + * @param change Pointer to the CacheChange_t that was removed from the history. + * + * @return True if correctly removed. + */ + virtual bool change_removed_by_history( + fastdds::rtps::CacheChange_t* change) = 0; + + /** + * @brief Called just before a change is going to be deserialized. + * + * @param [in] change Pointer to the change being accessed. + * @param [out] writer Writer proxy the @c change belongs to. + * @param [out] is_future_change Whether the change is in the future (i.e. there are + * earlier unreceived changes from the same writer). + * + * @return Whether the change is still valid or not. + */ + virtual bool begin_sample_access_nts( + fastdds::rtps::CacheChange_t* change, + fastdds::rtps::WriterProxy*& writer, + bool& is_future_change) = 0; + + /** + * @brief Called after the change has been deserialized. + * + * @param [in] change Pointer to the change being accessed. + * @param [in] writer Writer proxy the @c change belongs to. + * @param [in] mark_as_read Whether the @c change should be marked as read or not. + */ + virtual void end_sample_access_nts( + fastdds::rtps::CacheChange_t* change, + fastdds::rtps::WriterProxy*& writer, + bool mark_as_read) = 0; + + /** + * @brief A method to update the liveliness changed status of the reader + * + * @param writer The writer changing liveliness, specified by its guid + * @param alive_change The change requested for alive count. Should be -1, 0 or +1 + * @param not_alive_change The change requested for not alive count. Should be -1, 0 or +1 + */ + void update_liveliness_changed_status( + const fastdds::rtps::GUID_t& writer, + int32_t alive_change, + int32_t not_alive_change); + + /** + * @brief Process an incoming DATA message. + * + * @param change Pointer to the incoming CacheChange_t. + * + * @return true if the reader accepts message. + */ + virtual bool process_data_msg( + fastdds::rtps::CacheChange_t* change) = 0; + + /** + * @brief Process an incoming DATA_FRAG message. + * + * @param change Pointer to the incoming CacheChange_t. + * @param sampleSize Size of the complete, assembled message. + * @param fragmentStartingNum Starting number of this particular message. + * @param fragmentsInSubmessage Number of fragments on this particular message. + * + * @return true if the reader accepts message. + */ + virtual bool process_data_frag_msg( + fastdds::rtps::CacheChange_t* change, + uint32_t sampleSize, + uint32_t fragmentStartingNum, + uint16_t fragmentsInSubmessage) = 0; + + /** + * @brief Process an incoming HEARTBEAT message. + * + * @param writerGUID + * @param hbCount + * @param firstSN + * @param lastSN + * @param finalFlag + * @param livelinessFlag + * @param origin_vendor_id + * + * @return true if the reader accepts message. + */ + virtual bool process_heartbeat_msg( + const fastdds::rtps::GUID_t& writerGUID, + uint32_t hbCount, + const fastdds::rtps::SequenceNumber_t& firstSN, + const fastdds::rtps::SequenceNumber_t& lastSN, + bool finalFlag, + bool livelinessFlag, + VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; + + /** + * @brief Process an incoming GAP message. + * + * @param writerGUID + * @param gapStart + * @param gapList + * @param origin_vendor_id + * + * @return true if the reader accepts message. + */ + virtual bool process_gap_msg( + const fastdds::rtps::GUID_t& writerGUID, + const fastdds::rtps::SequenceNumber_t& gapStart, + const fastdds::rtps::SequenceNumberSet_t& gapList, + VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; + + /** + * @brief Waits for not being referenced/used by any other entity. + */ + virtual void local_actions_on_reader_removed(); + +#ifdef FASTDDS_STATISTICS + + bool add_statistics_listener( + std::shared_ptr listener) override; + + bool remove_statistics_listener( + std::shared_ptr listener) override; + + void set_enabled_statistics_writers_mask( + uint32_t enabled_writers) override; + +#endif // FASTDDS_STATISTICS + + virtual ~BaseReader(); + +protected: + + BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen); + + BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + const std::shared_ptr& payload_pool, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen); + + BaseReader( + fastdds::rtps::RTPSParticipantImpl* pimpl, + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::ReaderAttributes& att, + const std::shared_ptr& payload_pool, + const std::shared_ptr& change_pool, + fastdds::rtps::ReaderHistory* hist, + fastdds::rtps::ReaderListener* listen); + + /** + * @brief Whether a history record may be removed. + * + * @param removed_by_lease Whether the history record is to be removed due to a participant drop. + * + * @return Whether the history record may be removed. + */ + virtual bool may_remove_history_record( + bool removed_by_lease); + + /** + * @brief Add a remote writer to the persistence_guid map. + * + * @param guid GUID of the remote writer. + * @param persistence_guid Persistence GUID of the remote writer. + */ + void add_persistence_guid( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::GUID_t& persistence_guid); + + /** + * @brief Remove a remote writer from the persistence_guid map. + * + * @param guid GUID of the remote writer. + * @param persistence_guid Persistence GUID of the remote writer. + * @param removed_by_lease Whether the GUIDs are being removed due to a participant drop. + */ + void remove_persistence_guid( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::GUID_t& persistence_guid, + bool removed_by_lease); + + /** + * @brief Get the last notified sequence for a writer's GUID. + * + * @param guid The writer GUID to query. + * + * @return Last notified sequence number for input guid. + * @remarks Takes persistence_guid into consideration. + */ + fastdds::rtps::SequenceNumber_t get_last_notified( + const fastdds::rtps::GUID_t& guid); + + /** + * @brief Update the last notified sequence for a writer's GUID. + * + * @param guid The GUID of the writer. + * @param seq Max sequence number available on writer. + * + * @return Previous value of last notified sequence number for input GUID. + * @remarks Takes persistence_guid into consideration. + */ + fastdds::rtps::SequenceNumber_t update_last_notified( + const fastdds::rtps::GUID_t& guid, + const fastdds::rtps::SequenceNumber_t& seq); + + /** + * @brief Persist the last notified sequence for a persistence guid. + * This method is called inside update_last_notified just after updating the last notified sequence for a writer + * and gives persistent readers the opportunity to write the new sequence number to the database. + * + * @param persistence_guid The persistence guid to update. + * @param seq Sequence number to set for input guid. + */ + virtual void persist_last_notified_nts( + const fastdds::rtps::GUID_t& persistence_guid, + const fastdds::rtps::SequenceNumber_t& seq); + + /** + * @brief Check if a writer can communicate with this reader using data-sharing. + * + * @param wdata Discovery information of the writer to check. + * + * @return Whether the writer is datasharing compatible with this reader or not. + */ + bool is_datasharing_compatible_with( + const fastdds::rtps::WriterProxyData& wdata); + + /// Pool of serialized payloads. + std::shared_ptr payload_pool_; + + /// Pool of cache changes. + std::shared_ptr change_pool_; + + /// Pointer to the listener associated with this reader. + fastdds::rtps::ReaderListener* listener_; + /// Whether the reader accepts messages from unmatched writers. + bool accept_messages_from_unkown_writers_; + /// Whether the reader expects inline QoS. + bool expects_inline_qos_; + + /// The data filter associated with this reader. + IReaderDataFilter* data_filter_ = nullptr; + + /// The history record associated with this reader. + fastdds::rtps::ReaderHistoryState* history_state_; + + /// Total number of unread samples in the history. + uint64_t total_unread_ = 0; + /// Condition variable to wait for unread samples. + fastdds::TimedConditionVariable new_notification_cv_; + + /// The liveliness kind of this reader. + fastdds::dds::LivelinessQosPolicyKind liveliness_kind_; + /// The liveliness lease duration of this reader. + fastdds::dds::Duration_t liveliness_lease_duration_; + + /// Whether the reader is datasharing compatible. + bool is_datasharing_compatible_ = false; + /// The listener for the datasharing notifications. + std::unique_ptr datasharing_listener_; + + /// The liveliness changed status struct as defined in the DDS standard. + fastdds::dds::LivelinessChangedStatus liveliness_changed_status_; + + /// Trusted writer (for Builtin) + fastdds::rtps::EntityId_t trusted_writer_entity_id_; + + /// RefCountedPointer of this instance. + std::shared_ptr local_ptr_; + +private: + + /** + * @brief Perform pools related setup. + * This method is called from the constructor to perform the necessary setup for the payload and change pools. + * + * @param payload_pool Payload pool to use. + * @param change_pool Change pool to use. + */ + void init( + const std::shared_ptr& payload_pool, + const std::shared_ptr& change_pool); + + /** + * @brief Perform datasharing related setup. + * This method is called from the constructor to perform the necessary setup for datasharing. + * If datasharing is enabled in the reader attributes, the method will create the necessary notification + * segment, along with the corresponding listener. + * + * @param att Attributes of the reader. + */ + void setup_datasharing( + const fastdds::rtps::ReaderAttributes& att); + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_READER__BASEREADER_HPP diff --git a/src/cpp/rtps/reader/LocalReaderPointer.hpp b/src/cpp/rtps/reader/LocalReaderPointer.hpp new file mode 100644 index 00000000000..7e16177907d --- /dev/null +++ b/src/cpp/rtps/reader/LocalReaderPointer.hpp @@ -0,0 +1,36 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LocalReaderPointer.hpp + */ + +#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP +#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +class BaseReader; + +using LocalReaderPointer = RefCountedPointer; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP diff --git a/src/cpp/rtps/writer/ReaderLocator.cpp b/src/cpp/rtps/writer/ReaderLocator.cpp index cd847f185b7..3548591e47a 100644 --- a/src/cpp/rtps/writer/ReaderLocator.cpp +++ b/src/cpp/rtps/writer/ReaderLocator.cpp @@ -42,7 +42,7 @@ ReaderLocator::ReaderLocator( , async_locator_info_(max_unicast_locators, max_multicast_locators) , expects_inline_qos_(false) , is_local_reader_(false) - , local_reader_(nullptr) + , local_reader_() , guid_prefix_as_vector_(1u) , guid_as_vector_(1u) , datasharing_notifier_(nullptr) @@ -81,7 +81,7 @@ bool ReaderLocator::start( is_local_reader_ = RTPSDomainImpl::should_intraprocess_between(owner_->getGuid(), remote_guid); is_datasharing &= !is_local_reader_; - local_reader_ = nullptr; + local_reader_.reset(); if (!is_local_reader_ && !is_datasharing) { @@ -174,7 +174,7 @@ void ReaderLocator::stop() guid_prefix_as_vector_.at(0) = c_GuidPrefix_Unknown; expects_inline_qos_ = false; is_local_reader_ = false; - local_reader_ = nullptr; + local_reader_.reset(); } bool ReaderLocator::send( @@ -202,13 +202,17 @@ bool ReaderLocator::send( return true; } +<<<<<<< HEAD RTPSReader* ReaderLocator::local_reader() +======= +LocalReaderPointer::Instance ReaderLocator::local_reader() +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) { if (!local_reader_) { local_reader_ = RTPSDomainImpl::find_local_reader(general_locator_info_.remote_guid); } - return local_reader_; + return LocalReaderPointer::Instance(local_reader_); } bool ReaderLocator::is_datasharing_reader() const @@ -218,15 +222,22 @@ bool ReaderLocator::is_datasharing_reader() const void ReaderLocator::datasharing_notify() { - RTPSReader* reader = nullptr; if (is_local_reader()) { +<<<<<<< HEAD reader = local_reader(); } if (reader) { reader->datasharing_listener()->notify(true); +======= + LocalReaderPointer::Instance reader = local_reader(); + if (reader) + { + reader->datasharing_listener()->notify(true); + } +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } else { diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 88b8f9006c9..2254532dfc3 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -45,7 +45,15 @@ #include #include #include +<<<<<<< HEAD #include +======= +#include +#include +#include +#include +#include +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include "../builtin/discovery/database/DiscoveryDataBase.hpp" @@ -465,14 +473,23 @@ bool StatefulWriter::intraprocess_delivery( CacheChange_t* change, ReaderProxy* reader_proxy) { +<<<<<<< HEAD RTPSReader* reader = reader_proxy->local_reader(); if (reader) +======= + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } +<<<<<<< HEAD return reader->processDataMsg(change); +======= + return local_reader->process_data_msg(change); +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return false; } @@ -482,10 +499,15 @@ bool StatefulWriter::intraprocess_gap( const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq) { - RTPSReader* reader = reader_proxy->local_reader(); - if (reader) + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); + if (local_reader) { +<<<<<<< HEAD return reader->processGapMsg(m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); +======= + return local_reader->process_gap_msg( + m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return false; @@ -496,12 +518,11 @@ bool StatefulWriter::intraprocess_heartbeat( bool liveliness) { bool returned_value = false; + LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); - std::lock_guard guardW(mp_mutex); - RTPSReader* reader = RTPSDomainImpl::find_local_reader(reader_proxy->guid()); - - if (reader) + if (local_reader) { + std::unique_lock lockW(mp_mutex); SequenceNumber_t first_seq = get_seq_num_min(); SequenceNumber_t last_seq = get_seq_num_max(); @@ -517,10 +538,18 @@ bool StatefulWriter::intraprocess_heartbeat( if ((first_seq != c_SequenceNumber_Unknown && last_seq != c_SequenceNumber_Unknown) && (liveliness || reader_proxy->has_changes())) { +<<<<<<< HEAD incrementHBCount(); returned_value = reader->processHeartbeatMsg(m_guid, m_heartbeatCount, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); +======= + increment_hb_count(); + Count_t hb_count = heartbeat_count_; + lockW.unlock(); + returned_value = local_reader->process_heartbeat_msg( + m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } } diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 8c7a770b709..19bc388ae62 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -344,16 +344,20 @@ bool StatelessWriter::intraprocess_delivery( CacheChange_t* change, ReaderLocator& reader_locator) { - RTPSReader* reader = reader_locator.local_reader(); + LocalReaderPointer::Instance local_reader = reader_locator.local_reader(); - if (reader && + if (local_reader && (!reader_data_filter_ || reader_data_filter_->is_relevant(*change, reader_locator.remote_guid()))) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } +<<<<<<< HEAD return reader->processDataMsg(change); +======= + return local_reader->process_data_msg(change); +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return false; @@ -929,8 +933,13 @@ bool StatelessWriter::get_connections( //! intraprocess for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderLocator& reader) { +<<<<<<< HEAD connection.guid(fastdds::statistics::to_statistics_type(reader.local_reader()->getGuid())); connection.mode(fastdds::statistics::INTRAPROCESS); +======= + connection.guid(fastdds::statistics::to_statistics_type(reader.remote_guid())); + connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS); +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) connection_list.push_back(connection); return false; diff --git a/src/cpp/utils/RefCountedPointer.hpp b/src/cpp/utils/RefCountedPointer.hpp new file mode 100644 index 00000000000..bb1cfa70b3e --- /dev/null +++ b/src/cpp/utils/RefCountedPointer.hpp @@ -0,0 +1,218 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RefCountedPointer.hpp + */ + +#ifndef UTILS__REFCOUNTEDPOINTER_HPP +#define UTILS__REFCOUNTEDPOINTER_HPP + +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { + +/** + * @brief Class to manage a local pointer with reference counting. + * + * It is similar to std::shared_ptr, but designed for cases where + * a shared pointer cannot be used due to API restrictions. + * + * USAGE: + * - On T class: + * - Add a shared_ptr> local_ptr_ member. + * - Call local_ptr_->deactivate() before destroying T. + * + * - On classes that need to use a pointer to T: + * - Keep a copy of the shared_ptr>. + * - Whenever you need to access T: + * RefCountedPointer::Instance instance(local_ptr_) + * if (instance) + * { + * ptr->method(); + * } + */ +template +class RefCountedPointer +{ +public: + + class Instance; + + /** + * @brief Explicit constructor. + * @param ptr Pointer to manage. + * + * @pre nullptr != ptr. We must ensure that the pointer we + * are manaing is valid. + */ + explicit RefCountedPointer( + T* ptr) + : ptr_(ptr) + , is_active_(true) + , instances_(0) + { + assert(nullptr != ptr); + } + + ~RefCountedPointer() = default; + + // Non-copyable and non-movable + RefCountedPointer( + const RefCountedPointer&) = delete; + RefCountedPointer& operator =( + const RefCountedPointer&) = delete; + RefCountedPointer( + RefCountedPointer&&) = delete; + RefCountedPointer& operator =( + RefCountedPointer&&) = delete; + + /** + * @brief Class to manage the local pointer instance. + * It will increase the reference count on construction and decrease + * it on destruction. Provides a facade to access the pointee. + */ + class Instance + { + public: + + /** + * @brief Constructor. + * @param parent Shared pointer reference to its RefCountedPointer. + */ + explicit Instance( + const std::shared_ptr>& parent) + : parent_(parent) + , ptr_(parent && parent->is_active_ ? parent->ptr_ : nullptr) + { + if (parent_) + { + parent_->inc_instances(); + } + } + + /** + * @brief Destructor. + */ + ~Instance() + { + if (parent_) + { + parent_->dec_instances(); + } + } + + // Non-copyable, default movable + Instance( + const Instance&) = delete; + Instance& operator =( + const Instance&) = delete; + Instance( + Instance&&) = default; + Instance& operator =( + Instance&&) = default; + + /** + * @brief operator to check if the pointer is valid. + */ + operator bool() const + { + return nullptr != ptr_; + } + + /** + * @brief operator to call the T methods. + */ + T* operator ->() const + { + assert(nullptr != ptr_); + return ptr_; + } + + private: + + std::shared_ptr> parent_; + T* const ptr_; + }; + + /** + * @brief Ensure no more valid local pointer instances are created, and wait for current ones to die. + */ + void deactivate() + { + std::unique_lock lock(mutex_); + is_active_ = false; + cv_.wait(lock, [this]() -> bool + { + return instances_ == 0; + }); + } + +private: + + /** + * @brief Increase the reference count. + */ + void inc_instances() + { + std::unique_lock lock(mutex_); + ++instances_; + } + + /** + * @brief Decrease the reference count. + */ + void dec_instances() + { + std::unique_lock lock(mutex_); + --instances_; + if (instances_ == 0) + { + cv_.notify_one(); + } + } + + /** + * Pointer to the managed object. + */ + T* const ptr_; + + /** + * Indicates whether the pointee is still alive + * and accessing the pointer is valid. + */ + std::atomic is_active_; + + /** + * Protections for the number of instances. + */ + mutable std::mutex mutex_; + std::condition_variable cv_; + + /** + * Number of active instances (currently using the pointee). + */ + size_t instances_; +}; + +} // namespace fastdds +} // namespace eprosima + +#endif // UTILS__REFCOUNTEDPOINTER_HPP diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index a2fedada3e2..4bdcbc50bbd 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -46,6 +46,7 @@ #include "BlackboxTests.hpp" #include "mock/BlackboxMockConsumer.h" #include "../api/dds-pim/CustomPayloadPool.hpp" +#include "../api/dds-pim/PubSubParticipant.hpp" #include "../api/dds-pim/PubSubReader.hpp" #include "../api/dds-pim/PubSubWriter.hpp" #include "../api/dds-pim/PubSubWriterReader.hpp" @@ -899,6 +900,116 @@ TEST(DDSBasic, max_output_message_size_writer) } +<<<<<<< HEAD +======= +/** + * @test This test checks that it is possible to register two TypeSupport instances of the same type + * under the same DomainParticipant. + */ +TEST(DDSBasic, register_two_identical_typesupports) +{ + // Set DomainParticipantFactory to create disabled entities + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + ASSERT_NE(nullptr, factory); + + // Create a disabled DomainParticipant, setting it to in turn create disable entities + DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(nullptr, participant); + + // Register a type support + TypeSupport type_support_1; + type_support_1.reset(new HelloWorldPubSubType()); + EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_1)); + + // Register a second instance of the type support with the same TopicDataType + TypeSupport type_support_2; + type_support_2.reset(new HelloWorldPubSubType()); + EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_2)); +} + +/** + * @test This is a regression test for Redmine Issue 21293. + * The destruction among intra-process participants should be correctly performed. + * local_reader() has to return a valid pointer. + * + */ +TEST(DDSBasic, successful_destruction_among_intraprocess_participants) +{ + namespace dds = eprosima::fastdds::dds; + auto factory = dds::DomainParticipantFactory::get_instance(); + + // Set intraprocess delivery to full + LibrarySettings library_settings; + factory->get_library_settings(library_settings); + auto old_library_settings = library_settings; + library_settings.intraprocess_delivery = INTRAPROCESS_FULL; + factory->set_library_settings(library_settings); + + { + auto participant_1 = std::make_shared>(1u, 1u, 1u, 1u); + + ASSERT_TRUE(participant_1->init_participant()); + participant_1->pub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(participant_1->init_publisher(0u)); + participant_1->sub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(participant_1->init_subscriber(0u)); + + std::vector>> reception_participants; + + size_t num_reception_participants = 50; + + for (size_t i = 0; i < num_reception_participants; i++) + { + reception_participants.push_back(std::make_shared>(1u, 1u, 1u, 1u)); + ASSERT_TRUE(reception_participants.back()->init_participant()); + reception_participants.back()->sub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(reception_participants.back()->init_subscriber(0u)); + reception_participants.back()->pub_topic_name(TEST_TOPIC_NAME + "_Return"); + ASSERT_TRUE(reception_participants.back()->init_publisher(0u)); + } + + participant_1->wait_discovery(std::chrono::seconds::zero(), (uint8_t)num_reception_participants, true); + + participant_1->pub_wait_discovery((unsigned int)num_reception_participants); + participant_1->sub_wait_discovery((unsigned int)num_reception_participants); + + auto data_12 = default_helloworld_data_generator(); + + std::thread p1_thread([&participant_1, &data_12]() + { + auto data_size = data_12.size(); + for (size_t i = 0; i < data_size; i++) + { + participant_1->send_sample(data_12.back()); + data_12.pop_back(); + } + }); + + std::vector reception_threads; + reception_threads.reserve(num_reception_participants); + for (auto& reception_participant : reception_participants) + { + reception_threads.emplace_back([&reception_participant]() + { + auto data_21 = default_helloworld_data_generator(); + for (auto& data : data_21) + { + reception_participant->send_sample(data); + } + + reception_participant.reset(); + }); + } + + p1_thread.join(); + for (auto& rec_thread : reception_threads) + { + rec_thread.join(); + } + } +} + +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h index dc3be493e06..48f5ef59434 100644 --- a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h +++ b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h @@ -31,6 +31,8 @@ #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { @@ -204,9 +206,13 @@ class ReaderLocator : public RTPSMessageSenderInterface return false; } +<<<<<<< HEAD:test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h RTPSReader* local_reader() +======= + LocalReaderPointer::Instance local_reader() +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp { - return nullptr; + return LocalReaderPointer::Instance(std::shared_ptr()); } bool is_datasharing_reader() const diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index f528cf6d0f7..a5f8c20e938 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -80,6 +80,15 @@ set(SYSTEMINFOTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) +<<<<<<< HEAD +======= +set(TREETESTS_SOURCE + TreeNodeTests.cpp) + +set(REF_COUNTED_POINTER_TESTS_SOURCE + RefCountedPointerTests.cpp) + +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) include_directories(mock/) add_executable(StringMatchingTests ${STRINGMATCHINGTESTS_SOURCE}) @@ -179,6 +188,19 @@ target_include_directories(SharedMutexTests PUBLIC ${PROJECT_SOURCE_DIR}/include target_link_libraries(SharedMutexTests PUBLIC GTest::gtest) gtest_discover_tests(SharedMutexTests) +<<<<<<< HEAD +======= +add_executable(TreeNodeTests ${TREETESTS_SOURCE}) +target_include_directories(TreeNodeTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) +target_link_libraries(TreeNodeTests PUBLIC GTest::gtest) +gtest_discover_tests(TreeNodeTests) + +add_executable(RefCountedPointerTests ${REF_COUNTED_POINTER_TESTS_SOURCE}) +target_include_directories(RefCountedPointerTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) +target_link_libraries(RefCountedPointerTests PUBLIC GTest::gtest) +gtest_discover_tests(RefCountedPointerTests) + +>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) ############################################################################### # Necessary files ############################################################################### diff --git a/test/unittest/utils/RefCountedPointerTests.cpp b/test/unittest/utils/RefCountedPointerTests.cpp new file mode 100644 index 00000000000..a939cc7b414 --- /dev/null +++ b/test/unittest/utils/RefCountedPointerTests.cpp @@ -0,0 +1,183 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include + +using namespace std; + +namespace eprosima { +namespace fastdds { + +struct EntityMock +{ + EntityMock() + : local_pointer(std::make_shared>(this)) + , n_times_data_processed(0) + { + } + + std::shared_ptr> get_refcounter_pointer() const + { + return local_pointer; + } + + void dummy_process_data( + void*) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ++n_times_data_processed; + } + + void destroy() + { + local_pointer->deactivate(); + } + + std::shared_ptr> local_pointer; + std::atomic n_times_data_processed; +}; + +enum class RoutineStatus +{ + NON_INITIALIZED, + SUCCESS, + FAILURE +}; + +struct EntityOwner +{ + EntityOwner( + const EntityMock& entity) + : entity_ptr(entity.get_refcounter_pointer()) + , routine_status(RoutineStatus::NON_INITIALIZED) + { + } + + void spawn_routine() + { + th = std::thread([&]() + { + RefCountedPointer::Instance entity_instance(entity_ptr); + if (entity_instance) + { + entity_instance->dummy_process_data(nullptr); + routine_status = RoutineStatus::SUCCESS; + } + else + { + routine_status = RoutineStatus::FAILURE; + } + }); + } + + void join() + { + th.join(); + } + + std::shared_ptr> entity_ptr; + RoutineStatus routine_status; + std::thread th; +}; + +class RefCountedPointerTests : public ::testing::Test +{ +public: + + static constexpr std::size_t n_owners = 5; + + void SetUp() override + { + owners_.reserve(5); + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_.emplace_back(entity_); + } + } + + void TearDown() override + { + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].join(); + } + } + +protected: + + EntityMock entity_; + std::vector owners_; +}; + +TEST_F(RefCountedPointerTests, refcountedpointer_inactive) +{ + // Make the first owner spawn a routine + owners_[0].spawn_routine(); + + // Wait for the routine to finish + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_EQ(owners_[0].routine_status, RoutineStatus::SUCCESS); + + // Destroy the entity + entity_.destroy(); + + // Make the rest of the owners spawn a routine + for (std::size_t i = 1; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // The routine should fail + ASSERT_EQ(owners_[i].routine_status, RoutineStatus::FAILURE); + } + + // The entity should have been processed only once + ASSERT_EQ(1, entity_.n_times_data_processed); +} + +TEST_F(RefCountedPointerTests, refcounterpointer_deactivate_waits_for_no_references) +{ + // Spawn some routines + for (std::size_t i = 0; i < n_owners; ++i) + { + owners_[i].spawn_routine(); + } + + // Ensure owners' routines have started + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto t0 = std::chrono::steady_clock::now(); + entity_.destroy(); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); + + std::cout << "Elapsed time: " << elapsed << " ms" << std::endl; + ASSERT_GT(elapsed, 50); // destroy should have taken at least 50 ms. Being strict it should be 80, but we allow some margin + ASSERT_EQ(entity_.n_times_data_processed, 5); +} + +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 8560483c4794c124240321300ff8dcd658df07d1 Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Fri, 15 Nov 2024 10:47:42 +0100 Subject: [PATCH 2/5] Solve conflicts Signed-off-by: Mario Dominguez --- .../rtps/reader/LocalReaderPointer.hpp | 9 +- include/fastdds/rtps/reader/RTPSReader.h | 18 + include/fastdds/rtps/writer/ReaderLocator.h | 18 +- include/fastdds/rtps/writer/ReaderProxy.h | 4 - .../fastrtps}/utils/RefCountedPointer.hpp | 2 +- src/cpp/rtps/RTPSDomain.cpp | 18 +- src/cpp/rtps/RTPSDomainImpl.hpp | 12 +- .../rtps/participant/RTPSParticipantImpl.cpp | 28 +- .../rtps/participant/RTPSParticipantImpl.h | 10 +- src/cpp/rtps/reader/BaseReader.cpp | 548 ------------------ src/cpp/rtps/reader/BaseReader.hpp | 506 ---------------- src/cpp/rtps/reader/RTPSReader.cpp | 28 +- src/cpp/rtps/writer/ReaderLocator.cpp | 13 - src/cpp/rtps/writer/StatefulWriter.cpp | 37 +- src/cpp/rtps/writer/StatelessWriter.cpp | 11 +- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 42 +- .../fastdds/rtps/reader/RTPSReader.h | 4 + .../fastdds/rtps/writer/ReaderLocator.h | 14 +- test/unittest/utils/CMakeLists.txt | 17 +- .../unittest/utils/RefCountedPointerTests.cpp | 4 +- 20 files changed, 83 insertions(+), 1260 deletions(-) rename {src/cpp => include/fastdds}/rtps/reader/LocalReaderPointer.hpp (86%) rename {src/cpp => include/fastrtps}/utils/RefCountedPointer.hpp (99%) delete mode 100644 src/cpp/rtps/reader/BaseReader.cpp delete mode 100644 src/cpp/rtps/reader/BaseReader.hpp diff --git a/src/cpp/rtps/reader/LocalReaderPointer.hpp b/include/fastdds/rtps/reader/LocalReaderPointer.hpp similarity index 86% rename from src/cpp/rtps/reader/LocalReaderPointer.hpp rename to include/fastdds/rtps/reader/LocalReaderPointer.hpp index 7e16177907d..63735de0259 100644 --- a/src/cpp/rtps/reader/LocalReaderPointer.hpp +++ b/include/fastdds/rtps/reader/LocalReaderPointer.hpp @@ -19,15 +19,16 @@ #ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP #define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP -#include +#include namespace eprosima { -namespace fastdds { + +namespace fastrtps { namespace rtps { -class BaseReader; +class RTPSReader; -using LocalReaderPointer = RefCountedPointer; +using LocalReaderPointer = RefCountedPointer; } // namespace rtps } // namespace fastdds diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index 6db4528e617..8b9a8c4ac17 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -175,6 +176,11 @@ class RTPSReader const SequenceNumberSet_t& gapList, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; + /** + * @brief Waits for not being referenced/used by any other entity. + */ + virtual void local_actions_on_reader_removed(); + /** * Method to indicate the reader that some change has been removed due to HistoryQos requirements. * @param change Pointer to the CacheChange_t. @@ -202,6 +208,14 @@ class RTPSReader RTPS_DllAPI bool setListener( ReaderListener* target); + /** + * @brief Retrieves the local pointer to this reader + * to be used by other local entities. + * + * @return Local pointer to this reader. + */ + RTPS_DllAPI std::shared_ptr get_local_pointer(); + /** * Reserve a CacheChange_t. * @param change Pointer to pointer to the Cache. @@ -495,6 +509,10 @@ class RTPSReader bool m_acceptMessagesFromUnkownWriters; //!Trusted writer (for Builtin) EntityId_t m_trustedWriterEntityId; + + /// RefCountedPointer of this instance. + std::shared_ptr local_ptr_; + //!Expects Inline Qos. bool m_expectsInlineQos; diff --git a/include/fastdds/rtps/writer/ReaderLocator.h b/include/fastdds/rtps/writer/ReaderLocator.h index a78ec768789..073d370b098 100644 --- a/include/fastdds/rtps/writer/ReaderLocator.h +++ b/include/fastdds/rtps/writer/ReaderLocator.h @@ -21,13 +21,12 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include -#include #include +#include +#include #include #include -#include - -#include +#include namespace eprosima { namespace fastrtps { @@ -71,17 +70,10 @@ class ReaderLocator : public RTPSMessageSenderInterface return is_local_reader_; } -<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h - RTPSReader* local_reader(); - - void local_reader( - RTPSReader* local_reader) -======= LocalReaderPointer::Instance local_reader(); void local_reader( std::shared_ptr local_reader) ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp { local_reader_ = local_reader; } @@ -269,11 +261,7 @@ class ReaderLocator : public RTPSMessageSenderInterface LocatorSelectorEntry async_locator_info_; bool expects_inline_qos_; bool is_local_reader_; -<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h - RTPSReader* local_reader_; -======= std::shared_ptr local_reader_; ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp std::vector guid_prefix_as_vector_; std::vector guid_as_vector_; IDataSharingNotifier* datasharing_notifier_; diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index a53ffb0be76..375493fa7f2 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -294,11 +294,7 @@ class ReaderProxy * Get the local reader on the same process (if any). * @return The local reader on the same process. */ -<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderProxy.h - inline RTPSReader* local_reader() -======= inline LocalReaderPointer::Instance local_reader() ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderProxy.hpp { return locator_info_.local_reader(); } diff --git a/src/cpp/utils/RefCountedPointer.hpp b/include/fastrtps/utils/RefCountedPointer.hpp similarity index 99% rename from src/cpp/utils/RefCountedPointer.hpp rename to include/fastrtps/utils/RefCountedPointer.hpp index bb1cfa70b3e..2044cce22f6 100644 --- a/src/cpp/utils/RefCountedPointer.hpp +++ b/include/fastrtps/utils/RefCountedPointer.hpp @@ -27,7 +27,7 @@ #include namespace eprosima { -namespace fastdds { +namespace fastrtps { /** * @brief Class to manage a local pointer with reference counting. diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index 967a722bee2..7dd46e510cf 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -29,22 +29,10 @@ #include #include #include +#include #include #include -<<<<<<< HEAD -======= -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include #include @@ -790,11 +778,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant( return nullptr; } -<<<<<<< HEAD -RTPSReader* RTPSDomainImpl::find_local_reader( -======= std::shared_ptr RTPSDomainImpl::find_local_reader( ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid) { auto instance = get_instance(); diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index 9b76dd45c70..83673ec0554 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -25,17 +25,11 @@ #endif // defined(_WIN32) || defined(__unix__) #include +#include #include #include #include -<<<<<<< HEAD -======= -#include -#include -#include -#include ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include @@ -180,11 +174,7 @@ class RTPSDomainImpl * * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ -<<<<<<< HEAD - static RTPSReader* find_local_reader( -======= static std::shared_ptr find_local_reader( ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid); /** diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index f93b7d7a736..71a18a22198 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -27,29 +27,30 @@ #include #include #include +#include #include #include #include -#include #include #include #include #include #include #include -#include +#include +#include #include #include -#include +#include #include -#include +#include #include #include -#include -#include +#include +#include #include #include -#include +#include #include #include @@ -1452,11 +1453,7 @@ bool RTPSParticipantImpl::createReader( return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); } -<<<<<<< HEAD -RTPSReader* RTPSParticipantImpl::find_local_reader( -======= std::shared_ptr RTPSParticipantImpl::find_local_reader( ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) const GUID_t& reader_guid) { shared_lock _(endpoints_list_mutex); @@ -2103,7 +2100,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint( bool found = false, found_in_users = false; Endpoint* p_endpoint = nullptr; - BaseReader* reader = nullptr; + RTPSReader* reader = nullptr; if (endpoint.entityId.is_writer()) { @@ -2301,7 +2298,7 @@ void RTPSParticipantImpl::deleteAllUserEndpoints() if (kind == READER) { - static_cast(endpoint)->local_actions_on_reader_removed(); + static_cast(endpoint)->local_actions_on_reader_removed(); } // remove the endpoints @@ -3004,16 +3001,11 @@ bool RTPSParticipantImpl::register_in_reader( } else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId)) { -<<<<<<< HEAD - RTPSReader* reader = find_local_reader(reader_guid); - res = reader->add_statistics_listener(listener); -======= LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid)); if (local_reader) { res = local_reader->add_statistics_listener(listener); } ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return res; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 8eb51f1a9da..999e9d00f1e 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -45,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -55,11 +56,6 @@ #include #include #include -<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h -======= -#include -#include ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp #include #include #include @@ -484,11 +480,7 @@ class RTPSParticipantImpl /*** * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found. */ -<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h - RTPSReader* find_local_reader( -======= std::shared_ptr find_local_reader( ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp const GUID_t& reader_guid); /*** diff --git a/src/cpp/rtps/reader/BaseReader.cpp b/src/cpp/rtps/reader/BaseReader.cpp deleted file mode 100644 index 4beb9f2e807..00000000000 --- a/src/cpp/rtps/reader/BaseReader.cpp +++ /dev/null @@ -1,548 +0,0 @@ -// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/* - * BaseReader.cpp - */ - -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace eprosima { -namespace fastdds { -namespace rtps { - -BaseReader::BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen) - : fastdds::rtps::RTPSReader(pimpl, guid, att, hist) - , listener_(listen) - , accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers) - , expects_inline_qos_(att.expects_inline_qos) - , history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial)) - , liveliness_kind_(att.liveliness_kind) - , liveliness_lease_duration_(att.liveliness_lease_duration) -{ - PoolConfig cfg = PoolConfig::from_history_attributes(hist->m_att); - std::shared_ptr change_pool; - std::shared_ptr payload_pool; - payload_pool = BasicPayloadPool::get(cfg, change_pool); - - init(payload_pool, change_pool); - setup_datasharing(att); -} - -BaseReader::BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - const std::shared_ptr& payload_pool, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen) - : BaseReader( - pimpl, guid, att, payload_pool, - std::make_shared(PoolConfig::from_history_attributes(hist->m_att)), - hist, listen) -{ -} - -BaseReader::BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - const std::shared_ptr& payload_pool, - const std::shared_ptr& change_pool, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen) - : fastdds::rtps::RTPSReader(pimpl, guid, att, hist) - , listener_(listen) - , accept_messages_from_unkown_writers_(att.accept_messages_from_unkown_writers) - , expects_inline_qos_(att.expects_inline_qos) - , history_state_(new fastdds::rtps::ReaderHistoryState(att.matched_writers_allocation.initial)) - , liveliness_kind_(att.liveliness_kind) - , liveliness_lease_duration_(att.liveliness_lease_duration) -{ - init(payload_pool, change_pool); - setup_datasharing(att); -} - -void BaseReader::local_actions_on_reader_removed() -{ - local_ptr_->deactivate(); -} - -BaseReader::~BaseReader() -{ - EPROSIMA_LOG_INFO(RTPS_READER, "Removing reader " << this->getGuid().entityId); - - for (auto it = history_->changesBegin(); it != history_->changesEnd(); ++it) - { - release_cache(*it); - } - - delete history_state_; - - // As releasing the change pool will delete the cache changes it owns, - // the payload pool may be called to release their payloads, so we should - // ensure that the payload pool is destroyed after the change pool. - change_pool_.reset(); - payload_pool_.reset(); -} - -bool BaseReader::matched_writer_add( - const PublicationBuiltinTopicData& info) -{ - const auto& alloc = mp_RTPSParticipant->get_attributes().allocation; - WriterProxyData wdata( - alloc.locators.max_unicast_locators, - alloc.locators.max_multicast_locators, - alloc.data_limits); - - from_builtin_to_proxy(info, wdata); - return matched_writer_add_edp(wdata); -} - -ReaderListener* BaseReader::get_listener() const -{ - std::lock_guard lock(mp_mutex); - return listener_; -} - -void BaseReader::set_listener( - ReaderListener* target) -{ - std::lock_guard lock(mp_mutex); - listener_ = target; -} - -bool BaseReader::expects_inline_qos() const -{ - return expects_inline_qos_; -} - -ReaderHistory* BaseReader::get_history() const -{ - return history_; -} - -//! @return The content filter associated to this reader. -IReaderDataFilter* BaseReader::get_content_filter() const -{ - std::lock_guard lock(mp_mutex); - return data_filter_; -} - -//! Set the content filter associated to this reader. -//! @param filter Pointer to the content filter to associate to this reader. -void BaseReader::set_content_filter( - IReaderDataFilter* filter) -{ - std::lock_guard lock(mp_mutex); - data_filter_ = filter; -} - -uint64_t BaseReader::get_unread_count() const -{ - std::lock_guard lock(mp_mutex); - return total_unread_; -} - -uint64_t BaseReader::get_unread_count( - bool mark_as_read) -{ - std::lock_guard lock(mp_mutex); - uint64_t ret_val = total_unread_; - - if (mark_as_read) - { - for (auto it = history_->changesBegin(); 0 < total_unread_ && it != history_->changesEnd(); ++it) - { - fastdds::rtps::CacheChange_t* change = *it; - if (!change->isRead && get_last_notified(change->writerGUID) >= change->sequenceNumber) - { - change->isRead = true; - assert(0 < total_unread_); - --total_unread_; - } - } - assert(0 == total_unread_); - } - return ret_val; -} - -bool BaseReader::wait_for_unread_cache( - const eprosima::fastdds::dds::Duration_t& timeout) -{ - auto time_out = std::chrono::steady_clock::now() + std::chrono::seconds(timeout.seconds) + - std::chrono::nanoseconds(timeout.nanosec); - -#if HAVE_STRICT_REALTIME - std::unique_lock lock(mp_mutex, std::defer_lock); - if (lock.try_lock_until(time_out)) -#else - std::unique_lock lock(mp_mutex); -#endif // HAVE_STRICT_REALTIME - { - if (new_notification_cv_.wait_until( - lock, time_out, - [&]() - { - return total_unread_ > 0; - })) - { - return true; - } - } - - return false; -} - -bool BaseReader::is_sample_valid( - const void* data, - const fastdds::rtps::GUID_t& writer, - const fastdds::rtps::SequenceNumber_t& sn) const -{ - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writer)) - { - // Check if the payload is dirty - // Note the Payloads used in loans include a mandatory RTPS 2.3 extra header - auto payload = static_cast(data); - payload -= fastdds::rtps::SerializedPayload_t::representation_header_size; - if (!fastdds::rtps::DataSharingPayloadPool::check_sequence_number(payload, sn)) - { - return false; - } - } - return true; -} - -BaseReader* BaseReader::downcast( - fastdds::rtps::RTPSReader* reader) -{ - assert(nullptr != dynamic_cast(reader)); - return static_cast(reader); -} - -BaseReader* BaseReader::downcast( - fastdds::rtps::Endpoint* endpoint) -{ - assert(nullptr != dynamic_cast(endpoint)); - return static_cast(endpoint); -} - -void BaseReader::allow_unknown_writers() -{ - assert(fastdds::rtps::EntityId_t::unknown() != trusted_writer_entity_id_); - accept_messages_from_unkown_writers_ = true; -} - -std::shared_ptr BaseReader::get_local_pointer() -{ - return local_ptr_; -} - -bool BaseReader::reserve_cache( - uint32_t cdr_payload_size, - fastdds::rtps::CacheChange_t*& change) -{ - std::lock_guard guard(mp_mutex); - - change = nullptr; - - fastdds::rtps::CacheChange_t* reserved_change = nullptr; - if (!change_pool_->reserve_cache(reserved_change)) - { - EPROSIMA_LOG_WARNING(RTPS_READER, "Problem reserving cache from pool"); - return false; - } - - uint32_t payload_size = fixed_payload_size_ ? fixed_payload_size_ : cdr_payload_size; - if (!payload_pool_->get_payload(payload_size, reserved_change->serializedPayload)) - { - change_pool_->release_cache(reserved_change); - EPROSIMA_LOG_WARNING(RTPS_READER, "Problem reserving payload from pool"); - return false; - } - - change = reserved_change; - return true; -} - -void BaseReader::release_cache( - fastdds::rtps::CacheChange_t* change) -{ - std::lock_guard guard(mp_mutex); - - fastdds::rtps::IPayloadPool* pool = change->serializedPayload.payload_owner; - if (pool) - { - pool->release_payload(change->serializedPayload); - } - change_pool_->release_cache(change); -} - -void BaseReader::update_liveliness_changed_status( - const fastdds::rtps::GUID_t& writer, - int32_t alive_change, - int32_t not_alive_change) -{ - std::lock_guard lock(mp_mutex); - - liveliness_changed_status_.alive_count += alive_change; - liveliness_changed_status_.alive_count_change += alive_change; - liveliness_changed_status_.not_alive_count += not_alive_change; - liveliness_changed_status_.not_alive_count_change += not_alive_change; - liveliness_changed_status_.last_publication_handle = writer; - - if (nullptr != listener_) - { - listener_->on_liveliness_changed(this, liveliness_changed_status_); - - liveliness_changed_status_.alive_count_change = 0; - liveliness_changed_status_.not_alive_count_change = 0; - } -} - -#ifdef FASTDDS_STATISTICS - -bool BaseReader::add_statistics_listener( - std::shared_ptr listener) -{ - return add_statistics_listener_impl(listener); -} - -bool BaseReader::remove_statistics_listener( - std::shared_ptr listener) -{ - return remove_statistics_listener_impl(listener); -} - -void BaseReader::set_enabled_statistics_writers_mask( - uint32_t enabled_writers) -{ - set_enabled_statistics_writers_mask_impl(enabled_writers); -} - -#endif // FASTDDS_STATISTICS - -bool BaseReader::may_remove_history_record( - bool removed_by_lease) -{ - return !removed_by_lease; -} - -void BaseReader::add_persistence_guid( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::GUID_t& persistence_guid) -{ - if (fastdds::rtps::c_Guid_Unknown == persistence_guid || persistence_guid == guid) - { - std::lock_guard guard(mp_mutex); - history_state_->persistence_guid_map[guid] = guid; - history_state_->persistence_guid_count[guid]++; - } - else - { - std::lock_guard guard(mp_mutex); - history_state_->persistence_guid_map[guid] = persistence_guid; - history_state_->persistence_guid_count[persistence_guid]++; - - // Could happen that a value has already been stored in the record with the guid and not the persistence guid - // This is because received_change is called before Proxy is created - // In this case, we substitute the guid for the persistence (in case they are not equal) - auto spourious_record = history_state_->history_record.find(guid); - if (spourious_record != history_state_->history_record.end()) - { - EPROSIMA_LOG_INFO(RTPS_READER, "Sporious record found, changing guid " - << guid << " for persistence guid " << persistence_guid); - update_last_notified(guid, spourious_record->second); - history_state_->history_record.erase(spourious_record); - } - } -} - -void BaseReader::remove_persistence_guid( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::GUID_t& persistence_guid, - bool removed_by_lease) -{ - std::lock_guard guard(mp_mutex); - auto persistence_guid_stored = (fastdds::rtps::c_Guid_Unknown == persistence_guid) ? guid : persistence_guid; - history_state_->persistence_guid_map.erase(guid); - auto count = --history_state_->persistence_guid_count[persistence_guid_stored]; - if (count <= 0 && may_remove_history_record(removed_by_lease)) - { - history_state_->history_record.erase(persistence_guid_stored); - history_state_->persistence_guid_count.erase(persistence_guid_stored); - } -} - -fastdds::rtps::SequenceNumber_t BaseReader::get_last_notified( - const fastdds::rtps::GUID_t& guid) -{ - fastdds::rtps::SequenceNumber_t ret_val; - std::lock_guard guard(mp_mutex); - fastdds::rtps::GUID_t guid_to_look = guid; - auto p_guid = history_state_->persistence_guid_map.find(guid); - if (p_guid != history_state_->persistence_guid_map.end()) - { - guid_to_look = p_guid->second; - } - - auto p_seq = history_state_->history_record.find(guid_to_look); - if (p_seq != history_state_->history_record.end()) - { - ret_val = p_seq->second; - } - - return ret_val; -} - -fastdds::rtps::SequenceNumber_t BaseReader::update_last_notified( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::SequenceNumber_t& seq) -{ - fastdds::rtps::SequenceNumber_t ret_val; - std::lock_guard guard(mp_mutex); - fastdds::rtps::GUID_t guid_to_look = guid; - auto p_guid = history_state_->persistence_guid_map.find(guid); - if (p_guid != history_state_->persistence_guid_map.end()) - { - guid_to_look = p_guid->second; - } - - auto p_seq = history_state_->history_record.find(guid_to_look); - if (p_seq != history_state_->history_record.end()) - { - ret_val = p_seq->second; - } - - if (ret_val < seq) - { - history_state_->history_record[guid_to_look] = seq; - persist_last_notified_nts(guid_to_look, seq); - new_notification_cv_.notify_all(); - } - - return ret_val; -} - -void BaseReader::persist_last_notified_nts( - const fastdds::rtps::GUID_t& peristence_guid, - const fastdds::rtps::SequenceNumber_t& seq) -{ - // Empty base implementation since base behavior is to not persist data - static_cast(peristence_guid); - static_cast(seq); -} - -bool BaseReader::is_datasharing_compatible_with( - const fastdds::rtps::WriterProxyData& wdata) -{ - if (!is_datasharing_compatible_ || - wdata.m_qos.data_sharing.kind() == fastdds::dds::DataSharingKind::OFF) - { - return false; - } - - for (auto id : wdata.m_qos.data_sharing.domain_ids()) - { - if (std::find(m_att.data_sharing_configuration().domain_ids().begin(), - m_att.data_sharing_configuration().domain_ids().end(), id) - != m_att.data_sharing_configuration().domain_ids().end()) - { - return true; - } - } - - return false; -} - -void BaseReader::init( - const std::shared_ptr& payload_pool, - const std::shared_ptr& change_pool) -{ - payload_pool_ = payload_pool; - change_pool_ = change_pool; - fixed_payload_size_ = 0; - if (history_->m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE) - { - fixed_payload_size_ = history_->m_att.payloadMaxSize; - } - - local_ptr_ = std::make_shared(this); - - EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly"); -} - -void BaseReader::setup_datasharing( - const fastdds::rtps::ReaderAttributes& att) -{ - - if (att.endpoint.data_sharing_configuration().kind() != fastdds::dds::DataSharingKind::OFF) - { - using std::placeholders::_1; - std::shared_ptr notification = DataSharingNotification::create_notification( - getGuid(), att.endpoint.data_sharing_configuration().shm_directory()); - if (notification) - { - is_datasharing_compatible_ = true; - datasharing_listener_.reset(new DataSharingListener( - notification, - att.endpoint.data_sharing_configuration().shm_directory(), - att.data_sharing_listener_thread, - att.matched_writers_allocation, - this)); - - // We can start the listener here, as no writer can be matched already, - // so no notification will occur until the non-virtual instance is constructed. - // But we need to stop the listener in the non-virtual instance destructor. - datasharing_listener_->start(); - } - } -} - -} // namespace rtps -} // namespace fastdds -} // namespace eprosima diff --git a/src/cpp/rtps/reader/BaseReader.hpp b/src/cpp/rtps/reader/BaseReader.hpp deleted file mode 100644 index 596e335ce53..00000000000 --- a/src/cpp/rtps/reader/BaseReader.hpp +++ /dev/null @@ -1,506 +0,0 @@ -// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file BaseReader.hpp - */ - -#ifndef FASTDDS_RTPS_READER__BASEREADER_HPP -#define FASTDDS_RTPS_READER__BASEREADER_HPP - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace eprosima { - -namespace fastdds { -namespace rtps { - -struct CacheChange_t; -class IDataSharingListener; -struct ReaderHistoryState; -class ReaderListener; -class RTPSParticipantImpl; -class WriterProxy; -class WriterProxyData; - -class BaseReader - : public fastdds::rtps::RTPSReader - , public fastdds::statistics::StatisticsReaderImpl -{ - -public: - - bool matched_writer_add( - const PublicationBuiltinTopicData& info) final; - - /** - * @brief Add a matched writer represented by its attributes. - * - * @param wdata Discovery information regarding the writer to add. - * - * @return True if correctly added. - */ - virtual bool matched_writer_add_edp( - const WriterProxyData& wdata) = 0; - - fastdds::rtps::ReaderListener* get_listener() const override; - - void set_listener( - fastdds::rtps::ReaderListener* listener) override; - - bool expects_inline_qos() const override; - - fastdds::rtps::ReaderHistory* get_history() const override; - - IReaderDataFilter* get_content_filter() const override; - - void set_content_filter( - IReaderDataFilter* filter) override; - - uint64_t get_unread_count() const override; - - uint64_t get_unread_count( - bool mark_as_read) override; - - bool wait_for_unread_cache( - const fastdds::dds::Duration_t& timeout) override; - - bool is_sample_valid( - const void* data, - const fastdds::rtps::GUID_t& writer, - const fastdds::rtps::SequenceNumber_t& sn) const override; - - /** - * @brief Get a pointer to a BaseReader object from a RTPSReader pointer. - * - * @param reader Pointer to the RTPSReader object. - * - * @return Pointer to the BaseReader object. - */ - static BaseReader* downcast( - fastdds::rtps::RTPSReader* reader); - - /** - * @brief Get a pointer to a BaseReader object from a Endpoint pointer. - * - * @param endpoint Pointer to the Endpoint object. - * - * @return Pointer to the BaseReader object. - */ - static BaseReader* downcast( - fastdds::rtps::Endpoint* endpoint); - - /** - * @brief Set the entity ID of the trusted writer. - * - * @param writer Entity ID of the trusted writer. - */ - void set_trusted_writer( - const fastdds::rtps::EntityId_t& writer) - { - accept_messages_from_unkown_writers_ = false; - trusted_writer_entity_id_ = writer; - } - - /** - * @brief Allow reception ALIVE changes from non-matched writers. - */ - void allow_unknown_writers(); - - /** - * @return The liveliness kind of this reader - */ - fastdds::dds::LivelinessQosPolicyKind liveliness_kind() const - { - return liveliness_kind_; - } - - /** - * @return The liveliness lease duration of this reader - */ - fastdds::dds::Duration_t liveliness_lease_duration() const - { - return liveliness_lease_duration_; - } - - /** - * @return the datasharing listener associated with this reader. - */ - const std::unique_ptr& datasharing_listener() const - { - return datasharing_listener_; - } - - /** - * @brief Retrieves the local pointer to this reader - * to be used by other local entities. - * - * @return Local pointer to this reader. - */ - std::shared_ptr get_local_pointer(); - - /** - * @brief Reserve a CacheChange_t. - * - * @param [in] cdr_payload_size Size of the received payload. - * @param [out] change Pointer to the reserved change. - * - * @return True if correctly reserved. - */ - bool reserve_cache( - uint32_t cdr_payload_size, - fastdds::rtps::CacheChange_t*& change); - - /** - * @brief Release a CacheChange_t. - * - * @param change Pointer to the change to release. - */ - void release_cache( - fastdds::rtps::CacheChange_t* change); - - /** - * @brief Method to notify the reader that a change has been removed from its history. - * - * @param change Pointer to the CacheChange_t that was removed from the history. - * - * @return True if correctly removed. - */ - virtual bool change_removed_by_history( - fastdds::rtps::CacheChange_t* change) = 0; - - /** - * @brief Called just before a change is going to be deserialized. - * - * @param [in] change Pointer to the change being accessed. - * @param [out] writer Writer proxy the @c change belongs to. - * @param [out] is_future_change Whether the change is in the future (i.e. there are - * earlier unreceived changes from the same writer). - * - * @return Whether the change is still valid or not. - */ - virtual bool begin_sample_access_nts( - fastdds::rtps::CacheChange_t* change, - fastdds::rtps::WriterProxy*& writer, - bool& is_future_change) = 0; - - /** - * @brief Called after the change has been deserialized. - * - * @param [in] change Pointer to the change being accessed. - * @param [in] writer Writer proxy the @c change belongs to. - * @param [in] mark_as_read Whether the @c change should be marked as read or not. - */ - virtual void end_sample_access_nts( - fastdds::rtps::CacheChange_t* change, - fastdds::rtps::WriterProxy*& writer, - bool mark_as_read) = 0; - - /** - * @brief A method to update the liveliness changed status of the reader - * - * @param writer The writer changing liveliness, specified by its guid - * @param alive_change The change requested for alive count. Should be -1, 0 or +1 - * @param not_alive_change The change requested for not alive count. Should be -1, 0 or +1 - */ - void update_liveliness_changed_status( - const fastdds::rtps::GUID_t& writer, - int32_t alive_change, - int32_t not_alive_change); - - /** - * @brief Process an incoming DATA message. - * - * @param change Pointer to the incoming CacheChange_t. - * - * @return true if the reader accepts message. - */ - virtual bool process_data_msg( - fastdds::rtps::CacheChange_t* change) = 0; - - /** - * @brief Process an incoming DATA_FRAG message. - * - * @param change Pointer to the incoming CacheChange_t. - * @param sampleSize Size of the complete, assembled message. - * @param fragmentStartingNum Starting number of this particular message. - * @param fragmentsInSubmessage Number of fragments on this particular message. - * - * @return true if the reader accepts message. - */ - virtual bool process_data_frag_msg( - fastdds::rtps::CacheChange_t* change, - uint32_t sampleSize, - uint32_t fragmentStartingNum, - uint16_t fragmentsInSubmessage) = 0; - - /** - * @brief Process an incoming HEARTBEAT message. - * - * @param writerGUID - * @param hbCount - * @param firstSN - * @param lastSN - * @param finalFlag - * @param livelinessFlag - * @param origin_vendor_id - * - * @return true if the reader accepts message. - */ - virtual bool process_heartbeat_msg( - const fastdds::rtps::GUID_t& writerGUID, - uint32_t hbCount, - const fastdds::rtps::SequenceNumber_t& firstSN, - const fastdds::rtps::SequenceNumber_t& lastSN, - bool finalFlag, - bool livelinessFlag, - VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; - - /** - * @brief Process an incoming GAP message. - * - * @param writerGUID - * @param gapStart - * @param gapList - * @param origin_vendor_id - * - * @return true if the reader accepts message. - */ - virtual bool process_gap_msg( - const fastdds::rtps::GUID_t& writerGUID, - const fastdds::rtps::SequenceNumber_t& gapStart, - const fastdds::rtps::SequenceNumberSet_t& gapList, - VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0; - - /** - * @brief Waits for not being referenced/used by any other entity. - */ - virtual void local_actions_on_reader_removed(); - -#ifdef FASTDDS_STATISTICS - - bool add_statistics_listener( - std::shared_ptr listener) override; - - bool remove_statistics_listener( - std::shared_ptr listener) override; - - void set_enabled_statistics_writers_mask( - uint32_t enabled_writers) override; - -#endif // FASTDDS_STATISTICS - - virtual ~BaseReader(); - -protected: - - BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen); - - BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - const std::shared_ptr& payload_pool, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen); - - BaseReader( - fastdds::rtps::RTPSParticipantImpl* pimpl, - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::ReaderAttributes& att, - const std::shared_ptr& payload_pool, - const std::shared_ptr& change_pool, - fastdds::rtps::ReaderHistory* hist, - fastdds::rtps::ReaderListener* listen); - - /** - * @brief Whether a history record may be removed. - * - * @param removed_by_lease Whether the history record is to be removed due to a participant drop. - * - * @return Whether the history record may be removed. - */ - virtual bool may_remove_history_record( - bool removed_by_lease); - - /** - * @brief Add a remote writer to the persistence_guid map. - * - * @param guid GUID of the remote writer. - * @param persistence_guid Persistence GUID of the remote writer. - */ - void add_persistence_guid( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::GUID_t& persistence_guid); - - /** - * @brief Remove a remote writer from the persistence_guid map. - * - * @param guid GUID of the remote writer. - * @param persistence_guid Persistence GUID of the remote writer. - * @param removed_by_lease Whether the GUIDs are being removed due to a participant drop. - */ - void remove_persistence_guid( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::GUID_t& persistence_guid, - bool removed_by_lease); - - /** - * @brief Get the last notified sequence for a writer's GUID. - * - * @param guid The writer GUID to query. - * - * @return Last notified sequence number for input guid. - * @remarks Takes persistence_guid into consideration. - */ - fastdds::rtps::SequenceNumber_t get_last_notified( - const fastdds::rtps::GUID_t& guid); - - /** - * @brief Update the last notified sequence for a writer's GUID. - * - * @param guid The GUID of the writer. - * @param seq Max sequence number available on writer. - * - * @return Previous value of last notified sequence number for input GUID. - * @remarks Takes persistence_guid into consideration. - */ - fastdds::rtps::SequenceNumber_t update_last_notified( - const fastdds::rtps::GUID_t& guid, - const fastdds::rtps::SequenceNumber_t& seq); - - /** - * @brief Persist the last notified sequence for a persistence guid. - * This method is called inside update_last_notified just after updating the last notified sequence for a writer - * and gives persistent readers the opportunity to write the new sequence number to the database. - * - * @param persistence_guid The persistence guid to update. - * @param seq Sequence number to set for input guid. - */ - virtual void persist_last_notified_nts( - const fastdds::rtps::GUID_t& persistence_guid, - const fastdds::rtps::SequenceNumber_t& seq); - - /** - * @brief Check if a writer can communicate with this reader using data-sharing. - * - * @param wdata Discovery information of the writer to check. - * - * @return Whether the writer is datasharing compatible with this reader or not. - */ - bool is_datasharing_compatible_with( - const fastdds::rtps::WriterProxyData& wdata); - - /// Pool of serialized payloads. - std::shared_ptr payload_pool_; - - /// Pool of cache changes. - std::shared_ptr change_pool_; - - /// Pointer to the listener associated with this reader. - fastdds::rtps::ReaderListener* listener_; - /// Whether the reader accepts messages from unmatched writers. - bool accept_messages_from_unkown_writers_; - /// Whether the reader expects inline QoS. - bool expects_inline_qos_; - - /// The data filter associated with this reader. - IReaderDataFilter* data_filter_ = nullptr; - - /// The history record associated with this reader. - fastdds::rtps::ReaderHistoryState* history_state_; - - /// Total number of unread samples in the history. - uint64_t total_unread_ = 0; - /// Condition variable to wait for unread samples. - fastdds::TimedConditionVariable new_notification_cv_; - - /// The liveliness kind of this reader. - fastdds::dds::LivelinessQosPolicyKind liveliness_kind_; - /// The liveliness lease duration of this reader. - fastdds::dds::Duration_t liveliness_lease_duration_; - - /// Whether the reader is datasharing compatible. - bool is_datasharing_compatible_ = false; - /// The listener for the datasharing notifications. - std::unique_ptr datasharing_listener_; - - /// The liveliness changed status struct as defined in the DDS standard. - fastdds::dds::LivelinessChangedStatus liveliness_changed_status_; - - /// Trusted writer (for Builtin) - fastdds::rtps::EntityId_t trusted_writer_entity_id_; - - /// RefCountedPointer of this instance. - std::shared_ptr local_ptr_; - -private: - - /** - * @brief Perform pools related setup. - * This method is called from the constructor to perform the necessary setup for the payload and change pools. - * - * @param payload_pool Payload pool to use. - * @param change_pool Change pool to use. - */ - void init( - const std::shared_ptr& payload_pool, - const std::shared_ptr& change_pool); - - /** - * @brief Perform datasharing related setup. - * This method is called from the constructor to perform the necessary setup for datasharing. - * If datasharing is enabled in the reader attributes, the method will create the necessary notification - * segment, along with the corresponding listener. - * - * @param att Attributes of the reader. - */ - void setup_datasharing( - const fastdds::rtps::ReaderAttributes& att); - -}; - -} // namespace rtps -} // namespace fastdds -} // namespace eprosima - -#endif // FASTDDS_RTPS_READER__BASEREADER_HPP diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index 297feee6ef9..d98bcd7aa72 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -21,19 +21,11 @@ #include #include -#include -#include - -#include - -#include - -#include +#include #include - -#include #include +#include #include #include @@ -41,6 +33,12 @@ #include +#include +#include +#include +#include +#include + namespace eprosima { namespace fastrtps { @@ -105,6 +103,11 @@ RTPSReader::RTPSReader( init(payload_pool, change_pool, att); } +void RTPSReader::local_actions_on_reader_removed() +{ + local_ptr_->deactivate(); +} + void RTPSReader::init( const std::shared_ptr& payload_pool, const std::shared_ptr& change_pool, @@ -214,6 +217,11 @@ bool RTPSReader::setListener( return true; } +std::shared_ptr RTPSReader::get_local_pointer() +{ + return local_ptr_; +} + History::const_iterator RTPSReader::findCacheInFragmentedProcess( const SequenceNumber_t& sequence_number, const GUID_t& writer_guid, diff --git a/src/cpp/rtps/writer/ReaderLocator.cpp b/src/cpp/rtps/writer/ReaderLocator.cpp index 3548591e47a..854af3c03b2 100644 --- a/src/cpp/rtps/writer/ReaderLocator.cpp +++ b/src/cpp/rtps/writer/ReaderLocator.cpp @@ -202,11 +202,7 @@ bool ReaderLocator::send( return true; } -<<<<<<< HEAD -RTPSReader* ReaderLocator::local_reader() -======= LocalReaderPointer::Instance ReaderLocator::local_reader() ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) { if (!local_reader_) { @@ -224,20 +220,11 @@ void ReaderLocator::datasharing_notify() { if (is_local_reader()) { -<<<<<<< HEAD - reader = local_reader(); - } - - if (reader) - { - reader->datasharing_listener()->notify(true); -======= LocalReaderPointer::Instance reader = local_reader(); if (reader) { reader->datasharing_listener()->notify(true); } ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } else { diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 2254532dfc3..df8e15a1a22 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -45,15 +46,7 @@ #include #include #include -<<<<<<< HEAD #include -======= -#include -#include -#include -#include -#include ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) #include #include "../builtin/discovery/database/DiscoveryDataBase.hpp" @@ -473,23 +466,14 @@ bool StatefulWriter::intraprocess_delivery( CacheChange_t* change, ReaderProxy* reader_proxy) { -<<<<<<< HEAD - RTPSReader* reader = reader_proxy->local_reader(); - if (reader) -======= LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); if (local_reader) ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) { if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) { change->write_params.sample_identity(change->write_params.related_sample_identity()); } -<<<<<<< HEAD - return reader->processDataMsg(change); -======= - return local_reader->process_data_msg(change); ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) + return local_reader->processDataMsg(change); } return false; } @@ -502,12 +486,8 @@ bool StatefulWriter::intraprocess_gap( LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); if (local_reader) { -<<<<<<< HEAD - return reader->processGapMsg(m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); -======= - return local_reader->process_gap_msg( + return local_reader->processGapMsg( m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } return false; @@ -538,18 +518,11 @@ bool StatefulWriter::intraprocess_heartbeat( if ((first_seq != c_SequenceNumber_Unknown && last_seq != c_SequenceNumber_Unknown) && (liveliness || reader_proxy->has_changes())) { -<<<<<<< HEAD incrementHBCount(); - returned_value = - reader->processHeartbeatMsg(m_guid, m_heartbeatCount, first_seq, last_seq, true, liveliness, - c_VendorId_eProsima); -======= - increment_hb_count(); - Count_t hb_count = heartbeat_count_; + Count_t hb_count = m_heartbeatCount; lockW.unlock(); - returned_value = local_reader->process_heartbeat_msg( + returned_value = local_reader->processHeartbeatMsg( m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } } diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 19bc388ae62..2fa8e2d041c 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -353,11 +353,7 @@ bool StatelessWriter::intraprocess_delivery( { change->write_params.sample_identity(change->write_params.related_sample_identity()); } -<<<<<<< HEAD - return reader->processDataMsg(change); -======= - return local_reader->process_data_msg(change); ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) + return local_reader->processDataMsg(change); } return false; @@ -933,13 +929,8 @@ bool StatelessWriter::get_connections( //! intraprocess for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderLocator& reader) { -<<<<<<< HEAD - connection.guid(fastdds::statistics::to_statistics_type(reader.local_reader()->getGuid())); - connection.mode(fastdds::statistics::INTRAPROCESS); -======= connection.guid(fastdds::statistics::to_statistics_type(reader.remote_guid())); connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS); ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) connection_list.push_back(connection); return false; diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 4bdcbc50bbd..755e9f833ce 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -900,33 +900,6 @@ TEST(DDSBasic, max_output_message_size_writer) } -<<<<<<< HEAD -======= -/** - * @test This test checks that it is possible to register two TypeSupport instances of the same type - * under the same DomainParticipant. - */ -TEST(DDSBasic, register_two_identical_typesupports) -{ - // Set DomainParticipantFactory to create disabled entities - DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); - ASSERT_NE(nullptr, factory); - - // Create a disabled DomainParticipant, setting it to in turn create disable entities - DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT); - ASSERT_NE(nullptr, participant); - - // Register a type support - TypeSupport type_support_1; - type_support_1.reset(new HelloWorldPubSubType()); - EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_1)); - - // Register a second instance of the type support with the same TopicDataType - TypeSupport type_support_2; - type_support_2.reset(new HelloWorldPubSubType()); - EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_2)); -} - /** * @test This is a regression test for Redmine Issue 21293. * The destruction among intra-process participants should be correctly performed. @@ -935,15 +908,12 @@ TEST(DDSBasic, register_two_identical_typesupports) */ TEST(DDSBasic, successful_destruction_among_intraprocess_participants) { - namespace dds = eprosima::fastdds::dds; - auto factory = dds::DomainParticipantFactory::get_instance(); - // Set intraprocess delivery to full - LibrarySettings library_settings; - factory->get_library_settings(library_settings); + fastrtps::LibrarySettingsAttributes library_settings; + library_settings = fastrtps::xmlparser::XMLProfileManager::library_settings(); auto old_library_settings = library_settings; - library_settings.intraprocess_delivery = INTRAPROCESS_FULL; - factory->set_library_settings(library_settings); + library_settings.intraprocess_delivery = fastrtps::INTRAPROCESS_FULL; + fastrtps::xmlparser::XMLProfileManager::library_settings(library_settings); { auto participant_1 = std::make_shared>(1u, 1u, 1u, 1u); @@ -1007,9 +977,11 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants) rec_thread.join(); } } + + // Restore library settings + fastrtps::xmlparser::XMLProfileManager::library_settings(old_library_settings); } ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h b/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h index 77743ee37d8..1cdfb9194ca 100644 --- a/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h +++ b/test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h @@ -156,6 +156,10 @@ class RTPSReader : public Endpoint return true; } + virtual void local_actions_on_reader_removed() + { + } + virtual bool change_removed_by_history( CacheChange_t*, WriterProxy*) diff --git a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h index 48f5ef59434..45318bc561a 100644 --- a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h +++ b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h @@ -22,16 +22,14 @@ #define _FASTDDS_RTPS_READERLOCATOR_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include +#include #include +#include +#include #include #include -#include #include - - -#include +#include namespace eprosima { namespace fastrtps { @@ -206,11 +204,7 @@ class ReaderLocator : public RTPSMessageSenderInterface return false; } -<<<<<<< HEAD:test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h - RTPSReader* local_reader() -======= LocalReaderPointer::Instance local_reader() ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp { return LocalReaderPointer::Instance(std::shared_ptr()); } diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index a5f8c20e938..a79af73b556 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -80,15 +80,9 @@ set(SYSTEMINFOTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) -<<<<<<< HEAD -======= -set(TREETESTS_SOURCE - TreeNodeTests.cpp) - set(REF_COUNTED_POINTER_TESTS_SOURCE RefCountedPointerTests.cpp) ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) include_directories(mock/) add_executable(StringMatchingTests ${STRINGMATCHINGTESTS_SOURCE}) @@ -188,19 +182,12 @@ target_include_directories(SharedMutexTests PUBLIC ${PROJECT_SOURCE_DIR}/include target_link_libraries(SharedMutexTests PUBLIC GTest::gtest) gtest_discover_tests(SharedMutexTests) -<<<<<<< HEAD -======= -add_executable(TreeNodeTests ${TREETESTS_SOURCE}) -target_include_directories(TreeNodeTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) -target_link_libraries(TreeNodeTests PUBLIC GTest::gtest) -gtest_discover_tests(TreeNodeTests) - add_executable(RefCountedPointerTests ${REF_COUNTED_POINTER_TESTS_SOURCE}) -target_include_directories(RefCountedPointerTests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp) +target_include_directories(RefCountedPointerTests PRIVATE + ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/src/cpp) target_link_libraries(RefCountedPointerTests PUBLIC GTest::gtest) gtest_discover_tests(RefCountedPointerTests) ->>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)) ############################################################################### # Necessary files ############################################################################### diff --git a/test/unittest/utils/RefCountedPointerTests.cpp b/test/unittest/utils/RefCountedPointerTests.cpp index a939cc7b414..705fec623e7 100644 --- a/test/unittest/utils/RefCountedPointerTests.cpp +++ b/test/unittest/utils/RefCountedPointerTests.cpp @@ -18,12 +18,12 @@ #include #include -#include +#include using namespace std; namespace eprosima { -namespace fastdds { +namespace fastrtps { struct EntityMock { From e837ad718dac96208bb6387f76941b8f67a3f623 Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Fri, 15 Nov 2024 11:24:55 +0100 Subject: [PATCH 3/5] Apply Miguel's suggestions: make LocalReaderPointer inherit RefCounterPointer<> and add DOXYGEN_SHOULD_SKIP_THIS_PUBLIC Signed-off-by: Mario Dominguez --- .../fastdds/rtps/reader/LocalReaderPointer.hpp | 9 ++++++++- include/fastdds/rtps/reader/RTPSReader.h | 18 +++++++++--------- include/fastrtps/utils/RefCountedPointer.hpp | 4 ++++ src/cpp/rtps/participant/RTPSParticipantImpl.h | 2 +- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/include/fastdds/rtps/reader/LocalReaderPointer.hpp b/include/fastdds/rtps/reader/LocalReaderPointer.hpp index 63735de0259..c5adb067f4a 100644 --- a/include/fastdds/rtps/reader/LocalReaderPointer.hpp +++ b/include/fastdds/rtps/reader/LocalReaderPointer.hpp @@ -19,6 +19,8 @@ #ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP #define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + #include namespace eprosima { @@ -28,10 +30,15 @@ namespace rtps { class RTPSReader; -using LocalReaderPointer = RefCountedPointer; +struct LocalReaderPointer : public RefCountedPointer +{ + +}; } // namespace rtps } // namespace fastdds } // namespace eprosima +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + #endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index 8b9a8c4ac17..fd6deb15316 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -47,6 +46,7 @@ struct CacheChange_t; struct ReaderHistoryState; class WriterProxyData; class IDataSharingListener; +struct LocalReaderPointer; /** * Class RTPSReader, manages the reception of data from its matched writers. @@ -208,14 +208,6 @@ class RTPSReader RTPS_DllAPI bool setListener( ReaderListener* target); - /** - * @brief Retrieves the local pointer to this reader - * to be used by other local entities. - * - * @return Local pointer to this reader. - */ - RTPS_DllAPI std::shared_ptr get_local_pointer(); - /** * Reserve a CacheChange_t. * @param change Pointer to pointer to the Cache. @@ -499,6 +491,14 @@ class RTPSReader bool is_datasharing_compatible_with( const WriterProxyData& wdata); + /** + * @brief Retrieves the local pointer to this reader + * to be used by other local entities. + * + * @return Local pointer to this reader. + */ + std::shared_ptr get_local_pointer(); + //!ReaderHistory ReaderHistory* mp_history; //!Listener diff --git a/include/fastrtps/utils/RefCountedPointer.hpp b/include/fastrtps/utils/RefCountedPointer.hpp index 2044cce22f6..572ef84dffb 100644 --- a/include/fastrtps/utils/RefCountedPointer.hpp +++ b/include/fastrtps/utils/RefCountedPointer.hpp @@ -19,6 +19,8 @@ #ifndef UTILS__REFCOUNTEDPOINTER_HPP #define UTILS__REFCOUNTEDPOINTER_HPP +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + #include #include #include @@ -215,4 +217,6 @@ class RefCountedPointer } // namespace fastdds } // namespace eprosima +#endif // DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + #endif // UTILS__REFCOUNTEDPOINTER_HPP diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 999e9d00f1e..4cb4f3f8b22 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -45,7 +45,6 @@ #include #include #include -#include #include #include #include @@ -122,6 +121,7 @@ class PDP; class PDPSimple; class IPersistenceService; class WLP; +struct LocalReaderPointer; /** * @brief Class RTPSParticipantImpl, it contains the private implementation of the RTPSParticipant functions and From e2c4e5e599bb2e57e06c74a362a1f833bf59d3d7 Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Fri, 15 Nov 2024 11:38:45 +0100 Subject: [PATCH 4/5] Apply NIT Signed-off-by: Mario Dominguez --- include/fastdds/rtps/reader/LocalReaderPointer.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/include/fastdds/rtps/reader/LocalReaderPointer.hpp b/include/fastdds/rtps/reader/LocalReaderPointer.hpp index c5adb067f4a..848586cb092 100644 --- a/include/fastdds/rtps/reader/LocalReaderPointer.hpp +++ b/include/fastdds/rtps/reader/LocalReaderPointer.hpp @@ -24,7 +24,6 @@ #include namespace eprosima { - namespace fastrtps { namespace rtps { From f2015fe4ced56cf337db89cc0182bbb13ea52cda Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Tue, 19 Nov 2024 07:54:30 +0100 Subject: [PATCH 5/5] Construct LocalReaderPointer in RTPSReader Signed-off-by: Mario Dominguez --- include/fastdds/rtps/reader/LocalReaderPointer.hpp | 7 +++++++ src/cpp/rtps/reader/RTPSReader.cpp | 2 ++ 2 files changed, 9 insertions(+) diff --git a/include/fastdds/rtps/reader/LocalReaderPointer.hpp b/include/fastdds/rtps/reader/LocalReaderPointer.hpp index 848586cb092..7642dfb4e9e 100644 --- a/include/fastdds/rtps/reader/LocalReaderPointer.hpp +++ b/include/fastdds/rtps/reader/LocalReaderPointer.hpp @@ -31,6 +31,13 @@ class RTPSReader; struct LocalReaderPointer : public RefCountedPointer { + LocalReaderPointer( + RTPSReader* ptr) + : RefCountedPointer(ptr) + { + } + + virtual ~LocalReaderPointer() = default; }; diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index d98bcd7aa72..f6ec0089a3d 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -147,6 +147,8 @@ void RTPSReader::init( mp_history->mp_reader = this; mp_history->mp_mutex = &mp_mutex; + local_ptr_ = std::make_shared(this); + EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly"); }