Skip to content

Commit

Permalink
Fix data race on PDP (#4220)
Browse files Browse the repository at this point in the history
* Add regression test

Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>

* Make a copy of the participant proxy data

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Use the copy method to ensure all the attributes are being copied correctly

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Lookup the participant proxy data to pass the discovery-server tests

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Get rid of proxy reference in RemoteServerAttributes

Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>

* Fix failing test

Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>

* Uncrustify

Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>

---------

Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>
Signed-off-by: tempate <danieldiaz@eprosima.com>
Co-authored-by: Juan Lopez Fernandez <juanlopez@eprosima.com>
  • Loading branch information
Tempate and juanlofer-eprosima authored Feb 22, 2024
1 parent 8c3e3b3 commit 3053832
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 30 deletions.
7 changes: 3 additions & 4 deletions include/fastdds/rtps/attributes/ServerAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ class RemoteServerAttributes
return guidPrefix == r.guidPrefix
&& metatrafficUnicastLocatorList == r.metatrafficUnicastLocatorList
&& metatrafficMulticastLocatorList == r.metatrafficMulticastLocatorList;
// && proxy == r.proxy;
}

RTPS_DllAPI void clear()
{
guidPrefix = fastrtps::rtps::GuidPrefix_t::unknown();
metatrafficUnicastLocatorList.clear();
metatrafficMulticastLocatorList.clear();
proxy = nullptr;
is_connected = false;
}

RTPS_DllAPI fastrtps::rtps::GUID_t GetParticipant() const;
Expand Down Expand Up @@ -100,8 +99,8 @@ class RemoteServerAttributes
//!Guid prefix
fastrtps::rtps::GuidPrefix_t guidPrefix;

// Live participant proxy reference
const fastrtps::rtps::ParticipantProxyData* proxy{};
// Whether connection has been established
bool is_connected = false;

// Check if there are specific transport locators associated
// the template parameter is the locator kind (e.g. LOCATOR_KIND_UDPv4)
Expand Down
20 changes: 9 additions & 11 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
return true;
}

