diff --git a/include/fastdds/rtps/attributes/ServerAttributes.h b/include/fastdds/rtps/attributes/ServerAttributes.h index 369fee644f0..9f5910f3795 100644 --- a/include/fastdds/rtps/attributes/ServerAttributes.h +++ b/include/fastdds/rtps/attributes/ServerAttributes.h @@ -54,7 +54,6 @@ class RemoteServerAttributes return guidPrefix == r.guidPrefix && metatrafficUnicastLocatorList == r.metatrafficUnicastLocatorList && metatrafficMulticastLocatorList == r.metatrafficMulticastLocatorList; - // && proxy == r.proxy; } RTPS_DllAPI void clear() @@ -62,7 +61,7 @@ class RemoteServerAttributes guidPrefix = fastrtps::rtps::GuidPrefix_t::unknown(); metatrafficUnicastLocatorList.clear(); metatrafficMulticastLocatorList.clear(); - proxy = nullptr; + is_connected = false; } RTPS_DllAPI fastrtps::rtps::GUID_t GetParticipant() const; @@ -100,8 +99,8 @@ class RemoteServerAttributes //!Guid prefix fastrtps::rtps::GuidPrefix_t guidPrefix; - // Live participant proxy reference - const fastrtps::rtps::ParticipantProxyData* proxy{}; + // Whether connection has been established + bool is_connected = false; // Check if there are specific transport locators associated // the template parameter is the locator kind (e.g. LOCATOR_KIND_UDPv4) diff --git a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h index 5602944ed2d..b3d1462875d 100644 --- a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h +++ b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h @@ -63,13 +63,13 @@ class ParticipantProxyData { public: - ParticipantProxyData( + RTPS_DllAPI ParticipantProxyData( const RTPSParticipantAllocationAttributes& allocation); - ParticipantProxyData( + RTPS_DllAPI ParticipantProxyData( const ParticipantProxyData& pdata); - virtual ~ParticipantProxyData(); + RTPS_DllAPI virtual ~ParticipantProxyData(); //!Protocol version ProtocolVersion_t m_protocolVersion; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 677ab16b9d1..36fba93eb07 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -459,7 +459,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( return true; } -// the ParticipantProxyData* pdata must be the one kept in PDP database void PDPClient::assignRemoteEndpoints( ParticipantProxyData* pdata) { @@ -475,8 +474,7 @@ void PDPClient::assignRemoteEndpoints( { if (data_matches_with_prefix(svr.guidPrefix, *pdata)) { - std::unique_lock lock(*getMutex()); - svr.proxy = pdata; + svr.is_connected = true; } } } @@ -504,11 +502,11 @@ void PDPClient::notifyAboveRemoteEndpoints( { if (data_matches_with_prefix(svr.guidPrefix, pdata)) { - if (nullptr == svr.proxy) + if (!svr.is_connected && nullptr != get_participant_proxy_data(svr.guidPrefix)) { - //! try to retrieve the participant proxy data from an unmangled prefix in case - //! we could not fill svr.proxy in assignRemoteEndpoints() - svr.proxy = get_participant_proxy_data(svr.guidPrefix); + //! mark proxy as connected from an unmangled prefix in case + //! it could not be done in assignRemoteEndpoints() + svr.is_connected = true; } match_pdp_reader_nts_(svr, pdata.m_guid.guidPrefix); @@ -583,7 +581,7 @@ void PDPClient::removeRemoteEndpoints( if (svr.guidPrefix == pdata->m_guid.guidPrefix) { std::unique_lock lock(*getMutex()); - svr.proxy = nullptr; // reasign when we receive again server DATA(p) + svr.is_connected = false; is_server = true; mp_sync->restart_timer(); // enable announcement and sync mechanism till this server reappears } @@ -755,11 +753,11 @@ void PDPClient::announceParticipantState( for (auto& svr : mp_builtin->m_DiscoveryServers) { // if we are matched to a server report demise - if (svr.proxy != nullptr) + if (svr.is_connected) { //locators.push_back(svr.metatrafficMulticastLocatorList); locators.push_back(svr.metatrafficUnicastLocatorList); - remote_readers.emplace_back(svr.proxy->m_guid.guidPrefix, + remote_readers.emplace_back(svr.guidPrefix, endpoints->reader.reader_->getGuid().entityId); } } @@ -792,7 +790,7 @@ void PDPClient::announceParticipantState( { // non-pinging announcements like lease duration ones must be // broadcast to all servers - if (svr.proxy == nullptr || !_serverPing) + if (!svr.is_connected || !_serverPing) { locators.push_back(svr.metatrafficMulticastLocatorList); locators.push_back(svr.metatrafficUnicastLocatorList); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 7fd532f6bc5..6eecef90021 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -153,11 +153,14 @@ void PDPListener::process_alive_data( // Create a new one when not found old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid); - reader->getMutex().unlock(); - lock.unlock(); - if (old_data != nullptr) { + // Copy proxy to be passed forward before releasing PDP mutex + ParticipantProxyData old_data_copy(*old_data); + + reader->getMutex().unlock(); + lock.unlock(); + // Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since // StatelessWriter::matched_reader_add marks the entire history as unsent if the added reader's // durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT), @@ -168,13 +171,19 @@ void PDPListener::process_alive_data( // participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will // create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched // readers anyways. - parent_pdp_->assignRemoteEndpoints(old_data); + parent_pdp_->assignRemoteEndpoints(&old_data_copy); + } + else + { + reader->getMutex().unlock(); + lock.unlock(); } } else { old_data->updateData(new_data); old_data->isAlive = true; + reader->getMutex().unlock(); logInfo(RTPS_PDP_DISCOVERY, "Update participant " @@ -187,6 +196,9 @@ void PDPListener::process_alive_data( parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true); } + // Copy proxy to be passed forward before releasing PDP mutex + ParticipantProxyData old_data_copy(*old_data); + lock.unlock(); RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener(); @@ -194,7 +206,7 @@ void PDPListener::process_alive_data( { { std::lock_guard cb_lock(parent_pdp_->callback_mtx_); - ParticipantDiscoveryInfo info(*old_data); + ParticipantDiscoveryInfo info(old_data_copy); info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT; listener->onParticipantDiscovery( diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index eb7988706f6..507d05aeef4 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -36,9 +37,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -423,17 +426,21 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) ParticipantDiscoveryInfo&& info) { std::unique_lock lck(*mtx_); - static_cast(participant); - if (nullptr != remote_participant_info) + if (info.status == + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::DISCOVERED_PARTICIPANT) { - delete remote_participant_info; + static_cast(participant); + if (nullptr != remote_participant_info) + { + delete remote_participant_info; + } + remote_participant_info = new ParticipantProxyData(info.info); + found_->store(true); + cv_->notify_one(); } - remote_participant_info = new ParticipantDiscoveryInfo(info); - found_->store(true); - cv_->notify_one(); } - ParticipantDiscoveryInfo* remote_participant_info; + ParticipantProxyData* remote_participant_info; private: @@ -485,7 +492,7 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) participant_found.store(false); // Prevent assertion on spurious discovery of a participant from elsewhere - if (part_1->guid() == listener.remote_participant_info->info.m_guid) + if (part_1->guid() == listener.remote_participant_info->m_guid) { // Check that all three properties are present in the ParticipantProxyData, and that their value // is that of the property in part_1 (the original property value) @@ -493,13 +500,13 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) { // Find property in ParticipantProxyData auto received_property = std::find_if( - listener.remote_participant_info->info.m_properties.begin(), - listener.remote_participant_info->info.m_properties.end(), + listener.remote_participant_info->m_properties.begin(), + listener.remote_participant_info->m_properties.end(), [&](const ParameterProperty_t& property) { return property.first() == physical_property_name; }); - ASSERT_NE(received_property, listener.remote_participant_info->info.m_properties.end()); + ASSERT_NE(received_property, listener.remote_participant_info->m_properties.end()); // Find property in first participant auto part_1_property = PropertyPolicyHelper::find_property( @@ -545,20 +552,20 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) participant_found.store(false); // Prevent assertion on spurious discovery of a participant from elsewhere - if (part_1->guid() == listener.remote_participant_info->info.m_guid) + if (part_1->guid() == listener.remote_participant_info->m_guid) { // Check that none of the three properties are present in the ParticipantProxyData. for (auto physical_property_name : physical_property_names) { // Look for property in ParticipantProxyData auto received_property = std::find_if( - listener.remote_participant_info->info.m_properties.begin(), - listener.remote_participant_info->info.m_properties.end(), + listener.remote_participant_info->m_properties.begin(), + listener.remote_participant_info->m_properties.end(), [&](const ParameterProperty_t& property) { return property.first() == physical_property_name; }); - ASSERT_EQ(received_property, listener.remote_participant_info->info.m_properties.end()); + ASSERT_EQ(received_property, listener.remote_participant_info->m_properties.end()); } break; } @@ -567,3 +574,137 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) DomainParticipantFactory::get_instance()->delete_participant(part_1); DomainParticipantFactory::get_instance()->delete_participant(part_2); } + +// Regression test for redmine issue 20409 +TEST(DDSDiscovery, DataracePDP) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastdds::rtps; + + class CustomDomainParticipantListener : public DomainParticipantListener + { + public: + + CustomDomainParticipantListener() + : DomainParticipantListener() + , discovery_future(discovery_promise.get_future()) + , destruction_future(destruction_promise.get_future()) + , undiscovery_future(undiscovery_promise.get_future()) + { + } + + void on_participant_discovery( + DomainParticipant* /*participant*/, + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override + { + if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) + { + try + { + discovery_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + destruction_future.wait(); + } + else if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT || + info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) + { + try + { + undiscovery_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + } + } + + std::promise discovery_promise; + std::future discovery_future; + + std::promise destruction_promise; + std::future destruction_future; + + std::promise undiscovery_promise; + std::future undiscovery_future; + }; + + // Disable intraprocess + auto settings = fastrtps::xmlparser::XMLProfileManager::library_settings(); + auto prev_intraprocess_delivery = settings.intraprocess_delivery; + settings.intraprocess_delivery = fastrtps::INTRAPROCESS_OFF; + fastrtps::xmlparser::XMLProfileManager::library_settings(settings); + + // DDS Domain Id + const unsigned int DOMAIN_ID = (uint32_t)GET_PID() % 230; + + // This is a non deterministic test, so we will run it several times to increase probability of data race detection + // if it exists. + const unsigned int N_ITER = 10; + unsigned int iter_idx = 0; + while (iter_idx < N_ITER) + { + iter_idx++; + + DomainParticipantQos qos; + qos.transport().use_builtin_transports = false; + auto udp_transport = std::make_shared(); + qos.transport().user_transports.push_back(udp_transport); + + // Create discoverer participant (the one where a data race on PDP might occur) + CustomDomainParticipantListener participant_listener; + DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID, qos, + &participant_listener); + + DomainParticipantQos aux_qos; + aux_qos.transport().use_builtin_transports = false; + auto aux_udp_transport = std::make_shared(); + aux_qos.transport().user_transports.push_back(aux_udp_transport); + + // Create auxiliary participant to be discovered + aux_qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0); + aux_qos.wire_protocol().builtin.discovery_config.leaseDuration = Duration_t(1, 10); + DomainParticipant* aux_participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID, + aux_qos); + + // Wait for discovery + participant_listener.discovery_future.wait(); + + // Shutdown auxiliary participant's network, so it will be removed after lease duration + test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true; + DomainParticipantFactory::get_instance()->delete_participant(aux_participant); + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); // Wait for longer than lease duration + + try + { + // NOTE: at this point, the discoverer participant is stuck in a UDP discovery thread (unicast or multicast). + // At the same time, the events thread is stuck at PDP::remove_remote_participant (lease duration expired + // and so the discovered participant is removed), trying to acquire the callback mutex taken by the + // discovery thread. + + // If we now signal the discovery thread to continue, a data race might occur if the received + // ParticipantProxyData, which is further being processed in the discovery thread (assignRemoteEndpoints), + // gets deleted/cleared by the events thread at the same time. + // Note that a similar situation might arise in other scenarios, such as on the concurrent reception of a + // data P and data uP each on a different thread (unicast and multicast), however these are harder to + // reproduce in a regression test. + participant_listener.destruction_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + + participant_listener.undiscovery_future.wait(); + DomainParticipantFactory::get_instance()->delete_participant(participant); + } + + // Reestablish previous intraprocess configuration + settings.intraprocess_delivery = prev_intraprocess_delivery; + fastrtps::xmlparser::XMLProfileManager::library_settings(settings); +}