Skip to content

Commit

Permalink
Fix dataraces when creating DataWriters (#3008)
Browse files Browse the repository at this point in the history
* Refs #15905: Declare the PublishMode running flag as atomic

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #15905: Add RTPS regression test

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #15905: Add DomainParticipantImpl::create_instance_handle data race regression test

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #15905: Set DomainParticipantImpl::next_instance_id_ as atomic

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #15905: Apply suggestions

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>
(cherry picked from commit 4391864)
  • Loading branch information
EduPonz authored and mergify[bot] committed Oct 14, 2022
1 parent 4b77c39 commit 861ef89
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 13 deletions.
8 changes: 4 additions & 4 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2254,12 +2254,12 @@ void DomainParticipantImpl::create_instance_handle(
{
using eprosima::fastrtps::rtps::octet;

++next_instance_id_;
uint32_t id = ++next_instance_id_;
handle = guid_;
handle.value[15] = 0x01; // Vendor specific;
handle.value[14] = static_cast<octet>(next_instance_id_ & 0xFF);
handle.value[13] = static_cast<octet>((next_instance_id_ >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((next_instance_id_ >> 16) & 0xFF);
handle.value[14] = static_cast<octet>(id & 0xFF);
handle.value[13] = static_cast<octet>((id >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((id >> 16) & 0xFF);
}

DomainParticipantListener* DomainParticipantImpl::get_listener_for(
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ class DomainParticipantImpl
fastrtps::rtps::GUID_t guid_;

//!For instance handle creation
uint32_t next_instance_id_;
std::atomic<uint32_t> next_instance_id_;

//!Participant Qos
DomainParticipantQos qos_;
Expand Down
17 changes: 9 additions & 8 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <map>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <unordered_map>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -235,7 +236,7 @@ struct FlowControllerAsyncPublishMode

std::thread thread;

bool running = false;
std::atomic_bool running {false};

std::condition_variable cv;

Expand Down Expand Up @@ -1037,10 +1038,10 @@ class FlowControllerImpl : public FlowController
typename std::enable_if<!std::is_same<FlowControllerPureSyncPublishMode, PubMode>::value, void>::type
initialize_async_thread()
{
if (false == async_mode.running)
bool expected = false;
if (async_mode.running.compare_exchange_strong(expected, true))
{
// Code for initializing the asynchronous thread.
async_mode.running = true;
async_mode.thread = std::thread(&FlowControllerImpl::run, this);
}
}
Expand Down
75 changes: 75 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <condition_variable>
#include <mutex>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/dds/domain/DomainParticipant.hpp>
Expand Down Expand Up @@ -109,6 +113,77 @@ TEST(DDSBasic, DeleteDisabledEntities)
factory->delete_participant(participant);
}

/**
* This test checks a race condition when calling DomainParticipantImpl::create_instance_handle()
* from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash
* when deleting publishers created this way, as there was a clash in their respective instance
* handles. Not only did the crash occur, but it was also reported by TSan.
*
* The test spawns 200 threads, each creating a publisher and then waiting on a command from the
* main thread to delete them (so all of them at deleted at the same time).
*/
TEST(DDSBasic, MultithreadedPublisherCreation)
{
/* Get factory */
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

/* Create participant */
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

/* Test synchronization variables */
std::mutex finish_mtx;
std::condition_variable finish_cv;
bool should_finish = false;

/* Function to create publishers, deleting them on command */
auto thread_run =
[participant, &finish_mtx, &finish_cv, &should_finish]()
{
/* Create publisher */
Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(nullptr, publisher);

{
/* Wait for test completion request */
std::unique_lock<std::mutex> lock(finish_mtx);
finish_cv.wait(lock, [&should_finish]()
{
return should_finish;
});
}

/* Delete publisher */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher));
};

{
/* Create threads */
std::vector<std::thread> threads;
for (size_t i = 0; i < 200; i++)
{
threads.push_back(std::thread(thread_run));
}

/* Command threads to delete their publishers */
{
std::lock_guard<std::mutex> guard(finish_mtx);
should_finish = true;
finish_cv.notify_all();
}

/* Wait for all threads to join */
for (std::thread& thr : threads)
{
thr.join();
}
}

/* Clean up */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
111 changes: 111 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#include "BlackboxTests.hpp"

#include <chrono>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>
#include <fastrtps/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/rtps/participant/RTPSParticipant.h>
#include <fastrtps/rtps/RTPSDomain.h>
Expand Down Expand Up @@ -663,6 +665,115 @@ TEST(RTPS, RemoveDisabledParticipant)
ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
}


/**
* This test checks a race condition on initializing a writer's flow controller when creating
* several RTPSWriters in parallel: https://eprosima.easyredmine.com/issues/15905
*
* The test creates a participant with 4 different flow controllers, and then creates 200 threads
* which each create an RTPSWriter which uses one of the participant's flow controllers.
* The threads wait for a command coming from the main thread to delete the writers. This is to
* ensure that all threads are initialized prior to any of them starts deleting.
*/
TEST(RTPS, MultithreadedWriterCreation)
{
/* Flow controller builder */
using FlowControllerDescriptor_t = eprosima::fastdds::rtps::FlowControllerDescriptor;
using SchedulerPolicy_t = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy;
auto create_flow_controller =
[](const char* name, SchedulerPolicy_t scheduler,
int32_t max_bytes_per_period,
uint64_t period_ms) -> std::shared_ptr<FlowControllerDescriptor_t>
{
std::shared_ptr<FlowControllerDescriptor_t> descriptor = std::make_shared<FlowControllerDescriptor_t>();
descriptor->name = name;
descriptor->scheduler = scheduler;
descriptor->max_bytes_per_period = max_bytes_per_period;
descriptor->period_ms = period_ms;
return descriptor;
};

/* Create participant */
RTPSParticipantAttributes rtps_attr;
// Create one flow controller of each kind to make things interesting
const char* flow_controller_name = "fifo_controller";
rtps_attr.flow_controllers.push_back(create_flow_controller("high_priority_controller",
SchedulerPolicy_t::HIGH_PRIORITY, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller("priority_with_reservation_controller",
SchedulerPolicy_t::PRIORITY_WITH_RESERVATION, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller("round_robin_controller",
SchedulerPolicy_t::ROUND_ROBIN, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller(flow_controller_name, SchedulerPolicy_t::FIFO, 200,
10));
RTPSParticipant* rtps_participant = RTPSDomain::createParticipant(
(uint32_t)GET_PID() % 230, false, rtps_attr, nullptr);

/* Test sync variables */
std::mutex finish_mtx;
std::condition_variable finish_cv;
bool should_finish = false;

/* Lambda function to create a writer with a flow controller, and to destroy it at command */
auto thread_run = [rtps_participant, flow_controller_name, &finish_mtx, &finish_cv, &should_finish]()
{
/* Create writer history */
eprosima::fastrtps::rtps::HistoryAttributes hattr;
eprosima::fastrtps::rtps::WriterHistory* history = new eprosima::fastrtps::rtps::WriterHistory(hattr);
eprosima::fastrtps::TopicAttributes topic_attr;

/* Create writer with a flow controller */
eprosima::fastrtps::rtps::WriterAttributes writer_attr;
writer_attr.mode = RTPSWriterPublishMode::ASYNCHRONOUS_WRITER;
writer_attr.flow_controller_name = flow_controller_name;
eprosima::fastrtps::rtps::RTPSWriter* writer = eprosima::fastrtps::rtps::RTPSDomain::createRTPSWriter(
rtps_participant, writer_attr, history, nullptr);

/* Register writer in participant */
eprosima::fastrtps::WriterQos writer_qos;
ASSERT_EQ(rtps_participant->registerWriter(writer, topic_attr, writer_qos), true);

{
/* Wait for test completion request */
std::unique_lock<std::mutex> lock(finish_mtx);
finish_cv.wait(lock, [&should_finish]()
{
return should_finish;
});
}

/* Remove writer */
ASSERT_TRUE(RTPSDomain::removeRTPSWriter(writer));
};

{
/* Create test threads */
constexpr size_t num_threads = 200;
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
{
threads.push_back(std::thread(thread_run));
}

/* Once all threads are created, we can start deleting them */
{
std::lock_guard<std::mutex> guard(finish_mtx);
should_finish = true;
finish_cv.notify_all();
}

/* Wait until are threads join */
for (std::thread& thr : threads)
{
thr.join();
}
}

/* Clean up */
ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
ASSERT_NE(nullptr, rtps_participant);
RTPSDomain::stopAll();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 861ef89

Please sign in to comment.