// the ParticipantProxyData* pdata must be the one kept in PDP database
void PDPClient::assignRemoteEndpoints(
ParticipantProxyData* pdata)
{
Expand All @@ -488,8 +487,7 @@ void PDPClient::assignRemoteEndpoints(
{
if (data_matches_with_prefix(svr.guidPrefix, *pdata))
{
std::unique_lock<std::recursive_mutex> lock(*getMutex());
svr.proxy = pdata;
svr.is_connected = true;
}
}
}
Expand Down Expand Up @@ -517,11 +515,11 @@ void PDPClient::notifyAboveRemoteEndpoints(
{
if (data_matches_with_prefix(svr.guidPrefix, pdata))
{
if (nullptr == svr.proxy)
if (!svr.is_connected && nullptr != get_participant_proxy_data(svr.guidPrefix))
{
//! try to retrieve the participant proxy data from an unmangled prefix in case
//! we could not fill svr.proxy in assignRemoteEndpoints()
svr.proxy = get_participant_proxy_data(svr.guidPrefix);
//! mark proxy as connected from an unmangled prefix in case
//! it could not be done in assignRemoteEndpoints()
svr.is_connected = true;
}

match_pdp_reader_nts_(svr, pdata.m_guid.guidPrefix);
Expand Down Expand Up @@ -596,7 +594,7 @@ void PDPClient::removeRemoteEndpoints(
if (svr.guidPrefix == pdata->m_guid.guidPrefix)
{
std::unique_lock<std::recursive_mutex> lock(*getMutex());
svr.proxy = nullptr; // reasign when we receive again server DATA(p)
svr.is_connected = false;
is_server = true;
mp_sync->restart_timer(); // enable announcement and sync mechanism till this server reappears
}
Expand Down Expand Up @@ -768,11 +766,11 @@ void PDPClient::announceParticipantState(
for (auto& svr : mp_builtin->m_DiscoveryServers)
{
// if we are matched to a server report demise
if (svr.proxy != nullptr)
if (svr.is_connected)
{
//locators.push_back(svr.metatrafficMulticastLocatorList);
locators.push_back(svr.metatrafficUnicastLocatorList);
remote_readers.emplace_back(svr.proxy->m_guid.guidPrefix,
remote_readers.emplace_back(svr.guidPrefix,
endpoints->reader.reader_->getGuid().entityId);
}
}
Expand Down Expand Up @@ -805,7 +803,7 @@ void PDPClient::announceParticipantState(
{
// non-pinging announcements like lease duration ones must be
// broadcast to all servers
if (svr.proxy == nullptr || !_serverPing)
if (!svr.is_connected || !_serverPing)
{
locators.push_back(svr.metatrafficMulticastLocatorList);
locators.push_back(svr.metatrafficUnicastLocatorList);
Expand Down
22 changes: 17 additions & 5 deletions src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,14 @@ void PDPListener::process_alive_data(
// Create a new one when not found
old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid);

reader->getMutex().unlock();
lock.unlock();

if (old_data != nullptr)
{
// Copy proxy to be passed forward before releasing PDP mutex
ParticipantProxyData old_data_copy(*old_data);

reader->getMutex().unlock();
lock.unlock();

// Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since
// StatelessWriter::matched_reader_add marks the entire history as unsent if the added reader's
// durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT),
Expand All @@ -209,13 +212,19 @@ void PDPListener::process_alive_data(
// participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will
// create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched
// readers anyways.
parent_pdp_->assignRemoteEndpoints(old_data);
parent_pdp_->assignRemoteEndpoints(&old_data_copy);
}
else
{
reader->getMutex().unlock();
lock.unlock();
}
}
else
{
old_data->updateData(new_data);
old_data->isAlive = true;

reader->getMutex().unlock();

EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant "
Expand All @@ -228,6 +237,9 @@ void PDPListener::process_alive_data(
parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true);
}

// Copy proxy to be passed forward before releasing PDP mutex
ParticipantProxyData old_data_copy(*old_data);

lock.unlock();

RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener();
Expand All @@ -237,7 +249,7 @@ void PDPListener::process_alive_data(

{
std::lock_guard<std::mutex> cb_lock(parent_pdp_->callback_mtx_);
ParticipantDiscoveryInfo info(*old_data);
ParticipantDiscoveryInfo info(old_data_copy);
info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT;

listener->onParticipantDiscovery(
Expand Down
157 changes: 147 additions & 10 deletions test/blackbox/common/DDSBlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <string>
Expand All @@ -36,9 +37,11 @@
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/rtps/builtin/data/ParticipantProxyData.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.h>
#include <fastdds/rtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>
#include <rtps/transport/test_UDPv4Transport.h>
#include <utils/SystemInfo.hpp>

Expand Down Expand Up @@ -438,13 +441,13 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData)
{
delete remote_participant_info;
}
remote_participant_info = new ParticipantDiscoveryInfo(info);
remote_participant_info = new ParticipantProxyData(info.info);
found_->store(true);
cv_->notify_one();
}
}

ParticipantDiscoveryInfo* remote_participant_info;
ParticipantProxyData* remote_participant_info;

private:

Expand Down Expand Up @@ -496,21 +499,21 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData)
participant_found.store(false);

// Prevent assertion on spurious discovery of a participant from elsewhere
if (part_1->guid() == listener.remote_participant_info->info.m_guid)
if (part_1->guid() == listener.remote_participant_info->m_guid)
{
// Check that all three properties are present in the ParticipantProxyData, and that their value
// is that of the property in part_1 (the original property value)
for (auto physical_property_name : physical_property_names)
{
// Find property in ParticipantProxyData
auto received_property = std::find_if(
listener.remote_participant_info->info.m_properties.begin(),
listener.remote_participant_info->info.m_properties.end(),
listener.remote_participant_info->m_properties.begin(),
listener.remote_participant_info->m_properties.end(),
[&](const ParameterProperty_t& property)
{
return property.first() == physical_property_name;
});
ASSERT_NE(received_property, listener.remote_participant_info->info.m_properties.end());
ASSERT_NE(received_property, listener.remote_participant_info->m_properties.end());

// Find property in first participant
auto part_1_property = PropertyPolicyHelper::find_property(
Expand Down Expand Up @@ -556,20 +559,20 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData)
participant_found.store(false);

// Prevent assertion on spurious discovery of a participant from elsewhere
if (part_1->guid() == listener.remote_participant_info->info.m_guid)
if (part_1->guid() == listener.remote_participant_info->m_guid)
{
// Check that none of the three properties are present in the ParticipantProxyData.
for (auto physical_property_name : physical_property_names)
{
// Look for property in ParticipantProxyData
auto received_property = std::find_if(
listener.remote_participant_info->info.m_properties.begin(),
listener.remote_participant_info->info.m_properties.end(),
listener.remote_participant_info->m_properties.begin(),
listener.remote_participant_info->m_properties.end(),
[&](const ParameterProperty_t& property)
{
return property.first() == physical_property_name;
});
ASSERT_EQ(received_property, listener.remote_participant_info->info.m_properties.end());
ASSERT_EQ(received_property, listener.remote_participant_info->m_properties.end());
}
break;
}
Expand Down Expand Up @@ -1638,3 +1641,137 @@ TEST(DDSDiscovery, WaitSetMatchedStatus)
test_DDSDiscovery_WaitSetMatchedStatus(false);
test_DDSDiscovery_WaitSetMatchedStatus(true);
}

// Regression test for redmine issue 20409
TEST(DDSDiscovery, DataracePDP)
{
using namespace eprosima;
using namespace eprosima::fastdds::dds;
using namespace eprosima::fastdds::rtps;

class CustomDomainParticipantListener : public DomainParticipantListener
{
public:

CustomDomainParticipantListener()
: DomainParticipantListener()
, discovery_future(discovery_promise.get_future())
, destruction_future(destruction_promise.get_future())
, undiscovery_future(undiscovery_promise.get_future())
{
}

void on_participant_discovery(
DomainParticipant* /*participant*/,
eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override
{
if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
try
{
discovery_promise.set_value();
}
catch (std::future_error&)
{
// do nothing
}
destruction_future.wait();
}
else if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT ||
info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
try
{
undiscovery_promise.set_value();
}
catch (std::future_error&)
{
// do nothing
}
}
}

std::promise<void> discovery_promise;
std::future<void> discovery_future;

std::promise<void> destruction_promise;
std::future<void> destruction_future;

std::promise<void> undiscovery_promise;
std::future<void> undiscovery_future;
};

// Disable intraprocess
auto settings = fastrtps::xmlparser::XMLProfileManager::library_settings();
auto prev_intraprocess_delivery = settings.intraprocess_delivery;
settings.intraprocess_delivery = fastrtps::INTRAPROCESS_OFF;
fastrtps::xmlparser::XMLProfileManager::library_settings(settings);

// DDS Domain Id
const unsigned int DOMAIN_ID = (uint32_t)GET_PID() % 230;

// This is a non deterministic test, so we will run it several times to increase probability of data race detection
// if it exists.
const unsigned int N_ITER = 10;
unsigned int iter_idx = 0;
while (iter_idx < N_ITER)
{
iter_idx++;

DomainParticipantQos qos;
qos.transport().use_builtin_transports = false;
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
qos.transport().user_transports.push_back(udp_transport);

// Create discoverer participant (the one where a data race on PDP might occur)
CustomDomainParticipantListener participant_listener;
DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID, qos,
&participant_listener);

DomainParticipantQos aux_qos;
aux_qos.transport().use_builtin_transports = false;
auto aux_udp_transport = std::make_shared<test_UDPv4TransportDescriptor>();
aux_qos.transport().user_transports.push_back(aux_udp_transport);

// Create auxiliary participant to be discovered
aux_qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0);
aux_qos.wire_protocol().builtin.discovery_config.leaseDuration = Duration_t(1, 10);
DomainParticipant* aux_participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID,
aux_qos);

// Wait for discovery
participant_listener.discovery_future.wait();

// Shutdown auxiliary participant's network, so it will be removed after lease duration
test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;
DomainParticipantFactory::get_instance()->delete_participant(aux_participant);
std::this_thread::sleep_for(std::chrono::milliseconds(1500)); // Wait for longer than lease duration

try
{
// NOTE: at this point, the discoverer participant is stuck in a UDP discovery thread (unicast or multicast).
// At the same time, the events thread is stuck at PDP::remove_remote_participant (lease duration expired
// and so the discovered participant is removed), trying to acquire the callback mutex taken by the
// discovery thread.

// If we now signal the discovery thread to continue, a data race might occur if the received
// ParticipantProxyData, which is further being processed in the discovery thread (assignRemoteEndpoints),
// gets deleted/cleared by the events thread at the same time.
// Note that a similar situation might arise in other scenarios, such as on the concurrent reception of a
// data P and data uP each on a different thread (unicast and multicast), however these are harder to
// reproduce in a regression test.
participant_listener.destruction_promise.set_value();
}
catch (std::future_error&)
{
// do nothing
}

participant_listener.undiscovery_future.wait();
DomainParticipantFactory::get_instance()->delete_participant(participant);
}

// Reestablish previous intraprocess configuration
settings.intraprocess_delivery = prev_intraprocess_delivery;
fastrtps::xmlparser::XMLProfileManager::library_settings(settings);
}

0 comments on commit 3053832

Please sign in to comment.