diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDP.h b/include/fastdds/rtps/builtin/discovery/participant/PDP.h index 04c090e0ee6..80d4e26b32b 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDP.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDP.h @@ -115,6 +115,11 @@ class PDP */ bool enable(); + /** + * @brief Disable the Participant Discovery Protocol + */ + void disable(); + virtual bool init( RTPSParticipantImpl* part) = 0; @@ -545,6 +550,21 @@ class PDP */ void set_initial_announcement_interval(); + /** + * Performs all the necessary actions after removing a ParticipantProxyData from the + * participant_proxies_ collection. + * + * @param pdata ParticipantProxyData that was removed. + * @param partGUID GUID of the removed participant. + * @param reason Reason why the participant was removed. + * @param listener Listener to be notified of the unmatches / removal. + */ + void actions_on_remote_participant_removed( + ParticipantProxyData* pdata, + const GUID_t& partGUID, + ParticipantDiscoveryInfo::DISCOVERY_STATUS reason, + RTPSParticipantListener* listener); + }; diff --git a/src/cpp/rtps/builtin/BuiltinProtocols.cpp b/src/cpp/rtps/builtin/BuiltinProtocols.cpp index 54921c43dae..3a6f17b594b 100644 --- a/src/cpp/rtps/builtin/BuiltinProtocols.cpp +++ b/src/cpp/rtps/builtin/BuiltinProtocols.cpp @@ -58,9 +58,12 @@ BuiltinProtocols::BuiltinProtocols() BuiltinProtocols::~BuiltinProtocols() { // Send participant is disposed - if (mp_PDP != nullptr) + if (nullptr != mp_PDP) { + // Send participant is disposed mp_PDP->announceParticipantState(true, true); + // Consider all discovered participants as disposed + mp_PDP->disable(); } // TODO Auto-generated destructor stub @@ -194,7 +197,7 @@ bool BuiltinProtocols::addLocalWriter( { bool ok = true; - if (mp_PDP != nullptr) + if (nullptr != mp_PDP) { ok = mp_PDP->getEDP()->newLocalWriterProxyData(w, topicAtt, wqos); @@ -209,7 +212,7 @@ bool BuiltinProtocols::addLocalWriter( logWarning(RTPS_EDP, "EDP is not used in this Participant, register a Writer is impossible"); } - if (mp_WLP != nullptr) + if (nullptr != mp_WLP) { ok &= mp_WLP->add_local_writer(w, wqos); } @@ -228,7 +231,7 @@ bool BuiltinProtocols::addLocalReader( { bool ok = true; - if (mp_PDP != nullptr) + if (nullptr != mp_PDP) { ok = mp_PDP->getEDP()->newLocalReaderProxyData(R, topicAtt, rqos, content_filter); @@ -243,7 +246,7 @@ bool BuiltinProtocols::addLocalReader( logWarning(RTPS_EDP, "EDP is not used in this Participant, register a Reader is impossible"); } - if (mp_WLP != nullptr) + if (nullptr != mp_WLP) { ok &= mp_WLP->add_local_reader(R, rqos); } @@ -257,7 +260,7 @@ bool BuiltinProtocols::updateLocalWriter( const WriterQos& wqos) { bool ok = false; - if (mp_PDP != nullptr && mp_PDP->getEDP() != nullptr) + if ((nullptr != mp_PDP) && (nullptr != mp_PDP->getEDP())) { ok = mp_PDP->getEDP()->updatedLocalWriter(W, topicAtt, wqos); } @@ -271,7 +274,7 @@ bool BuiltinProtocols::updateLocalReader( const fastdds::rtps::ContentFilterProperty* content_filter) { bool ok = false; - if (mp_PDP != nullptr && mp_PDP->getEDP() != nullptr) + if ((nullptr != mp_PDP) && (nullptr != mp_PDP->getEDP())) { ok = mp_PDP->getEDP()->updatedLocalReader(R, topicAtt, rqos, content_filter); } @@ -282,11 +285,11 @@ bool BuiltinProtocols::removeLocalWriter( RTPSWriter* W) { bool ok = false; - if (mp_WLP != nullptr) + if (nullptr != mp_WLP) { ok |= mp_WLP->remove_local_writer(W); } - if (mp_PDP != nullptr && mp_PDP->getEDP() != nullptr) + if ((nullptr != mp_PDP) && (nullptr != mp_PDP->getEDP())) { ok |= mp_PDP->getEDP()->removeLocalWriter(W); } @@ -297,11 +300,11 @@ bool BuiltinProtocols::removeLocalReader( RTPSReader* R) { bool ok = false; - if (mp_WLP != nullptr) + if (nullptr != mp_WLP) { ok |= mp_WLP->remove_local_reader(R); } - if (mp_PDP != nullptr && mp_PDP->getEDP() != nullptr) + if ((nullptr != mp_PDP) && (nullptr != mp_PDP->getEDP())) { ok |= mp_PDP->getEDP()->removeLocalReader(R); } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 02debbd6bd6..66138939844 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -318,7 +318,7 @@ void PDP::initializeParticipantProxyData( participant_data->metatraffic_locators.unicast.clear(); if (announce_locators) { - for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) + for (const Locator_t& loc : mp_builtin->m_metatrafficUnicastLocatorList) { participant_data->metatraffic_locators.add_unicast_locator(loc); } @@ -329,7 +329,7 @@ void PDP::initializeParticipantProxyData( { if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) { - for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) + for (const Locator_t& loc: mp_builtin->m_metatrafficMulticastLocatorList) { participant_data->metatraffic_locators.add_multicast_locator(loc); } @@ -344,7 +344,7 @@ void PDP::initializeParticipantProxyData( if (mp_RTPSParticipant->is_secure()) { IdentityToken* identity_token = nullptr; - if (mp_RTPSParticipant->security_manager().get_identity_token(&identity_token) && identity_token != nullptr) + if (mp_RTPSParticipant->security_manager().get_identity_token(&identity_token) && (nullptr != identity_token)) { participant_data->identity_token_ = std::move(*identity_token); mp_RTPSParticipant->security_manager().return_identity_token(identity_token); @@ -352,7 +352,7 @@ void PDP::initializeParticipantProxyData( PermissionsToken* permissions_token = nullptr; if (mp_RTPSParticipant->security_manager().get_permissions_token(&permissions_token) - && permissions_token != nullptr) + && (nullptr != permissions_token)) { participant_data->permissions_token_ = std::move(*permissions_token); mp_RTPSParticipant->security_manager().return_permissions_token(permissions_token); @@ -407,11 +407,13 @@ bool PDP::initPDP( //UPDATE METATRAFFIC. update_builtin_locators(); - mp_mutex->lock(); - ParticipantProxyData* pdata = add_participant_proxy_data(mp_RTPSParticipant->getGuid(), false, nullptr); - mp_mutex->unlock(); + ParticipantProxyData* pdata = nullptr; + { + std::lock_guard guardPDP(*mp_mutex); + pdata = add_participant_proxy_data(mp_RTPSParticipant->getGuid(), false, nullptr); + } - if (pdata == nullptr) + if (nullptr == pdata) { return false; } @@ -458,6 +460,24 @@ bool PDP::enable() return builtin_endpoints_->enable_pdp_readers(mp_RTPSParticipant); } +void PDP::disable() +{ + // Extract all the participant proxies excluding first one (ourselves) + std::vector participants; + { + std::lock_guard guardPDP(*mp_mutex); + participants.insert(participants.end(), participant_proxies_.begin() + 1, participant_proxies_.end()); + participant_proxies_.erase(participant_proxies_.begin() + 1, participant_proxies_.end()); + } + + // Unmatch all remote participants + for (ParticipantProxyData* pdata : participants) + { + actions_on_remote_participant_removed(pdata, pdata->m_guid, + ParticipantDiscoveryInfo::DISCOVERY_STATUS::REMOVED_PARTICIPANT, nullptr); + } +} + void PDP::announceParticipantState( RTPSWriter& writer, WriterHistory& history, @@ -474,11 +494,11 @@ void PDP::announceParticipantState( { if (m_hasChangedLocalPDP.exchange(false) || new_change) { - this->mp_mutex->lock(); + mp_mutex->lock(); ParticipantProxyData* local_participant_data = getLocalParticipantProxyData(); InstanceHandle_t key = local_participant_data->m_key; ParticipantProxyData proxy_data_copy(*local_participant_data); - this->mp_mutex->unlock(); + mp_mutex->unlock(); if (history.getHistorySize() > 0) { @@ -492,7 +512,7 @@ void PDP::announceParticipantState( }, ALIVE, key); - if (change != nullptr) + if (nullptr != change) { CDRMessage_t aux_msg(change->serializedPayload); @@ -520,11 +540,11 @@ void PDP::announceParticipantState( } else { - this->mp_mutex->lock(); + mp_mutex->lock(); ParticipantProxyData* local_participant_data = getLocalParticipantProxyData(); InstanceHandle_t key = local_participant_data->m_key; ParticipantProxyData proxy_data_copy(*local_participant_data); - this->mp_mutex->unlock(); + mp_mutex->unlock(); if (history.getHistorySize() > 0) { @@ -537,7 +557,7 @@ void PDP::announceParticipantState( }, NOT_ALIVE_DISPOSED_UNREGISTERED, key); - if (change != nullptr) + if (nullptr != change) { CDRMessage_t aux_msg(change->serializedPayload); @@ -592,7 +612,7 @@ void PDP::notify_and_maybe_ignore_new_participant( << " DefLoc:" << pdata->default_locators); RTPSParticipantListener* listener = getRTPSParticipant()->getListener(); - if (listener != nullptr) + if (listener) { { std::lock_guard cb_lock(callback_mtx_); @@ -610,7 +630,7 @@ void PDP::notify_and_maybe_ignore_new_participant( bool PDP::has_reader_proxy_data( const GUID_t& reader) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid.guidPrefix == reader.guidPrefix) @@ -626,7 +646,7 @@ bool PDP::lookupReaderProxyData( const GUID_t& reader, ReaderProxyData& rdata) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid.guidPrefix == reader.guidPrefix) @@ -645,7 +665,7 @@ bool PDP::lookupReaderProxyData( bool PDP::has_writer_proxy_data( const GUID_t& writer) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid.guidPrefix == writer.guidPrefix) @@ -661,7 +681,7 @@ bool PDP::lookupWriterProxyData( const GUID_t& writer, WriterProxyData& wdata) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid.guidPrefix == writer.guidPrefix) @@ -681,7 +701,7 @@ bool PDP::removeReaderProxyData( const GUID_t& reader_guid) { logInfo(RTPS_PDP, "Removing reader proxy data " << reader_guid); - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { @@ -718,7 +738,7 @@ bool PDP::removeWriterProxyData( const GUID_t& writer_guid) { logInfo(RTPS_PDP, "Removing writer proxy data " << writer_guid); - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { @@ -756,7 +776,7 @@ bool PDP::lookup_participant_name( const GUID_t& guid, string_255& name) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid == guid) @@ -772,7 +792,7 @@ bool PDP::lookup_participant_key( const GUID_t& participant_guid, InstanceHandle_t& key) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { if (pit->m_guid == participant_guid) @@ -795,7 +815,7 @@ ReaderProxyData* PDP::addReaderProxyData( // notify statistics module getRTPSParticipant()->on_entity_discovery(reader_guid, ParameterPropertyList_t()); - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { @@ -891,7 +911,7 @@ WriterProxyData* PDP::addWriterProxyData( // notify statistics module getRTPSParticipant()->on_entity_discovery(writer_guid, ParameterPropertyList_t()); - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); for (ParticipantProxyData* pit : participant_proxies_) { @@ -1005,90 +1025,105 @@ bool PDP::remove_remote_participant( logInfo(RTPS_PDP, partGUID ); ParticipantProxyData* pdata = nullptr; - //Remove it from our vector or RTPSParticipantProxies: - this->mp_mutex->lock(); - for (ResourceLimitedVector::iterator pit = participant_proxies_.begin(); - pit != participant_proxies_.end(); ++pit) + //Remove it from our vector of RTPSParticipantProxies { - if ((*pit)->m_guid == partGUID) + std::lock_guard guardPDP(*mp_mutex); + for (ResourceLimitedVector::iterator pit = participant_proxies_.begin(); + pit != participant_proxies_.end(); ++pit) { - pdata = *pit; - participant_proxies_.erase(pit); - break; + if ((*pit)->m_guid == partGUID) + { + pdata = *pit; + participant_proxies_.erase(pit); + break; + } } } - this->mp_mutex->unlock(); - if (pdata != nullptr) + if (nullptr != pdata) { - if (mp_EDP != nullptr) - { - RTPSParticipantListener* listener = mp_RTPSParticipant->getListener(); + RTPSParticipantListener* listener = mp_RTPSParticipant->getListener(); + actions_on_remote_participant_removed(pdata, partGUID, reason, listener); + return true; + } + + return false; +} + +void PDP::actions_on_remote_participant_removed( + ParticipantProxyData* pdata, + const GUID_t& partGUID, + ParticipantDiscoveryInfo::DISCOVERY_STATUS reason, + RTPSParticipantListener* listener) +{ + assert(nullptr != pdata); - for (auto pit : *pdata->m_readers) + if (nullptr != mp_EDP) + { + for (auto pit : *pdata->m_readers) + { + ReaderProxyData* rit = pit.second; + GUID_t reader_guid(rit->guid()); + if (reader_guid != c_Guid_Unknown) { - ReaderProxyData* rit = pit.second; - GUID_t reader_guid(rit->guid()); - if (reader_guid != c_Guid_Unknown) - { - mp_EDP->unpairReaderProxy(partGUID, reader_guid); + mp_EDP->unpairReaderProxy(partGUID, reader_guid); - if (listener) - { - ReaderDiscoveryInfo info(std::move(*rit)); - info.status = ReaderDiscoveryInfo::REMOVED_READER; - listener->onReaderDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); - } + if (listener) + { + ReaderDiscoveryInfo info(std::move(*rit)); + info.status = ReaderDiscoveryInfo::REMOVED_READER; + listener->onReaderDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); } } - for (auto pit : *pdata->m_writers) + } + for (auto pit : *pdata->m_writers) + { + WriterProxyData* wit = pit.second; + GUID_t writer_guid(wit->guid()); + if (writer_guid != c_Guid_Unknown) { - WriterProxyData* wit = pit.second; - GUID_t writer_guid(wit->guid()); - if (writer_guid != c_Guid_Unknown) - { - mp_EDP->unpairWriterProxy(partGUID, writer_guid, - reason == ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT); + mp_EDP->unpairWriterProxy(partGUID, writer_guid, + reason == ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT); - if (listener) - { - WriterDiscoveryInfo info(std::move(*wit)); - info.status = WriterDiscoveryInfo::REMOVED_WRITER; - listener->onWriterDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); - } + if (listener) + { + WriterDiscoveryInfo info(std::move(*wit)); + info.status = WriterDiscoveryInfo::REMOVED_WRITER; + listener->onWriterDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); } } } + } - if (mp_builtin->mp_WLP != nullptr) - { - this->mp_builtin->mp_WLP->removeRemoteEndpoints(pdata); - } + if (nullptr != mp_builtin->mp_WLP) + { + mp_builtin->mp_WLP->removeRemoteEndpoints(pdata); + } - if (mp_builtin->tlm_ != nullptr) - { - mp_builtin->tlm_->remove_remote_endpoints(pdata); - } + if (nullptr != mp_builtin->tlm_) + { + mp_builtin->tlm_->remove_remote_endpoints(pdata); + } - this->mp_EDP->removeRemoteEndpoints(pdata); - this->removeRemoteEndpoints(pdata); + mp_EDP->removeRemoteEndpoints(pdata); + removeRemoteEndpoints(pdata); #if HAVE_SECURITY - mp_builtin->mp_participantImpl->security_manager().remove_participant(*pdata); + mp_builtin->mp_participantImpl->security_manager().remove_participant(*pdata); #endif // if HAVE_SECURITY - builtin_endpoints_->remove_from_pdp_reader_history(pdata->m_key); + builtin_endpoints_->remove_from_pdp_reader_history(pdata->m_key); - auto listener = mp_RTPSParticipant->getListener(); - if (listener != nullptr) - { - std::lock_guard lock(callback_mtx_); - ParticipantDiscoveryInfo info(*pdata); - info.status = reason; - listener->onParticipantDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); - } + if (listener) + { + std::lock_guard lock(callback_mtx_); + ParticipantDiscoveryInfo info(*pdata); + info.status = reason; + listener->onParticipantDiscovery(mp_RTPSParticipant->getUserRTPSParticipant(), std::move(info)); + } - this->mp_mutex->lock(); + { + std::lock_guard guardPDP(*mp_mutex); // Delete from sender resource list (TCP only) LocatorList_t remote_participant_locators; @@ -1122,7 +1157,7 @@ bool PDP::remove_remote_participant( pdata->m_writers->clear(); // Cancel lease event - if (pdata->lease_duration_event != nullptr) + if (nullptr != pdata->lease_duration_event) { pdata->lease_duration_event->cancel_timer(); } @@ -1130,13 +1165,7 @@ bool PDP::remove_remote_participant( // Return proxy object to pool pdata->clear(); participant_proxies_pool_.push_back(pdata); - - this->mp_mutex->unlock(); - - return true; } - - return false; } const BuiltinAttributes& PDP::builtin_attributes() const @@ -1147,9 +1176,9 @@ const BuiltinAttributes& PDP::builtin_attributes() const void PDP::assert_remote_participant_liveliness( const GuidPrefix_t& remote_guid) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); - for (ParticipantProxyData* it : this->participant_proxies_) + for (ParticipantProxyData* it : participant_proxies_) { if (it->m_guid.guidPrefix == remote_guid) { @@ -1164,7 +1193,7 @@ void PDP::assert_remote_participant_liveliness( CDRMessage_t PDP::get_participant_proxy_data_serialized( Endianness_t endian) { - std::lock_guard guardPDP(*this->mp_mutex); + std::lock_guard guardPDP(*mp_mutex); CDRMessage_t cdr_msg(RTPSMESSAGE_DEFAULT_SIZE); cdr_msg.msg_endian = endian; @@ -1198,7 +1227,7 @@ std::list& PDP::remote_server_a void PDP::check_remote_participant_liveliness( ParticipantProxyData* remote_participant) { - std::unique_lock guard(*this->mp_mutex); + std::unique_lock guard(*mp_mutex); if (remote_participant->should_check_lease_duration) { @@ -1278,7 +1307,7 @@ void PDP::check_and_notify_type_discovery( type_name.to_string(), type_id); } - if (dyn_type != nullptr) + if (nullptr != dyn_type) { types::DynamicPubSubType type_support(dyn_type); diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index d387d796248..898e89ca792 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -34,10 +35,12 @@ #include #include #include +#include #include #include "BlackboxTests.hpp" +#include "mock/BlackboxMockConsumer.h" #include "../types/HelloWorldPubSubTypes.h" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" @@ -48,6 +51,48 @@ namespace dds { using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; +/** + * This is a regression test for redmine issue #21060. + * + * It checks that when intraprocess delivery is set to full, there are no warnings in the desctructor of WriterProxy + * when deleting a participant. + */ +TEST(DDSBasic, WarningOnDelete) +{ + using namespace eprosima::fastrtps; + namespace dds = eprosima::fastdds::dds; + auto factory = dds::DomainParticipantFactory::get_instance(); + + // Set intraprocess delivery to full + LibrarySettingsAttributes library_settings; + library_settings = xmlparser::XMLProfileManager::library_settings(); + auto old_library_settings = library_settings; + library_settings.intraprocess_delivery = INTRAPROCESS_FULL; + xmlparser::XMLProfileManager::library_settings(library_settings); + + // Create participants + auto participant_1 = factory->create_participant(0, dds::PARTICIPANT_QOS_DEFAULT); + auto participant_2 = factory->create_participant(0, dds::PARTICIPANT_QOS_DEFAULT); + + /* Set up log */ + BlackboxMockConsumer* helper_consumer = new BlackboxMockConsumer(); + Log::ClearConsumers(); // Remove default consumers + Log::RegisterConsumer(std::unique_ptr(helper_consumer)); // Registering a consumer transfer ownership + // Filter specific message + dds::Log::SetErrorStringFilter(std::regex(".*~WriterProxy.*")); + dds::Log::SetVerbosity(dds::Log::Warning); + + factory->delete_participant(participant_1); + factory->delete_participant(participant_2); + + dds::Log::Flush(); + EXPECT_EQ(helper_consumer->ConsumedEntries().size(), 0u); + helper_consumer->clear_entries(); + + // Restore library settings + xmlparser::XMLProfileManager::library_settings(old_library_settings); +} + /** * This test checks whether it is safe to delete not enabled DDS entities * */