Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15905] Fix dataraces when creating DataWriters #3008

Merged
merged 5 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2302,12 +2302,12 @@ void DomainParticipantImpl::create_instance_handle(
{
using eprosima::fastrtps::rtps::octet;

++next_instance_id_;
uint32_t id = ++next_instance_id_;
handle = guid_;
handle.value[15] = 0x01; // Vendor specific;
handle.value[14] = static_cast<octet>(next_instance_id_ & 0xFF);
handle.value[13] = static_cast<octet>((next_instance_id_ >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((next_instance_id_ >> 16) & 0xFF);
handle.value[14] = static_cast<octet>(id & 0xFF);
handle.value[13] = static_cast<octet>((id >> 8) & 0xFF);
handle.value[12] = static_cast<octet>((id >> 16) & 0xFF);
}

DomainParticipantListener* DomainParticipantImpl::get_listener_for(
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ class DomainParticipantImpl
fastrtps::rtps::GUID_t guid_;

//!For instance handle creation
uint32_t next_instance_id_;
std::atomic<uint32_t> next_instance_id_;

//!Participant Qos
DomainParticipantQos qos_;
Expand Down
17 changes: 9 additions & 8 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <map>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <unordered_map>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -239,7 +240,7 @@ struct FlowControllerAsyncPublishMode

std::thread thread;

bool running = false;
std::atomic_bool running {false};

std::condition_variable cv;

Expand Down Expand Up @@ -1041,10 +1042,10 @@ class FlowControllerImpl : public FlowController
typename std::enable_if<!std::is_same<FlowControllerPureSyncPublishMode, PubMode>::value, void>::type
initialize_async_thread()
{
if (false == async_mode.running)
bool expected = false;
if (async_mode.running.compare_exchange_strong(expected, true))
{
// Code for initializing the asynchronous thread.
async_mode.running = true;
async_mode.thread = std::thread(&FlowControllerImpl::run, this);
}
}
Expand Down
75 changes: 75 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <condition_variable>
#include <mutex>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/dds/domain/DomainParticipant.hpp>
Expand Down Expand Up @@ -109,6 +113,77 @@ TEST(DDSBasic, DeleteDisabledEntities)
factory->delete_participant(participant);
}

/**
* This test checks a race condition when calling DomainParticipantImpl::create_instance_handle()
* from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash
* when deleting publishers created this way, as there was a clash in their respective instance
* handles. Not only did the crash occur, but it was also reported by TSan.
*
* The test spawns 200 threads, each creating a publisher and then waiting on a command from the
* main thread to delete them (so all of them at deleted at the same time).
*/
TEST(DDSBasic, MultithreadedPublisherCreation)
{
/* Get factory */
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

/* Create participant */
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

/* Test synchronization variables */
std::mutex finish_mtx;
std::condition_variable finish_cv;
bool should_finish = false;

/* Function to create publishers, deleting them on command */
auto thread_run =
[participant, &finish_mtx, &finish_cv, &should_finish]()
{
/* Create publisher */
Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(nullptr, publisher);

{
/* Wait for test completion request */
std::unique_lock<std::mutex> lock(finish_mtx);
finish_cv.wait(lock, [&should_finish]()
{
return should_finish;
});
}

/* Delete publisher */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher));
};

{
/* Create threads */
std::vector<std::thread> threads;
for (size_t i = 0; i < 200; i++)
{
threads.push_back(std::thread(thread_run));
}

/* Command threads to delete their publishers */
{
std::lock_guard<std::mutex> guard(finish_mtx);
should_finish = true;
finish_cv.notify_all();
}

/* Wait for all threads to join */
for (std::thread& thr : threads)
{
thr.join();
}
}

/* Clean up */
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
111 changes: 111 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#include "BlackboxTests.hpp"

#include <chrono>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>
#include <fastrtps/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/rtps/participant/RTPSParticipant.h>
#include <fastrtps/rtps/RTPSDomain.h>
Expand Down Expand Up @@ -663,6 +665,115 @@ TEST(RTPS, RemoveDisabledParticipant)
ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
}


/**
* This test checks a race condition on initializing a writer's flow controller when creating
* several RTPSWriters in parallel: https://eprosima.easyredmine.com/issues/15905
*
* The test creates a participant with 4 different flow controllers, and then creates 200 threads
* which each create an RTPSWriter which uses one of the participant's flow controllers.
* The threads wait for a command coming from the main thread to delete the writers. This is to
* ensure that all threads are initialized prior to any of them starts deleting.
*/
TEST(RTPS, MultithreadedWriterCreation)
{
/* Flow controller builder */
using FlowControllerDescriptor_t = eprosima::fastdds::rtps::FlowControllerDescriptor;
using SchedulerPolicy_t = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy;
auto create_flow_controller =
[](const char* name, SchedulerPolicy_t scheduler,
int32_t max_bytes_per_period,
uint64_t period_ms) -> std::shared_ptr<FlowControllerDescriptor_t>
{
std::shared_ptr<FlowControllerDescriptor_t> descriptor = std::make_shared<FlowControllerDescriptor_t>();
descriptor->name = name;
descriptor->scheduler = scheduler;
descriptor->max_bytes_per_period = max_bytes_per_period;
descriptor->period_ms = period_ms;
return descriptor;
};

/* Create participant */
RTPSParticipantAttributes rtps_attr;
// Create one flow controller of each kind to make things interesting
const char* flow_controller_name = "fifo_controller";
rtps_attr.flow_controllers.push_back(create_flow_controller("high_priority_controller",
SchedulerPolicy_t::HIGH_PRIORITY, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller("priority_with_reservation_controller",
SchedulerPolicy_t::PRIORITY_WITH_RESERVATION, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller("round_robin_controller",
SchedulerPolicy_t::ROUND_ROBIN, 200, 10));
rtps_attr.flow_controllers.push_back(create_flow_controller(flow_controller_name, SchedulerPolicy_t::FIFO, 200,
10));
RTPSParticipant* rtps_participant = RTPSDomain::createParticipant(
(uint32_t)GET_PID() % 230, false, rtps_attr, nullptr);

/* Test sync variables */
std::mutex finish_mtx;
std::condition_variable finish_cv;
bool should_finish = false;

/* Lambda function to create a writer with a flow controller, and to destroy it at command */
auto thread_run = [rtps_participant, flow_controller_name, &finish_mtx, &finish_cv, &should_finish]()
{
/* Create writer history */
eprosima::fastrtps::rtps::HistoryAttributes hattr;
eprosima::fastrtps::rtps::WriterHistory* history = new eprosima::fastrtps::rtps::WriterHistory(hattr);
eprosima::fastrtps::TopicAttributes topic_attr;

/* Create writer with a flow controller */
eprosima::fastrtps::rtps::WriterAttributes writer_attr;
writer_attr.mode = RTPSWriterPublishMode::ASYNCHRONOUS_WRITER;
writer_attr.flow_controller_name = flow_controller_name;
eprosima::fastrtps::rtps::RTPSWriter* writer = eprosima::fastrtps::rtps::RTPSDomain::createRTPSWriter(
rtps_participant, writer_attr, history, nullptr);

/* Register writer in participant */
eprosima::fastrtps::WriterQos writer_qos;
ASSERT_EQ(rtps_participant->registerWriter(writer, topic_attr, writer_qos), true);

{
/* Wait for test completion request */
std::unique_lock<std::mutex> lock(finish_mtx);
finish_cv.wait(lock, [&should_finish]()
{
return should_finish;
});
}

/* Remove writer */
ASSERT_TRUE(RTPSDomain::removeRTPSWriter(writer));
};

{
/* Create test threads */
constexpr size_t num_threads = 200;
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)
{
threads.push_back(std::thread(thread_run));
}

/* Once all threads are created, we can start deleting them */
{
std::lock_guard<std::mutex> guard(finish_mtx);
should_finish = true;
finish_cv.notify_all();
}

/* Wait until are threads join */
for (std::thread& thr : threads)
{
thr.join();
}
}

/* Clean up */
ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
ASSERT_NE(nullptr, rtps_participant);
RTPSDomain::stopAll();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down