From 016c2637895f9ec717980fc44f85c368523a6b15 Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Date: Mon, 10 Oct 2022 15:52:19 +0200 Subject: [PATCH 1/5] Refs #15905: Declare the PublishMode running flag as atomic Signed-off-by: Eduardo Ponz --- src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 8011f6eb904..97f94ca62af 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -5,13 +5,14 @@ #include #include -#include -#include -#include -#include +#include #include -#include #include +#include +#include +#include +#include +#include namespace eprosima { namespace fastdds { @@ -239,7 +240,7 @@ struct FlowControllerAsyncPublishMode std::thread thread; - bool running = false; + std::atomic_bool running {false}; std::condition_variable cv; @@ -1041,10 +1042,10 @@ class FlowControllerImpl : public FlowController typename std::enable_if::value, void>::type initialize_async_thread() { - if (false == async_mode.running) + bool expected = false; + if (async_mode.running.compare_exchange_strong(expected, true)) { // Code for initializing the asynchronous thread. 
- async_mode.running = true; async_mode.thread = std::thread(&FlowControllerImpl::run, this); } } From 9398a5820c34d346fa63fdd1cfd1249c6aecf74d Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Date: Thu, 13 Oct 2022 12:42:30 +0200 Subject: [PATCH 2/5] Refs #15905: Add RTPS regression test Signed-off-by: Eduardo Ponz --- .../common/RTPSBlackboxTestsBasic.cpp | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp index aedc70b0c1a..93269e69c23 100644 --- a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp @@ -15,10 +15,12 @@ #include "BlackboxTests.hpp" #include +#include #include #include +#include #include #include #include @@ -663,6 +665,115 @@ TEST(RTPS, RemoveDisabledParticipant) ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant)); } + +/** + * This test checks a race condition on initializing a writer's flow controller when creating + * several RTPSWriters in parallel: https://eprosima.easyredmine.com/issues/15905 + * + * The test creates a participant with 4 different flow controllers, and then creates 200 threads + * which each create an RTPSWriter which uses one of the participant's flow controllers. + * The threads wait for a command coming from the main thread to delete the writers. This is to + * ensure that all threads are initialized before any of them starts deleting. 
+ */ +TEST(RTPS, MultithreadedWriterCreation) +{ + /* Flow controller builder */ + using FlowControllerDescriptor_t = eprosima::fastdds::rtps::FlowControllerDescriptor; + using SchedulerPolicy_t = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy; + auto create_flow_controller = + [](const char* name, SchedulerPolicy_t scheduler, + int32_t max_bytes_per_period, + uint64_t period_ms) -> std::shared_ptr + { + std::shared_ptr descriptor = std::make_shared(); + descriptor->name = name; + descriptor->scheduler = scheduler; + descriptor->max_bytes_per_period = max_bytes_per_period; + descriptor->period_ms = period_ms; + return descriptor; + }; + + /* Create participant */ + RTPSParticipantAttributes rtps_attr; + // Create one flow controller of each kind to make things interesting + const char* flow_controller_name = "fifo_controller"; + rtps_attr.flow_controllers.push_back(create_flow_controller("high_priority_controller", + SchedulerPolicy_t::HIGH_PRIORITY, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller("priority_with_reservation_controller", + SchedulerPolicy_t::PRIORITY_WITH_RESERVATION, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller("round_robin_controller", + SchedulerPolicy_t::ROUND_ROBIN, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller(flow_controller_name, SchedulerPolicy_t::FIFO, 200, + 10)); + RTPSParticipant* rtps_participant = RTPSDomain::createParticipant( + (uint32_t)GET_PID() % 230, false, rtps_attr, nullptr); + + /* Test sync variables */ + std::mutex finish_mtx; + std::condition_variable finish_cv; + bool should_finish = false; + + /* Lambda function to create a writer with a flow controller, and to destroy it at command */ + auto thread_run = [rtps_participant, flow_controller_name, &finish_mtx, &finish_cv, &should_finish]() + { + /* Create writer history */ + eprosima::fastrtps::rtps::HistoryAttributes hattr; + eprosima::fastrtps::rtps::WriterHistory* history = new 
eprosima::fastrtps::rtps::WriterHistory(hattr); + eprosima::fastrtps::TopicAttributes topic_attr; + + /* Create writer with a flow controller */ + eprosima::fastrtps::rtps::WriterAttributes writer_attr; + writer_attr.mode = RTPSWriterPublishMode::ASYNCHRONOUS_WRITER; + writer_attr.flow_controller_name = flow_controller_name; + eprosima::fastrtps::rtps::RTPSWriter* writer = eprosima::fastrtps::rtps::RTPSDomain::createRTPSWriter( + rtps_participant, writer_attr, history, nullptr); + eprosima::fastrtps::WriterQos writer_qos; + + /* Register writer in participant */ + ASSERT_EQ(rtps_participant->registerWriter(writer, topic_attr, writer_qos), true); + + { + /* Wait for test completion request */ + std::unique_lock lock(finish_mtx); + finish_cv.wait(lock, [&should_finish]() + { + return should_finish; + }); + } + + /* Remove writer */ + ASSERT_TRUE(RTPSDomain::removeRTPSWriter(writer)); + }; + + { + /* Create test threads */ + constexpr size_t num_threads = 200; + std::vector threads; + for (size_t i = 0; i < num_threads; ++i) + { + threads.push_back(std::thread(thread_run)); + } + + /* Once all threads are created, we can start deleting them */ + { + std::lock_guard guard(finish_mtx); + should_finish = true; + finish_cv.notify_all(); + } + + /* Wait until all threads join */ + for (std::thread& thr : threads) + { + thr.join(); + } + } + + /* Clean up */ + ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant)); + ASSERT_NE(nullptr, rtps_participant); + RTPSDomain::stopAll(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else From 00acfab8926fb0cad1ad7c7eb3446ee3e60f9dc6 Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Date: Thu, 13 Oct 2022 12:43:11 +0200 Subject: [PATCH 3/5] Refs #15905: Add DomainParticipantImpl::create_instance_handle data race regression test Signed-off-by: Eduardo Ponz --- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 75 +++++++++++++++++++ 1 file changed, 75 
insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index c158b928fe1..872fa220ae0 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include + #include #include @@ -109,6 +113,77 @@ TEST(DDSBasic, DeleteDisabledEntities) factory->delete_participant(participant); } +/** + * This test checks a race condition on when calling DomainParticipantImpl::create_instance_handle() + * from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash + * when deleting publishers created this way, as there was a clash in their respective instance + * handles. Not only did the crash occur, but it was also reported by TSan. + * + * The tests spawns 200 thread, each creating a publisher and then waiting on a command from the + * main thread to delete them (so all of them at deleted at the same time). 
+ */ +TEST(DDSBasic, MultithreadedPublisherCreation) +{ + /* Get factory */ + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + ASSERT_NE(nullptr, factory); + + /* Create participant */ + DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(nullptr, participant); + + /* Test synchronization variables */ + std::mutex finish_mtx; + std::condition_variable finish_cv; + bool should_finish = false; + + /* Function to create publishers, deleting them on command */ + auto thread_run = + [participant, &finish_mtx, &finish_cv, &should_finish]() + { + /* Create publisher */ + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(nullptr, publisher); + + { + /* Wait for test completion request */ + std::unique_lock lock(finish_mtx); + finish_cv.wait(lock, [&should_finish]() + { + return should_finish; + }); + } + + /* Delete publisher */ + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher)); + }; + + { + /* Create threads */ + std::vector threads; + for (size_t i = 0; i < 200; i++) + { + threads.push_back(std::thread(thread_run)); + } + + /* Command threads to delete their publishers */ + { + std::lock_guard guard(finish_mtx); + should_finish = true; + finish_cv.notify_all(); + } + + /* Wait for all threads to join */ + for (std::thread& thr : threads) + { + thr.join(); + } + } + + /* Clean up */ + ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); +} + } // namespace dds } // namespace fastdds } // namespace eprosima From 4987408e9afef3c232e01b07e19cceb2c8a7d311 Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Date: Thu, 13 Oct 2022 16:05:19 +0200 Subject: [PATCH 4/5] Refs #15905: Set DomainParticipantImpl::next_instance_id_ as atomic Signed-off-by: Eduardo Ponz --- src/cpp/fastdds/domain/DomainParticipantImpl.cpp | 8 ++++---- src/cpp/fastdds/domain/DomainParticipantImpl.hpp | 2 +- 2 files 
changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp index 74e569c33c0..da28d045a85 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp @@ -2302,12 +2302,12 @@ void DomainParticipantImpl::create_instance_handle( { using eprosima::fastrtps::rtps::octet; - ++next_instance_id_; + uint32_t id = ++next_instance_id_; handle = guid_; handle.value[15] = 0x01; // Vendor specific; - handle.value[14] = static_cast(next_instance_id_ & 0xFF); - handle.value[13] = static_cast((next_instance_id_ >> 8) & 0xFF); - handle.value[12] = static_cast((next_instance_id_ >> 16) & 0xFF); + handle.value[14] = static_cast(id & 0xFF); + handle.value[13] = static_cast((id >> 8) & 0xFF); + handle.value[12] = static_cast((id >> 16) & 0xFF); } DomainParticipantListener* DomainParticipantImpl::get_listener_for( diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp index 692ae5c8458..0849af216ee 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp @@ -496,7 +496,7 @@ class DomainParticipantImpl fastrtps::rtps::GUID_t guid_; //!For instance handle creation - uint32_t next_instance_id_; + std::atomic next_instance_id_; //!Participant Qos DomainParticipantQos qos_; From eec6595253bd393992584cd78e962d2c2515942d Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Date: Thu, 13 Oct 2022 18:41:21 +0200 Subject: [PATCH 5/5] Refs #15905: Apply suggestions Signed-off-by: Eduardo Ponz --- test/blackbox/common/DDSBlackboxTestsBasic.cpp | 4 ++-- test/blackbox/common/RTPSBlackboxTestsBasic.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 872fa220ae0..887aab4e08a 100644 --- 
a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -114,12 +114,12 @@ TEST(DDSBasic, DeleteDisabledEntities) } /** - * This test checks a race condition on when calling DomainParticipantImpl::create_instance_handle() + * This test checks a race condition when calling DomainParticipantImpl::create_instance_handle() * from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash * when deleting publishers created this way, as there was a clash in their respective instance * handles. Not only did the crash occur, but it was also reported by TSan. * - * The tests spawns 200 thread, each creating a publisher and then waiting on a command from the + * The test spawns 200 threads, each creating a publisher and then waiting on a command from the * main thread to delete them (so all of them at deleted at the same time). */ TEST(DDSBasic, MultithreadedPublisherCreation) diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp index 93269e69c23..e938ecd3f12 100644 --- a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp @@ -727,9 +727,9 @@ TEST(RTPS, MultithreadedWriterCreation) writer_attr.flow_controller_name = flow_controller_name; eprosima::fastrtps::rtps::RTPSWriter* writer = eprosima::fastrtps::rtps::RTPSDomain::createRTPSWriter( rtps_participant, writer_attr, history, nullptr); - eprosima::fastrtps::WriterQos writer_qos; /* Register writer in participant */ + eprosima::fastrtps::WriterQos writer_qos; ASSERT_EQ(rtps_participant->registerWriter(writer, topic_attr, writer_qos), true); {