From a8c6c2119dc0a6e294473fe9ca0a2ebced573201 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 7 Sep 2022 16:15:46 +0200 Subject: [PATCH 01/19] Add test for multithreaded creation of readers on a single subscriber. Signed-off-by: Miguel Company --- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 887aab4e08a..493facce0b3 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -184,6 +184,74 @@ TEST(DDSBasic, MultithreadedPublisherCreation) ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); } +TEST(DDSBasic, MultithreadedReaderCreation) +{ + // Get factory + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + ASSERT_NE(nullptr, factory); + + // Create participant + DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(nullptr, participant); + + // Register type + TypeSupport type_support; + type_support.reset(new FixedSizedPubSubType()); + type_support.register_type(participant); + ASSERT_NE(nullptr, type_support); + + // Create subscriber + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(nullptr, subscriber); + + // Create Topic + Topic* topic = participant->create_topic(TEST_TOPIC_NAME, type_support.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(nullptr, topic); + + std::mutex mtx; + std::condition_variable cv; + bool should_finish = false; + + auto thread_run = [subscriber, topic, &mtx, &cv, &should_finish]() + { + // Create reader + DataReader* reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); + ASSERT_NE(nullptr, reader); + + // Wait for test completion request + std::unique_lock lock(mtx); + cv.wait(lock, [&should_finish]() + { + return should_finish; + }); + + ASSERT_EQ(ReturnCode_t::RETCODE_OK, subscriber->delete_datareader(reader)); + }; + + { + std::vector threads; + for (size_t i = 0; i < 10; ++i) + { + threads.push_back(std::thread(thread_run)); + } + + { + std::lock_guard guard(mtx); + should_finish = true; + cv.notify_all(); + } + + for (std::thread& thr : threads) + { + thr.join(); + } + } + + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_subscriber(subscriber)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); +} + } // namespace dds } // namespace fastdds } // namespace eprosima From 5e284f3b390982208536d051c399f7c3f967fe42 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 8 Sep 2022 10:26:48 +0200 Subject: [PATCH 02/19] Add DataWriter. Signed-off-by: Miguel Company --- test/blackbox/common/DDSBlackboxTestsBasic.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 493facce0b3..77a35de3065 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -204,10 +204,18 @@ TEST(DDSBasic, MultithreadedReaderCreation) Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); ASSERT_NE(nullptr, subscriber); + // Create publisher + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(nullptr, publisher); + // Create Topic Topic* topic = participant->create_topic(TEST_TOPIC_NAME, type_support.get_type_name(), TOPIC_QOS_DEFAULT); ASSERT_NE(nullptr, topic); + // Create DataWriter + DataWriter* writer = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); + ASSERT_NE(nullptr, writer); + std::mutex mtx; std::condition_variable cv; bool should_finish = false; @@ -247,6 +255,8 @@ TEST(DDSBasic, MultithreadedReaderCreation) } } + ASSERT_EQ(ReturnCode_t::RETCODE_OK, publisher->delete_datawriter(writer)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher)); ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_subscriber(subscriber)); ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic)); ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); From f17a4e839a0ec3c74eae28f56b2500088f00528a Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Fri, 23 Sep 2022 11:38:54 +0200 Subject: [PATCH 03/19] Refs 15766: Refactor of shared_mutex to select writer priority Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 277 +++++++++++++++++++++--- 1 file changed, 244 insertions(+), 33 deletions(-) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index d2fbf9b5f81..f896056920d 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -1,5 +1,29 @@ -// Copyright Howard Hinnant 2007-2010. Distributed under the Boost -// Software License, Version 1.0. (see http://www.boost.org/LICENSE_1_0.txt) +/* + Copyright Howard Hinnant 2007-2010. Distributed under the Boost + Software License, Version 1.0. (see http://www.boost.org/LICENSE_1_0.txt) + The original implementation has been modified to support the POSIX priorities: + + PTHREAD_RWLOCK_PREFER_READER_NP + This is the default. A thread may hold multiple read + locks; that is, read locks are recursive. According to + The Single Unix Specification, the behavior is unspecified + when a reader tries to place a lock, and there is no write + lock but writers are waiting. Giving preference to the + reader, as is set by PTHREAD_RWLOCK_PREFER_READER_NP, + implies that the reader will receive the requested lock, + even if a writer is waiting. As long as there are + readers, the writer will be starved. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP + Setting the lock kind to this avoids writer starvation as + long as any read locking is not done in a recursive + fashion. + + The C++ Standard has not yet (C++20) imposed any requirements on shared_mutex implementation thus + each platform made its own choices: + Windows & Boost defaults to PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP. + Linux & Mac defaults to PTHREAD_RWLOCK_PREFER_READER_NP. + */ /** * @file shared_mutex.hpp @@ -29,22 +53,33 @@ ( !(defined(__has_include) && __has_include()) && \ !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) -#include -#include #include +#include +#include +#include #include +#include namespace eprosima { +namespace detail { -class shared_mutex +// mimic POSIX Read-Write lock syntax +enum class shared_mutex_type { + PTHREAD_RWLOCK_PREFER_READER_NP, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP +}; + +class shared_mutex_base +{ + +protected: + typedef std::mutex mutex_t; typedef std::condition_variable cond_t; typedef unsigned count_t; mutex_t mut_; cond_t gate1_; - cond_t gate2_; count_t state_; static const count_t write_entered_ = 1U << (sizeof(count_t) * CHAR_BIT - 1); @@ -52,40 +87,26 @@ class shared_mutex public: - shared_mutex() + shared_mutex_base() : state_(0) { } - ~shared_mutex() + ~shared_mutex_base() { std::lock_guard _(mut_); } - shared_mutex( - const shared_mutex&) = delete; - shared_mutex& operator =( - const shared_mutex&) = delete; + shared_mutex_base( + const shared_mutex_base&) = delete; + shared_mutex_base& operator =( + const shared_mutex_base&) = delete; // Exclusive ownership - void lock() - { - std::unique_lock lk(mut_); - while (state_ & write_entered_) - { - gate1_.wait(lk); - } - state_ |= write_entered_; - while (state_ & n_readers_) - { - gate2_.wait(lk); - } - } - bool try_lock() { - std::unique_lock lk(mut_); + std::lock_guard _(mut_); if (state_ == 0) { state_ = write_entered_; @@ -117,7 +138,7 @@ class shared_mutex bool try_lock_shared() { - std::unique_lock lk(mut_); + std::lock_guard _(mut_); count_t num_readers = state_ & n_readers_; if (!(state_ & write_entered_) && num_readers != n_readers_) { @@ -129,6 +150,35 @@ class shared_mutex return false; } +}; + +template +class shared_mutex; + +// original Hinnant implementation prioritizing writers + +template<> +class shared_mutex + : public shared_mutex_base +{ + cond_t gate2_; + +public: + + void lock() + { + std::unique_lock lk(mut_); + while (state_ & write_entered_) + { + gate1_.wait(lk); + } + state_ |= write_entered_; + while (state_ & n_readers_) + { + gate2_.wait(lk); + } + } + void unlock_shared() { std::lock_guard _(mut_); @@ -142,17 +192,178 @@ class shared_mutex gate2_.notify_one(); } } - else + else if (num_readers == n_readers_ - 1) { - if (num_readers == n_readers_ - 1) - { - gate1_.notify_one(); - } + gate1_.notify_one(); + } + } + +}; + +// implementation not locking readers on behalf of writers + +template<> +class shared_mutex + : public shared_mutex_base +{ + count_t writer_waiting_ = 0; +public: + + void lock() + { + std::unique_lock lk(mut_); + ++writer_waiting_; + while (state_ & n_readers_ || state_ & write_entered_) + { + gate1_.wait(lk); + } + state_ |= write_entered_; + --writer_waiting_; + } + + void unlock_shared() + { + std::lock_guard _(mut_); + count_t num_readers = (state_ & n_readers_) - 1; + state_ &= ~n_readers_; + state_ |= num_readers; + + if ((writer_waiting_ && num_readers == 0) + || (num_readers == n_readers_ - 1)) + { + gate1_.notify_one(); + } + } +}; + +// Debugger wrapper class that provides insight +template +class debug_wrapper : public sm +{ + std::mutex wm_; + // Identity of the exclusive owner if any + std::thread::id exclusive_owner_ = {}; + // key_type thread_id, mapped_type number of locks + std::map shared_owners_; + +public: + + ~debug_wrapper() + { + std::lock_guard _(wm_); + } + + // Exclusive ownership + + void lock() + { + sm::lock(); + std::lock_guard _(wm_); + exclusive_owner_ = std::this_thread::get_id(); + } + + bool try_lock() + { + std::lock_guard _(wm_); + bool res = sm::try_lock(); + if (res) + { + exclusive_owner_ = std::this_thread::get_id(); + } + return res; + } + + void unlock() + { + std::lock_guard _(wm_); + sm::unlock(); + exclusive_owner_ = std::thread::id(); + } + + // Shared ownership + + void lock_shared() + { + sm::lock_shared(); + std::lock_guard _(wm_); + ++shared_owners_[std::this_thread::get_id()]; + } + + bool try_lock_shared() + { + std::lock_guard _(wm_); + bool res = sm::try_lock_shared(); + if (res) + { + ++shared_owners_[std::this_thread::get_id()]; + } + return res; + } + + void unlock_shared() + { + std::lock_guard _(wm_); + sm::unlock_shared(); + auto owner = shared_owners_.find(std::this_thread::get_id()); + if ( owner != shared_owners_.end() && 0 == --owner->second ) + { + shared_owners_.erase(owner); } } }; +} // namespace detail + +/* + Fast-DDS defaults to PTHREAD_RWLOCK_PREFER_READER_NP for two main reasons: + + - It allows reader side recursiveness. If we have two threads (T1, T2) and + called S a shared lock and E and exclusive one. + + T1: S -> S + T2: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock. before T1 + takes S twice. That happens because: + + T1's second S will wait for E (writer is prioritized) + + E will wait for T1's first S lock (writer needs atomic access) + + T1's first S cannot unlock because is blocked in the second S. + + Thus, shared_mutex is + non-recursive. + + - It prevents ABBA deadlocks with other mutexes. If we have three threads + (Ti) and P is an ordinary mutex: + + T1: P -> S + T2: S -> P + T3: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. Starvation issues can be managed in the user + code. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock if T3 takes E + before T1 takes S. That happens because: + + T1's S will wait for E (writer is prioritized) + + E will wait for T2's S lock (writer needs atomic access) + + T2's S cannot unlock because is blocked in P (owned by T1). + + Thus, shared_mutex must be + managed like an ordinary mutex in deadlock sense. + */ + +#ifdef NDEBUG +using shared_mutex = detail::shared_mutex; +#else +using shared_mutex = + detail::debug_wrapper>; +#endif // NDEBUG + template class shared_lock { From 74927312d7bec861581db1b8bbdc98ca5630d4df Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Sat, 24 Sep 2022 11:19:29 +0200 Subject: [PATCH 04/19] Refs 15766: CMake update to force third party shared mutex if the framework prioritizes writing. Signed-off-by: Miguel Barro --- CMakeLists.txt | 9 --- cmake/modules/check_shared_mutex_priority.cpp | 69 +++++++++++++++++++ src/cpp/CMakeLists.txt | 38 ++++++++++ 3 files changed, 107 insertions(+), 9 deletions(-) create mode 100644 cmake/modules/check_shared_mutex_priority.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 302f79b4020..6e59d509d43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -266,15 +266,6 @@ endif() option(SQLITE3_SUPPORT "Activate SQLITE3 support" ON) -option(USE_THIRDPARTY_SHARED_MUTEX [=[ -Forces the use of a Boost-based shared_mutex implementation -instead of the framework one. Useful to cope with issues on -framework implementations like misguided sanitizer reports. -This implementation will be used by default on frameworks -lacking the shared_mutex feature like those not fulfilling -C++17. -]=] OFF) - ############################################################################### # SHM as Default transport ############################################################################### diff --git a/cmake/modules/check_shared_mutex_priority.cpp b/cmake/modules/check_shared_mutex_priority.cpp new file mode 100644 index 00000000000..7dab4e2a4e7 --- /dev/null +++ b/cmake/modules/check_shared_mutex_priority.cpp @@ -0,0 +1,69 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file check_shared_mutex_priority.cpp + * + */ + +#include +#include +#include +#include +#include +#include + +using namespace std; + +int main() +{ + shared_mutex sm; + atomic_bool mark = false; + + // take first shared lock + sm.lock_shared(); + + // signal is taken + thread exclusive([&]() + { + mark = true; + lock_guard guard(sm); + }); + + // Wait till the thread takes the lock + do + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + while (!mark); + + // try take the second shared lock + bool success = sm.try_lock_shared(); + if (success) + { + sm.unlock_shared(); + cout << "PTHREAD_RWLOCK_PREFER_READER_NP" << endl; + } + else + { + cout << "PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP" << endl; + } + + // release first lock + sm.unlock_shared(); + // wait for the main thread + exclusive.join(); + + return 0; +} diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 987aaa4530e..e3d649ca9dd 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -386,6 +386,44 @@ if(APPLE) set(CMAKE_INSTALL_RPATH_USE_LINK_PATH FALSE) endif() +# Check if the shared_mutex provided by the platform STL library +# prioritizes writes + +try_run(SM_RUN_RESULT SM_COMPILE_RESULT + "${CMAKE_CURRENT_BINARY_DIR}/shmtest" + "${CMAKE_SOURCE_DIR}/cmake/modules/check_shared_mutex_priority.cpp" + LINK_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} + RUN_OUTPUT_VARIABLE SM_RUN_OUTPUT) + +if(SM_COMPILE_RESULT AND NOT SM_RUN_RESULT) + string(STRIP ${SM_RUN_OUTPUT} SM_RUN_OUTPUT) + message(STATUS "Framework's shared_mutex is ${SM_RUN_OUTPUT}") +endif() + +if(SM_RUN_OUTPUT STREQUAL "PTHREAD_RWLOCK_PREFER_READER_NP") + set(USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY ON) +else() + message(STATUS "Forcing third party shared_mutex") + set(USE_THIRDPARTY_SHARED_MUTEX ON) +endif() + +cmake_dependent_option( + USE_THIRDPARTY_SHARED_MUTEX [=[ +Forces the use of a Boost-based shared_mutex implementation +instead of the framework one. Useful to cope with issues on +framework implementations like misguided sanitizer reports. +This implementation will be used by default on frameworks +lacking the shared_mutex feature like those not fulfilling +C++17. +]=] OFF + "USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY" + ON) + +unset(USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY) +unset(SM_RUN_RESULT) +unset(SM_COMPILE_RESULT) +unset(SM_RUN_OUTPUT) + #Create library add_library(${PROJECT_NAME} ${${PROJECT_NAME}_source_files}) set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION}) From b3bf9ac80315fb8613721c17c8d75b1a5aba3c2d Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Sat, 24 Sep 2022 17:17:32 +0200 Subject: [PATCH 05/19] Refs 15766: Add atomic support for some debian distros Signed-off-by: Miguel Barro --- src/cpp/CMakeLists.txt | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index e3d649ca9dd..8f73040e460 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -386,13 +386,22 @@ if(APPLE) set(CMAKE_INSTALL_RPATH_USE_LINK_PATH FALSE) endif() +# Find out if libatomic link is required in this platform +find_package(Atomic MODULE) + # Check if the shared_mutex provided by the platform STL library # prioritizes writes +# try_run cannot manage targets yet +get_target_property(CMAKE_ATOMIC_LIB eProsima_atomic INTERFACE_LINK_LIBRARIES) +if(NOT CMAKE_ATOMIC_LIB) + set(CMAKE_ATOMIC_LIB) +endif() + try_run(SM_RUN_RESULT SM_COMPILE_RESULT "${CMAKE_CURRENT_BINARY_DIR}/shmtest" "${CMAKE_SOURCE_DIR}/cmake/modules/check_shared_mutex_priority.cpp" - LINK_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} + LINK_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_ATOMIC_LIB} RUN_OUTPUT_VARIABLE SM_RUN_OUTPUT) if(SM_COMPILE_RESULT AND NOT SM_RUN_RESULT) @@ -423,6 +432,7 @@ unset(USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY) unset(SM_RUN_RESULT) unset(SM_COMPILE_RESULT) unset(SM_RUN_OUTPUT) +unset(CMAKE_ATOMIC_LIB) #Create library add_library(${PROJECT_NAME} ${${PROJECT_NAME}_source_files}) @@ -471,9 +481,6 @@ else() set(PRIVACY "PUBLIC") endif() -# Find out if libatomic link is required in this platform -find_package(Atomic MODULE) - # Link library to external libraries. target_link_libraries(${PROJECT_NAME} ${PRIVACY} fastcdr foonathan_memory ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS} From ced48853ef485953636e150f2f35c584d323e2e1 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Sat, 24 Sep 2022 17:38:09 +0200 Subject: [PATCH 06/19] Refs 15766: Allow recursiveness on participant endpoint collection mutexes. Now they always allow them. Signed-off-by: Miguel Barro --- .../rtps/participant/RTPSParticipantImpl.cpp | 2 -- .../rtps/participant/RTPSParticipantImpl.h | 28 ++----------------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index a1402cf72ab..2a879dab1fb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -68,8 +68,6 @@ using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor; using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor; using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor; -thread_local RTPSParticipantImpl* RTPSParticipantImpl::collections_mutex_owner_ = nullptr; - static EntityId_t TrustedWriter( const EntityId_t& reader) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index d11e9bfe7c0..c8b1d440612 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -536,8 +536,6 @@ class RTPSParticipantImpl std::atomic IdCounter; //! Mutex to safely access endpoints collections mutable shared_mutex endpoints_list_mutex; - //! This member avoids shared_mutex reentrancy by tracking last participant into traversing the endpoints collections - static thread_local RTPSParticipantImpl* collections_mutex_owner_; //!Writer List. std::vector m_allWriterList; //!Reader List @@ -933,15 +931,7 @@ class RTPSParticipantImpl Functor f) { // check if we are reentrying - shared_lock may_lock; - RTPSParticipantImpl* previous_owner = collections_mutex_owner_; - - if (collections_mutex_owner_ != this) - { - shared_lock lock(endpoints_list_mutex); - may_lock = std::move(lock); - collections_mutex_owner_ = this; - } + shared_lock _(endpoints_list_mutex); // traverse the list for ( RTPSWriter* pw : m_userWriterList) @@ -952,9 +942,6 @@ class RTPSParticipantImpl } } - // restore tls former value - std::swap(collections_mutex_owner_, previous_owner); - return f; } @@ -967,15 +954,7 @@ class RTPSParticipantImpl Functor f) { // check if we are reentrying - shared_lock may_lock; - RTPSParticipantImpl* previous_owner = collections_mutex_owner_; - - if (collections_mutex_owner_ != this) - { - shared_lock lock(endpoints_list_mutex); - may_lock = std::move(lock); - collections_mutex_owner_ = this; - } + shared_lock _(endpoints_list_mutex); for ( RTPSReader* pr : m_userReaderList) { @@ -985,9 +964,6 @@ class RTPSParticipantImpl } } - // restore tls former value - std::swap(collections_mutex_owner_, previous_owner); - return f; } From e0653c4b63d95da095cac49e74ef247f4d6aecd0 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Sat, 24 Sep 2022 18:07:21 +0200 Subject: [PATCH 07/19] Refs 15766: linter pass Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index f896056920d..ef3556e8474 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -207,6 +207,7 @@ class shared_mutex : public shared_mutex_base { count_t writer_waiting_ = 0; + public: void lock() @@ -229,11 +230,12 @@ class shared_mutex state_ |= num_readers; if ((writer_waiting_ && num_readers == 0) - || (num_readers == n_readers_ - 1)) + || (num_readers == n_readers_ - 1)) { gate1_.notify_one(); } } + }; // Debugger wrapper class that provides insight @@ -329,9 +331,9 @@ class debug_wrapper : public sm PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock. before T1 takes S twice. That happens because: - + T1's second S will wait for E (writer is prioritized) - + E will wait for T1's first S lock (writer needs atomic access) - + T1's first S cannot unlock because is blocked in the second S. + + T1's second S will wait for E (writer is prioritized) + + E will wait for T1's first S lock (writer needs atomic access) + + T1's first S cannot unlock because is blocked in the second S. Thus, shared_mutex is non-recursive. @@ -349,9 +351,9 @@ class debug_wrapper : public sm PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock if T3 takes E before T1 takes S. That happens because: - + T1's S will wait for E (writer is prioritized) - + E will wait for T2's S lock (writer needs atomic access) - + T2's S cannot unlock because is blocked in P (owned by T1). + + T1's S will wait for E (writer is prioritized) + + E will wait for T2's S lock (writer needs atomic access) + + T2's S cannot unlock because is blocked in P (owned by T1). Thus, shared_mutex must be managed like an ordinary mutex in deadlock sense. From 8bf908e8787c0e797680f0f8b96f6964b3aee00e Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Sat, 24 Sep 2022 23:11:37 +0200 Subject: [PATCH 08/19] Refs 15766: Generate the config.h file when the USE_THIRDPARTY_SHARED_MUTEX value is already specified. Signed-off-by: Miguel Barro --- src/cpp/CMakeLists.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 8f73040e460..c872511cb6f 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -371,9 +371,6 @@ else() set(HAVE_STRICT_REALTIME 0) endif() -configure_file(${PROJECT_SOURCE_DIR}/include/${PROJECT_NAME}/config.h.in - ${PROJECT_BINARY_DIR}/include/${PROJECT_NAME}/config.h) - if(NOT ANDROID) find_package(Threads REQUIRED) endif() @@ -434,6 +431,10 @@ unset(SM_COMPILE_RESULT) unset(SM_RUN_OUTPUT) unset(CMAKE_ATOMIC_LIB) +# Generate the proper configure file +configure_file(${PROJECT_SOURCE_DIR}/include/${PROJECT_NAME}/config.h.in + ${PROJECT_BINARY_DIR}/include/${PROJECT_NAME}/config.h) + #Create library add_library(${PROJECT_NAME} ${${PROJECT_NAME}_source_files}) set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION}) From b446a9844058b902bf6588478f83da69d64cc439 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Thu, 29 Sep 2022 13:01:20 +0200 Subject: [PATCH 09/19] Refs 15766: Mandatory piggyback: avoid polution on free_pools_ collection Signed-off-by: Miguel Barro --- src/cpp/rtps/history/TopicPayloadPool.cpp | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/cpp/rtps/history/TopicPayloadPool.cpp b/src/cpp/rtps/history/TopicPayloadPool.cpp index 014d57f80f4..d337bcc7dcd 100644 --- a/src/cpp/rtps/history/TopicPayloadPool.cpp +++ b/src/cpp/rtps/history/TopicPayloadPool.cpp @@ -189,20 +189,18 @@ TopicPayloadPool::PayloadNode* TopicPayloadPool::allocate( TopicPayloadPool::PayloadNode* TopicPayloadPool::do_allocate( uint32_t size) { - PayloadNode* payload = nullptr; + PayloadNode* payload = new (std::nothrow) PayloadNode(size); - try + if (payload != nullptr) { - payload = new PayloadNode(size); + payload->data_index(static_cast(all_payloads_.size())); + all_payloads_.push_back(payload); } - catch (std::bad_alloc& exception) + else { - logWarning(RTPS_HISTORY, "Failure to create a new payload " << exception.what()); - return nullptr; + logWarning(RTPS_HISTORY, "Failure to create a new payload "); } - payload->data_index(static_cast(all_payloads_.size())); - all_payloads_.push_back(payload); return payload; } @@ -253,7 +251,11 @@ void TopicPayloadPool::reserve ( for (size_t i = all_payloads_.size(); i < min_num_payloads; ++i) { PayloadNode* payload = do_allocate(size); - free_payloads_.push_back(payload); + + if (payload != nullptr) + { + free_payloads_.push_back(payload); + } } } From b90c9d38158c0d27bc2abc8fa564d1e387abb3aa Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Mon, 10 Oct 2022 00:53:02 +0200 Subject: [PATCH 10/19] Refs 15766: Add some missing members to shared_lock thirdparty Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index ef3556e8474..000e52da630 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -569,6 +569,41 @@ shared_lock::try_lock() return owns_; } +template +template +bool +shared_lock::try_lock_until( + const std::chrono::time_point& abs_time) +{ + if (m_ == nullptr) + throw std::system_error(std::error_code(EPERM, std::system_category()), + "shared_lock::try_lock_until: references null mutex"); + if (owns_) + throw std::system_error(std::error_code(EDEADLK, std::system_category()), + "shared_lock::try_lock_until: already locked"); + owns_ = m_->try_lock_shared_until(abs_time); + return owns_; +} + +template +void +shared_lock::unlock() +{ + if (!owns_) + throw std::system_error(std::error_code(EPERM, std::system_category()), + "shared_lock::unlock: not locked"); + m_->unlock_shared(); + owns_ = false; +} + +template +inline +void +swap(shared_lock& x, shared_lock& y) +{ + x.swap(y); +} + } //namespace eprosima #else // fallback to STL @@ -579,6 +614,7 @@ namespace eprosima { using std::shared_mutex; using std::shared_lock; +using std::swap; } //namespace eprosima From 0f45a4de8b2fdb5407947ff511a82d5868f132ac Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Mon, 10 Oct 2022 08:02:00 +0200 Subject: [PATCH 11/19] Refs 15766: linter Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index 000e52da630..c823fef09d2 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -573,14 +573,18 @@ template template bool shared_lock::try_lock_until( - const std::chrono::time_point& abs_time) + const std::chrono::time_point& abs_time) { if (m_ == nullptr) + { throw std::system_error(std::error_code(EPERM, std::system_category()), - "shared_lock::try_lock_until: references null mutex"); + "shared_lock::try_lock_until: references null mutex"); + } if (owns_) + { throw std::system_error(std::error_code(EDEADLK, std::system_category()), - "shared_lock::try_lock_until: already locked"); + "shared_lock::try_lock_until: already locked"); + } owns_ = m_->try_lock_shared_until(abs_time); return owns_; } @@ -590,8 +594,10 @@ void shared_lock::unlock() { if (!owns_) + { throw std::system_error(std::error_code(EPERM, std::system_category()), - "shared_lock::unlock: not locked"); + "shared_lock::unlock: not locked"); + } m_->unlock_shared(); owns_ = false; } @@ -599,7 +605,9 @@ shared_lock::unlock() template inline void -swap(shared_lock& x, shared_lock& y) +swap( + shared_lock& x, + shared_lock& y) { x.swap(y); } From 07e067e6e94a3e4808a3c67bafd7ebc31e7481f8 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Tue, 8 Nov 2022 13:07:08 +0100 Subject: [PATCH 12/19] Refs 15766. shared_mutex testing Signed-off-by: Miguel Barro --- cmake/common/gtest.cmake | 5 + test/unittest/utils/CMakeLists.txt | 8 +- test/unittest/utils/shared_mutex_tests.cpp | 221 +++++++++++++++++++++ 3 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 test/unittest/utils/shared_mutex_tests.cpp diff --git a/cmake/common/gtest.cmake b/cmake/common/gtest.cmake index 2f5e8fbd344..454708daaa5 100644 --- a/cmake/common/gtest.cmake +++ b/cmake/common/gtest.cmake @@ -59,6 +59,11 @@ macro(add_gtest) # Normal tests file(STRINGS ${GTEST_SOURCE_FILE} GTEST_TEST_NAMES REGEX "^([T][Y][P][E][D][_])?TEST") foreach(GTEST_TEST_NAME ${GTEST_TEST_NAMES}) + + if(GTEST_TEST_NAME MATCHES "TYPED_TEST_SUITE") + continue() + endif() + string(REGEX REPLACE ["\) \(,"] ";" GTEST_TEST_NAME ${GTEST_TEST_NAME}) list(GET GTEST_TEST_NAME 1 GTEST_GROUP_NAME) list(GET GTEST_TEST_NAME 3 GTEST_TEST_NAME) diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index 876f7678e02..38f219c652a 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -139,6 +139,12 @@ target_include_directories(SystemInfoTests PRIVATE target_link_libraries(SystemInfoTests GTest::gtest) add_gtest(SystemInfoTests SOURCES ${SYSTEMINFOTESTS_SOURCE}) +add_executable(SharedMutexTests shared_mutex_tests.cpp) +target_compile_definitions(SharedMutexTests PUBLIC USE_THIRDPARTY_SHARED_MUTEX=1) +target_include_directories(SharedMutexTests PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(SharedMutexTests PUBLIC GTest::gtest) +add_gtest(SharedMutexTests SOURCES shared_mutex_tests.cpp) + ############################################################################### # Necessary files ############################################################################### @@ -155,4 +161,4 @@ if(ANDROID) set_property(TARGET LocatorTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") set_property(TARGET FixedSizeStringTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") set_property(TARGET SystemInfoTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") -endif() \ No newline at end of file +endif() diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp new file mode 100644 index 00000000000..4525f1f31d3 --- /dev/null +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -0,0 +1,221 @@ +// Copyright (c) Microsoft Corporation. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// Excerpts from: STL/main/tests/std/tests/Dev11_1150223_shared_mutex/test.cpp + +#include +#include +#include +#include +#include + +#include +#include + +using namespace std; +using namespace eprosima; + +template +class SharedMutexTest : public testing::Test +{ + +public: + + using Mutex = T; + + void join_and_clear( + vector& threads) + { + for (auto& t : threads) + { + t.join(); + } + + threads.clear(); + } + +}; + +using SharedMutexTypes = ::testing::Types< + detail::shared_mutex, + detail::shared_mutex, + detail::debug_wrapper>, + detail::debug_wrapper>>; + +TYPED_TEST_SUITE(SharedMutexTest, SharedMutexTypes); + +TYPED_TEST(SharedMutexTest, test_one_writer) +{ + // One simultaneous writer. + atomic atom(-1); + Mutex mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + while (atom == -1) + { + } + lock_guard ExclusiveLock(mut); + const int val = ++atom; + this_thread::sleep_for(25ms); // Not a timing assumption. + ASSERT_EQ(atom, val); + }); + } + + ASSERT_EQ(atom.exchange(0), -1); + join_and_clear(threads); + ASSERT_EQ(atom, 4); +} + +TYPED_TEST(SharedMutexTest, test_multiple_readers) +{ + // Many simultaneous readers. + atomic atom(-1); + Mutex mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + while (atom == -1) + { + } + shared_lock SharedLock(mut); + ++atom; + while (atom < 4) + { + } + }); + } + + ASSERT_EQ(atom.exchange(0), -1); + join_and_clear(threads); + ASSERT_EQ(atom, 4); +} + +TYPED_TEST(SharedMutexTest, test_writer_blocking_readers) +{ + // One writer blocking many readers. + atomic atom(-4); + Mutex mut; + vector threads; + + threads.emplace_back([&atom, &mut] + { + while (atom < 0) + { + } + lock_guard ExclusiveLock(mut); + ASSERT_EQ(atom.exchange(1000), 0); + this_thread::sleep_for(50ms); // Not a timing assumption. + ASSERT_EQ(atom.exchange(1729), 1000); + }); + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + ++atom; + while (atom < 1000) + { + } + shared_lock SharedLock(mut); + ASSERT_EQ(atom, 1729); + }); + } + + join_and_clear(threads); + ASSERT_EQ(atom, 1729); +} + +TYPED_TEST(SharedMutexTest, test_readers_blocking_writer) +{ + // Many readers blocking one writer. + atomic atom(-5); + Mutex mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + shared_lock SharedLock(mut); + ++atom; + while (atom < 0) + { + } + this_thread::sleep_for(50ms); // Not a timing assumption. + atom += 10; + }); + } + + threads.emplace_back([&atom, &mut] + { + ++atom; + while (atom < 0) + { + } + lock_guard ExclusiveLock(mut); + ASSERT_EQ(atom, 40); + }); + + join_and_clear(threads); + ASSERT_EQ(atom, 40); +} + +TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) +{ + // Test try_lock() and try_lock_shared(). + Mutex mut; + + { + unique_lock MainExclusive(mut, try_to_lock); + ASSERT_TRUE(MainExclusive.owns_lock()); + + thread t([&mut] + { + { + unique_lock ExclusiveLock(mut, try_to_lock); + ASSERT_FALSE(ExclusiveLock.owns_lock()); + } + + { + shared_lock SharedLock(mut, try_to_lock); + ASSERT_FALSE(SharedLock.owns_lock()); + } + }); + + t.join(); + } + + { + shared_lock MainShared(mut, try_to_lock); + ASSERT_TRUE(MainShared.owns_lock()); + + thread t([&mut] + { + { + unique_lock ExclusiveLock(mut, try_to_lock); + ASSERT_FALSE(ExclusiveLock.owns_lock()); + } + + { + shared_lock SharedLock(mut, try_to_lock); + ASSERT_TRUE(SharedLock.owns_lock()); + } + }); + + t.join(); + } +} + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 0426db909dd5412da3fbe55731a4e1f27748c6b5 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Tue, 8 Nov 2022 15:15:54 +0100 Subject: [PATCH 13/19] Refs 15766. fixing gtest backward compatibility issues Signed-off-by: Miguel Barro --- test/unittest/utils/shared_mutex_tests.cpp | 54 +++++++++++----------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp index 4525f1f31d3..2187fdde071 100644 --- a/test/unittest/utils/shared_mutex_tests.cpp +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -12,16 +12,14 @@ #include using namespace std; +using namespace std::chrono; using namespace eprosima; -template +template class SharedMutexTest : public testing::Test { - public: - using Mutex = T; - void join_and_clear( vector& threads) { @@ -47,7 +45,7 @@ TYPED_TEST(SharedMutexTest, test_one_writer) { // One simultaneous writer. atomic atom(-1); - Mutex mut; + TypeParam mut; vector threads; for (int i = 0; i < 4; ++i) @@ -57,15 +55,15 @@ TYPED_TEST(SharedMutexTest, test_one_writer) while (atom == -1) { } - lock_guard ExclusiveLock(mut); + lock_guard ExclusiveLock(mut); const int val = ++atom; - this_thread::sleep_for(25ms); // Not a timing assumption. + this_thread::sleep_for(milliseconds(25)); // Not a timing assumption. ASSERT_EQ(atom, val); }); } ASSERT_EQ(atom.exchange(0), -1); - join_and_clear(threads); + this->join_and_clear(threads); ASSERT_EQ(atom, 4); } @@ -73,7 +71,7 @@ TYPED_TEST(SharedMutexTest, test_multiple_readers) { // Many simultaneous readers. atomic atom(-1); - Mutex mut; + TypeParam mut; vector threads; for (int i = 0; i < 4; ++i) @@ -83,7 +81,7 @@ TYPED_TEST(SharedMutexTest, test_multiple_readers) while (atom == -1) { } - shared_lock SharedLock(mut); + shared_lock SharedLock(mut); ++atom; while (atom < 4) { @@ -92,7 +90,7 @@ TYPED_TEST(SharedMutexTest, test_multiple_readers) } ASSERT_EQ(atom.exchange(0), -1); - join_and_clear(threads); + this->join_and_clear(threads); ASSERT_EQ(atom, 4); } @@ -100,7 +98,7 @@ TYPED_TEST(SharedMutexTest, test_writer_blocking_readers) { // One writer blocking many readers. atomic atom(-4); - Mutex mut; + TypeParam mut; vector threads; threads.emplace_back([&atom, &mut] @@ -108,9 +106,9 @@ TYPED_TEST(SharedMutexTest, test_writer_blocking_readers) while (atom < 0) { } - lock_guard ExclusiveLock(mut); + lock_guard ExclusiveLock(mut); ASSERT_EQ(atom.exchange(1000), 0); - this_thread::sleep_for(50ms); // Not a timing assumption. + this_thread::sleep_for(milliseconds(50)); // Not a timing assumption. ASSERT_EQ(atom.exchange(1729), 1000); }); @@ -122,12 +120,12 @@ TYPED_TEST(SharedMutexTest, test_writer_blocking_readers) while (atom < 1000) { } - shared_lock SharedLock(mut); + shared_lock SharedLock(mut); ASSERT_EQ(atom, 1729); }); } - join_and_clear(threads); + this->join_and_clear(threads); ASSERT_EQ(atom, 1729); } @@ -135,19 +133,19 @@ TYPED_TEST(SharedMutexTest, test_readers_blocking_writer) { // Many readers blocking one writer. atomic atom(-5); - Mutex mut; + TypeParam mut; vector threads; for (int i = 0; i < 4; ++i) { threads.emplace_back([&atom, &mut] { - shared_lock SharedLock(mut); + shared_lock SharedLock(mut); ++atom; while (atom < 0) { } - this_thread::sleep_for(50ms); // Not a timing assumption. + this_thread::sleep_for(milliseconds(50)); // Not a timing assumption. atom += 10; }); } @@ -158,32 +156,32 @@ TYPED_TEST(SharedMutexTest, test_readers_blocking_writer) while (atom < 0) { } - lock_guard ExclusiveLock(mut); + lock_guard ExclusiveLock(mut); ASSERT_EQ(atom, 40); }); - join_and_clear(threads); + this->join_and_clear(threads); ASSERT_EQ(atom, 40); } TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) { // Test try_lock() and try_lock_shared(). - Mutex mut; + TypeParam mut; { - unique_lock MainExclusive(mut, try_to_lock); + unique_lock MainExclusive(mut, try_to_lock); ASSERT_TRUE(MainExclusive.owns_lock()); thread t([&mut] { { - unique_lock ExclusiveLock(mut, try_to_lock); + unique_lock ExclusiveLock(mut, try_to_lock); ASSERT_FALSE(ExclusiveLock.owns_lock()); } { - shared_lock SharedLock(mut, try_to_lock); + shared_lock SharedLock(mut, try_to_lock); ASSERT_FALSE(SharedLock.owns_lock()); } }); @@ -192,18 +190,18 @@ TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) } { - shared_lock MainShared(mut, try_to_lock); + shared_lock MainShared(mut, try_to_lock); ASSERT_TRUE(MainShared.owns_lock()); thread t([&mut] { { - unique_lock ExclusiveLock(mut, try_to_lock); + unique_lock ExclusiveLock(mut, try_to_lock); ASSERT_FALSE(ExclusiveLock.owns_lock()); } { - shared_lock SharedLock(mut, try_to_lock); + shared_lock SharedLock(mut, try_to_lock); ASSERT_TRUE(SharedLock.owns_lock()); } }); From 4e69d828e34e16749f5fa8ed7776a5ddc34cfc79 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 9 Nov 2022 09:46:47 +0100 Subject: [PATCH 14/19] Refs 15766. Add a new test to check priority is right Signed-off-by: Miguel Barro --- test/unittest/utils/shared_mutex_tests.cpp | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp index 2187fdde071..9254ceae19f 100644 --- a/test/unittest/utils/shared_mutex_tests.cpp +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -15,6 +15,24 @@ using namespace std; using namespace std::chrono; using namespace eprosima; +template +detail::shared_mutex_type get_mutex_priority( + const Mutex& m); + +template +detail::shared_mutex_type get_mutex_priority( + const detail::shared_mutex&) +{ + return mt; +} + +template +detail::shared_mutex_type get_mutex_priority( + const detail::debug_wrapper>&) +{ + return mt; +} + template class SharedMutexTest : public testing::Test { @@ -210,6 +228,48 @@ TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) } } +TYPED_TEST(SharedMutexTest, test_mutex_priority) +{ + TypeParam sm; + atomic_bool mark = false; + + // take first shared lock + sm.lock_shared(); + + // signal is taken + thread exclusive([&]() + { + mark = true; + lock_guard guard(sm); + }); + + // Wait till the thread takes the lock + do + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + while (!mark); + + // try take the second shared lock + bool success = sm.try_lock_shared(); + if (success) + { + sm.unlock_shared(); + ASSERT_EQ(get_mutex_priority(sm), + detail::shared_mutex_type::PTHREAD_RWLOCK_PREFER_READER_NP); + } + else + { + ASSERT_EQ(get_mutex_priority(sm), + detail::shared_mutex_type::PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + } + + // release first lock + sm.unlock_shared(); + // wait for the main thread + exclusive.join(); +} + int main( int argc, char** argv) From 06f299a3c83f8c60da607798eb70118e577f8931 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 9 Nov 2022 17:16:37 +0100 Subject: [PATCH 15/19] Refs 15766. Fixing gcc build warnings Signed-off-by: Miguel Barro --- test/unittest/utils/shared_mutex_tests.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp index 9254ceae19f..9c25500cfc1 100644 --- a/test/unittest/utils/shared_mutex_tests.cpp +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -231,15 +231,15 @@ TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) TYPED_TEST(SharedMutexTest, test_mutex_priority) { TypeParam sm; - atomic_bool mark = false; + atomic_bool mark{false}; // take first shared lock sm.lock_shared(); // signal is taken - thread exclusive([&]() + thread exclusive([&] { - mark = true; + mark.store(true); lock_guard guard(sm); }); From 045242fbc55e1ee36e6e7c05243eb7a3f6dd9e12 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 9 Nov 2022 17:30:43 +0100 Subject: [PATCH 16/19] Refs 15766. Fixing clang build warnings Signed-off-by: Miguel Barro --- test/unittest/utils/shared_mutex_tests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp index 9c25500cfc1..eba86e73e46 100644 --- a/test/unittest/utils/shared_mutex_tests.cpp +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -57,7 +57,7 @@ using SharedMutexTypes = ::testing::Types< detail::debug_wrapper>, detail::debug_wrapper>>; -TYPED_TEST_SUITE(SharedMutexTest, SharedMutexTypes); +TYPED_TEST_SUITE(SharedMutexTest, SharedMutexTypes, ); TYPED_TEST(SharedMutexTest, test_one_writer) { From 9838980f5622a0df42b28709882661ae7f47550b Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 9 Nov 2022 21:08:23 +0100 Subject: [PATCH 17/19] Refs 15766. Make thirdparty versions always available even if none is used as eprosima::shared_mutex. Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 162 ++++++++++++++---------- 1 file changed, 93 insertions(+), 69 deletions(-) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index c823fef09d2..7726691713b 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -32,27 +32,6 @@ #ifndef _UTILS_SHARED_MUTEX_HPP_ #define _UTILS_SHARED_MUTEX_HPP_ -#ifndef USE_THIRDPARTY_SHARED_MUTEX -# if defined(_MSC_VER) && _MSVC_LANG < 202302L -# pragma message("warning: USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version.") -# else -# warning "USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version." -# endif // if defined(_MSC_VER) && _MSVC_LANG < 202302L -# define USE_THIRDPARTY_SHARED_MUTEX 0 -#endif // ifndef USE_THIRDPARTY_SHARED_MUTEX - -#if defined(__has_include) && __has_include() -# include -#endif // if defined(__has_include) && __has_include() - -// Detect if the share_mutex feature is available -#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ - /* allow users to ignore shared_mutex framework implementation */ \ - (~USE_THIRDPARTY_SHARED_MUTEX + 1) || \ - /* deprecated procedure if the good one is not available*/ \ - ( !(defined(__has_include) && __has_include()) && \ - !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) - #include #include #include @@ -316,55 +295,19 @@ class debug_wrapper : public sm }; } // namespace detail +} // namespace eprosima -/* - Fast-DDS defaults to PTHREAD_RWLOCK_PREFER_READER_NP for two main reasons: - - - It allows reader side recursiveness. If we have two threads (T1, T2) and - called S a shared lock and E and exclusive one. - - T1: S -> S - T2: E - - PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not - influenced by the E locks. - - PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock. before T1 - takes S twice. That happens because: - + T1's second S will wait for E (writer is prioritized) - + E will wait for T1's first S lock (writer needs atomic access) - + T1's first S cannot unlock because is blocked in the second S. - - Thus, shared_mutex is - non-recursive. - - - It prevents ABBA deadlocks with other mutexes. If we have three threads - (Ti) and P is an ordinary mutex: - - T1: P -> S - T2: S -> P - T3: E - - PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not - influenced by the E locks. Starvation issues can be managed in the user - code. - - PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock if T3 takes E - before T1 takes S. That happens because: - + T1's S will wait for E (writer is prioritized) - + E will wait for T2's S lock (writer needs atomic access) - + T2's S cannot unlock because is blocked in P (owned by T1). +#if defined(__has_include) && __has_include() +# include +#endif // if defined(__has_include) && __has_include() - Thus, shared_mutex must be - managed like an ordinary mutex in deadlock sense. - */ +// Detect if the shared_lock feature is available +#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ + /* deprecated procedure if the good one is not available*/ \ + ( !(defined(__has_include) && __has_include()) && \ + !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) -#ifdef NDEBUG -using shared_mutex = detail::shared_mutex; -#else -using shared_mutex = - detail::debug_wrapper>; -#endif // NDEBUG +namespace eprosima { template class shared_lock @@ -620,12 +563,93 @@ swap( namespace eprosima { -using std::shared_mutex; using std::shared_lock; using std::swap; } //namespace eprosima -#endif // if (__cplusplus < 201402) || (defined(_MSC_VER) && _MSC_VER < 1900 ) +#endif // shared_lock selection + +#ifndef USE_THIRDPARTY_SHARED_MUTEX +# if defined(_MSC_VER) && _MSVC_LANG < 202302L +# pragma message("warning: USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version.") +# else +# warning "USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version." +# endif // if defined(_MSC_VER) && _MSVC_LANG < 202302L +# define USE_THIRDPARTY_SHARED_MUTEX 0 +#endif // ifndef USE_THIRDPARTY_SHARED_MUTEX + +// Detect if the share_mutex feature is available or if the user forces it +#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ + /* allow users to ignore shared_mutex framework implementation */ \ + (~USE_THIRDPARTY_SHARED_MUTEX + 1) || \ + /* deprecated procedure if the good one is not available*/ \ + ( !(defined(__has_include) && __has_include()) && \ + !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) + +/* + Fast-DDS defaults to PTHREAD_RWLOCK_PREFER_READER_NP for two main reasons: + + - It allows reader side recursiveness. If we have two threads (T1, T2) and + called S a shared lock and E and exclusive one. + + T1: S -> S + T2: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock. before T1 + takes S twice. That happens because: + + T1's second S will wait for E (writer is prioritized) + + E will wait for T1's first S lock (writer needs atomic access) + + T1's first S cannot unlock because is blocked in the second S. + + Thus, shared_mutex is + non-recursive. + + - It prevents ABBA deadlocks with other mutexes. If we have three threads + (Ti) and P is an ordinary mutex: + + T1: P -> S + T2: S -> P + T3: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. Starvation issues can be managed in the user + code. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock if T3 takes E + before T1 takes S. That happens because: + + T1's S will wait for E (writer is prioritized) + + E will wait for T2's S lock (writer needs atomic access) + + T2's S cannot unlock because is blocked in P (owned by T1). + + Thus, shared_mutex must be + managed like an ordinary mutex in deadlock sense. + */ + +namespace eprosima { + +#ifdef NDEBUG +using shared_mutex = detail::shared_mutex; +#else +using shared_mutex = + detail::debug_wrapper>; +#endif // NDEBUG + +} //namespace eprosima + +#else // fallback to STL + +#include + +namespace eprosima { + +using std::shared_mutex; + +} //namespace eprosima + +#endif // shared_mutex selection #endif // _UTILS_SHARED_MUTEX_HPP_ From 50388f0b314d6f1d5d7ebbc72ac9dad176140e2a Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Fri, 11 Nov 2022 11:00:19 +0100 Subject: [PATCH 18/19] Refs 15766. Addressing reviewers comments Signed-off-by: Miguel Barro --- include/fastrtps/utils/shared_mutex.hpp | 8 ++++---- test/blackbox/common/DDSBlackboxTestsBasic.cpp | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index 7726691713b..d6924144eb7 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -245,8 +245,8 @@ class debug_wrapper : public sm bool try_lock() { - std::lock_guard _(wm_); bool res = sm::try_lock(); + std::lock_guard _(wm_); if (res) { exclusive_owner_ = std::this_thread::get_id(); @@ -256,8 +256,8 @@ class debug_wrapper : public sm void unlock() { - std::lock_guard _(wm_); sm::unlock(); + std::lock_guard _(wm_); exclusive_owner_ = std::thread::id(); } @@ -272,8 +272,8 @@ class debug_wrapper : public sm bool try_lock_shared() { - std::lock_guard _(wm_); bool res = sm::try_lock_shared(); + std::lock_guard _(wm_); if (res) { ++shared_owners_[std::this_thread::get_id()]; @@ -283,8 +283,8 @@ class debug_wrapper : public sm void unlock_shared() { - std::lock_guard _(wm_); sm::unlock_shared(); + std::lock_guard _(wm_); auto owner = shared_owners_.find(std::this_thread::get_id()); if ( owner != shared_owners_.end() && 0 == --owner->second ) { diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 77a35de3065..f9e6110a383 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -184,7 +184,7 @@ TEST(DDSBasic, MultithreadedPublisherCreation) ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); } -TEST(DDSBasic, MultithreadedReaderCreation) +TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock) { // Get factory DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); @@ -212,18 +212,27 @@ TEST(DDSBasic, MultithreadedReaderCreation) Topic* topic = participant->create_topic(TEST_TOPIC_NAME, type_support.get_type_name(), TOPIC_QOS_DEFAULT); ASSERT_NE(nullptr, topic); + // Set QoS + DataSharingQosPolicy dsp; + dsp.off(); + + DataWriterQos dw_qos; + DataReaderQos dr_qos; + dw_qos.data_sharing(dsp); + dr_qos.data_sharing(dsp); + // Create DataWriter - DataWriter* writer = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); + DataWriter* writer = publisher->create_datawriter(topic, dw_qos); ASSERT_NE(nullptr, writer); std::mutex mtx; std::condition_variable cv; bool should_finish = false; - auto thread_run = [subscriber, topic, &mtx, &cv, &should_finish]() + auto thread_run = [subscriber, topic, &mtx, &cv, &should_finish, &dr_qos]() { // Create reader - DataReader* reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); + DataReader* reader = subscriber->create_datareader(topic, dr_qos); ASSERT_NE(nullptr, reader); // Wait for test completion request From 9d7259a4a029922bdbbed7202cfbcda75df2e735 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Fri, 11 Nov 2022 23:08:21 +0100 Subject: [PATCH 19/19] Refs 15766. Fixing sync issue on ProxyPool. Signed-off-by: Miguel Barro --- include/fastrtps/utils/ProxyPool.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/fastrtps/utils/ProxyPool.hpp b/include/fastrtps/utils/ProxyPool.hpp index f4d3b31c0df..6d812623d6e 100644 --- a/include/fastrtps/utils/ProxyPool.hpp +++ b/include/fastrtps/utils/ProxyPool.hpp @@ -154,6 +154,9 @@ class ProxyPool // return the resource mask_.set(idx); + + // notify the resource is free + cv_.notify_one(); } public: