diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp
index 7189db05c40..64e533f7462 100644
--- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp
+++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp
@@ -1805,12 +1805,12 @@ void DomainParticipantImpl::create_instance_handle(
 {
     using eprosima::fastrtps::rtps::octet;
 
-    ++next_instance_id_;
+    uint32_t id = ++next_instance_id_;
     handle = guid_;
     handle.value[15] = 0x01; // Vendor specific;
-    handle.value[14] = static_cast<octet>(next_instance_id_ & 0xFF);
-    handle.value[13] = static_cast<octet>((next_instance_id_ >> 8) & 0xFF);
-    handle.value[12] = static_cast<octet>((next_instance_id_ >> 16) & 0xFF);
+    handle.value[14] = static_cast<octet>(id & 0xFF);
+    handle.value[13] = static_cast<octet>((id >> 8) & 0xFF);
+    handle.value[12] = static_cast<octet>((id >> 16) & 0xFF);
 }
 
 DomainParticipantListener* DomainParticipantImpl::get_listener_for(
diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp
index 060a86dfdc6..e5490f33e5a 100644
--- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp
+++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp
@@ -402,7 +402,7 @@ class DomainParticipantImpl
     fastrtps::rtps::GUID_t guid_;
 
     //!For instance handle creation
-    uint32_t next_instance_id_;
+    std::atomic<uint32_t> next_instance_id_;
 
     //!Participant Qos
     DomainParticipantQos qos_;
diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
new file mode 100644
index 00000000000..97f94ca62af
--- /dev/null
+++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
@@ -0,0 +1,1408 @@
+#ifndef _RTPS_FLOWCONTROL_FLOWCONTROLLERIMPL_HPP_
+#define _RTPS_FLOWCONTROL_FLOWCONTROLLERIMPL_HPP_
+
+#include "FlowController.hpp"
+#include <rtps/messages/RTPSMessageGroup.h>
+#include <rtps/participant/RTPSParticipantImpl.h>
+
+#include <atomic>
+#include <cassert>
+#include <chrono>
+#include <condition_variable>
+#include <map>
+#include <mutex>
+#include <thread>
+#include <unordered_map>
+
+namespace eprosima {
+namespace fastdds {
+namespace rtps {
+
+/** Auxiliary classes **/
+
+struct FlowQueue
+{
FlowQueue() noexcept = default; + + ~FlowQueue() noexcept + { + assert(new_interested_.is_empty()); + assert(old_interested_.is_empty()); + } + + FlowQueue( + FlowQueue&& old) noexcept + { + swap(std::move(old)); + } + + FlowQueue& operator =( + FlowQueue&& old) noexcept + { + swap(std::move(old)); + return *this; + } + + void swap( + FlowQueue&& old) noexcept + { + new_interested_.swap(old.new_interested_); + old_interested_.swap(old.old_interested_); + + new_ones_.swap(old.new_ones_); + old_ones_.swap(old.old_ones_); + } + + bool is_empty() const noexcept + { + return new_ones_.is_empty() && old_ones_.is_empty(); + } + + void add_new_sample( + fastrtps::rtps::CacheChange_t* change) noexcept + { + new_interested_.add_change(change); + } + + void add_old_sample( + fastrtps::rtps::CacheChange_t* change) noexcept + { + old_interested_.add_change(change); + } + + fastrtps::rtps::CacheChange_t* get_next_change() noexcept + { + if (!is_empty()) + { + return !new_ones_.is_empty() ? + new_ones_.head.writer_info.next : old_ones_.head.writer_info.next; + } + + return nullptr; + } + + void add_interested_changes_to_queue() noexcept + { + // This function should be called with mutex_ and interested_lock locked, because the queue is changed. 
+ new_ones_.add_list(new_interested_); + old_ones_.add_list(old_interested_); + } + +private: + + struct ListInfo + { + ListInfo() noexcept + { + clear(); + } + + void swap( + ListInfo& other) noexcept + { + if (other.is_empty()) + { + clear(); + } + else + { + head.writer_info.next = other.head.writer_info.next; + tail.writer_info.previous = other.tail.writer_info.previous; + other.clear(); + head.writer_info.next->writer_info.previous = &head; + tail.writer_info.previous->writer_info.next = &tail; + } + } + + void clear() noexcept + { + head.writer_info.next = &tail; + tail.writer_info.previous = &head; + } + + bool is_empty() const noexcept + { + assert((&tail == head.writer_info.next && &head == tail.writer_info.previous) || + (&tail != head.writer_info.next && &head != tail.writer_info.previous)); + return &tail == head.writer_info.next; + } + + void add_change( + fastrtps::rtps::CacheChange_t* change) noexcept + { + bool expected = false; + if (change->writer_info.is_linked.compare_exchange_strong(expected, true)) + { + change->writer_info.previous = tail.writer_info.previous; + change->writer_info.previous->writer_info.next = change; + tail.writer_info.previous = change; + change->writer_info.next = &tail; + } + } + + void add_list( + ListInfo& list) noexcept + { + if (!list.is_empty()) + { + fastrtps::rtps::CacheChange_t* first = list.head.writer_info.next; + fastrtps::rtps::CacheChange_t* last = list.tail.writer_info.previous; + + first->writer_info.previous = tail.writer_info.previous; + first->writer_info.previous->writer_info.next = first; + last->writer_info.next = &tail; + tail.writer_info.previous = last; + + list.clear(); + } + } + + fastrtps::rtps::CacheChange_t head; + fastrtps::rtps::CacheChange_t tail; + }; + + //! List of interested new changes to be included. + //! Should be protected with changes_interested_mutex. + ListInfo new_interested_; + + //! List of interested old changes to be included. + //! 
Should be protected with changes_interested_mutex. + ListInfo old_interested_; + + //! List of new changes + //! Should be protected with mutex_. + ListInfo new_ones_; + + //! List of old changes + //! Should be protected with mutex_. + ListInfo old_ones_; +}; + +/** Classes used to specify FlowController's publication model **/ + +//! Only sends new samples synchronously. There is no mechanism to send old ones. +struct FlowControllerPureSyncPublishMode +{ + + FlowControllerPureSyncPublishMode( + fastrtps::rtps::RTPSParticipantImpl*, + const FlowControllerDescriptor*) + { + } + +}; + +//! Sends new samples asynchronously. Old samples are sent also asynchronously */ +struct FlowControllerAsyncPublishMode +{ + FlowControllerAsyncPublishMode( + fastrtps::rtps::RTPSParticipantImpl* participant, + const FlowControllerDescriptor*) + : group(participant, true) + { + } + + virtual ~FlowControllerAsyncPublishMode() + { + if (running) + { + { + std::unique_lock lock(changes_interested_mutex); + running = false; + cv.notify_one(); + } + thread.join(); + } + } + + bool fast_check_is_there_slot_for_change( + fastrtps::rtps::CacheChange_t*) const + { + return true; + } + + bool wait( + std::unique_lock& lock) + { + cv.wait(lock); + return false; + } + + bool force_wait() const + { + return false; + } + + void process_deliver_retcode( + const fastrtps::rtps::DeliveryRetCode&) + { + } + + std::thread thread; + + std::atomic_bool running {false}; + + std::condition_variable cv; + + fastrtps::rtps::RTPSMessageGroup group; + + //! Mutex for interested samples to be added. + std::mutex changes_interested_mutex; + + //! Used to warning async thread a writer wants to remove a sample. + std::atomic writers_interested_in_remove = {0}; +}; + +//! Sends new samples synchronously. 
Old samples are sent asynchronously */ +struct FlowControllerSyncPublishMode : public FlowControllerPureSyncPublishMode, public FlowControllerAsyncPublishMode +{ + + FlowControllerSyncPublishMode( + fastrtps::rtps::RTPSParticipantImpl* participant, + const FlowControllerDescriptor* descriptor) + : FlowControllerPureSyncPublishMode(participant, descriptor) + , FlowControllerAsyncPublishMode(participant, descriptor) + { + } + +}; + +//! Sends all samples asynchronously but with bandwidth limitation. +struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublishMode +{ + FlowControllerLimitedAsyncPublishMode( + fastrtps::rtps::RTPSParticipantImpl* participant, + const FlowControllerDescriptor* descriptor) + : FlowControllerAsyncPublishMode(participant, descriptor) + { + assert(nullptr != descriptor); + assert(0 < descriptor->max_bytes_per_period); + + max_bytes_per_period = descriptor->max_bytes_per_period; + period_ms = std::chrono::milliseconds(descriptor->period_ms); + group.set_sent_bytes_limitation(static_cast(max_bytes_per_period)); + } + + bool fast_check_is_there_slot_for_change( + fastrtps::rtps::CacheChange_t* change) + { + // Not fragmented sample, the fast check is if the serialized payload fit. + uint32_t size_to_check = change->serializedPayload.length; + + if (0 != change->getFragmentCount()) + { + // For fragmented sample, the fast check is the minor fragments fit. + size_to_check = change->serializedPayload.length % change->getFragmentSize(); + + if (0 == size_to_check) + { + size_to_check = change->getFragmentSize(); + } + + + } + + bool ret = (max_bytes_per_period - group.get_current_bytes_processed()) > size_to_check; + + if (!ret) + { + force_wait_ = true; + } + + return ret; + } + + /*! + * Wait until there is a new change added (notified by other thread) or there is a timeout (period was excedded and + * the bandwidth limitation has to be reset. 
+ * + * @return false if the condition_variable was awaken because a new change was added. true if the condition_variable was awaken because the bandwidth limitation has to be reset. + */ + bool wait( + std::unique_lock& lock) + { + auto lapse = std::chrono::steady_clock::now() - last_period_; + bool reset_limit = true; + + if (lapse < period_ms) + { + if (std::cv_status::no_timeout == cv.wait_for(lock, period_ms - lapse)) + { + reset_limit = false; + } + } + + if (reset_limit) + { + last_period_ = std::chrono::steady_clock::now(); + force_wait_ = false; + group.reset_current_bytes_processed(); + } + + return reset_limit; + } + + bool force_wait() const + { + return force_wait_; + } + + void process_deliver_retcode( + const fastrtps::rtps::DeliveryRetCode& ret_value) + { + if (fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT == ret_value) + { + force_wait_ = true; + } + } + + int32_t max_bytes_per_period = 0; + + std::chrono::milliseconds period_ms; + +private: + + bool force_wait_ = false; + + std::chrono::steady_clock::time_point last_period_ = std::chrono::steady_clock::now(); +}; + + +/** Classes used to specify FlowController's sample scheduling **/ + +//! Fifo scheduling +struct FlowControllerFifoSchedule +{ + void register_writer( + fastrtps::rtps::RTPSWriter*) const + { + } + + void unregister_writer( + fastrtps::rtps::RTPSWriter*) const + { + } + + void work_done() const + { + // Do nothing + } + + void add_new_sample( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) + { + queue_.add_new_sample(change); + } + + void add_old_sample( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) + { + queue_.add_old_sample(change); + } + + /*! + * Returns the first sample in the queue. + * Default behaviour. + * Expects the queue is ordered. + * + * @return Pointer to next change to be sent. nullptr implies there is no sample to be sent or is forbidden due to + * bandwidth exceeded. 
+ */ + fastrtps::rtps::CacheChange_t* get_next_change_nts() + { + return queue_.get_next_change(); + } + + /*! + * Store the sample at the end of the list. + * + * @return true if there is added changes. + */ + void add_interested_changes_to_queue_nts() + { + // This function should be called with mutex_ and interested_lock locked, because the queue is changed. + queue_.add_interested_changes_to_queue(); + } + + void set_bandwith_limitation( + uint32_t) const + { + } + + void trigger_bandwidth_limit_reset() const + { + } + +private: + + //! Scheduler queue. FIFO scheduler only has one queue. + FlowQueue queue_; +}; + +//! Round Robin scheduling +struct FlowControllerRoundRobinSchedule +{ + using element = std::tuple; + using container = std::vector; + using iterator = container::iterator; + + FlowControllerRoundRobinSchedule() + { + next_writer_ = writers_queue_.begin(); + } + + void register_writer( + fastrtps::rtps::RTPSWriter* writer) + { + fastrtps::rtps::RTPSWriter* current_writer = nullptr; + + if (writers_queue_.end() != next_writer_) + { + current_writer = std::get<0>(*next_writer_); + } + + assert(writers_queue_.end() == find(writer)); + writers_queue_.emplace_back(writer, FlowQueue()); + + if (nullptr == current_writer) + { + next_writer_ = writers_queue_.begin(); + } + else + { + next_writer_ = find(current_writer); + } + } + + void unregister_writer( + fastrtps::rtps::RTPSWriter* writer) + { + // Queue cannot be empty, as writer should be present + assert(writers_queue_.end() != next_writer_); + fastrtps::rtps::RTPSWriter* current_writer = std::get<0>(*next_writer_); + assert(nullptr != current_writer); + + auto it = find(writer); + assert(it != writers_queue_.end()); + assert(std::get<1>(*it).is_empty()); + + // Go to the next writer when unregistering the current one + if (it == next_writer_) + { + set_next_writer(); + current_writer = std::get<0>(*next_writer_); + } + + writers_queue_.erase(it); + if (writer == current_writer) + { + next_writer_ = 
writers_queue_.begin(); + } + else + { + next_writer_ = find(current_writer); + } + } + + void work_done() + { + assert(0 < writers_queue_.size()); + assert(writers_queue_.end() != next_writer_); + set_next_writer(); + } + + iterator set_next_writer() + { + iterator next = std::next(next_writer_); + next_writer_ = writers_queue_.end() == next ? writers_queue_.begin() : next; + return next_writer_; + } + + void add_new_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + auto it = find(writer); + assert(it != writers_queue_.end()); + std::get<1>(*it).add_new_sample(change); + } + + void add_old_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + auto it = find(writer); + assert(it != writers_queue_.end()); + std::get<1>(*it).add_old_sample(change); + } + + fastrtps::rtps::CacheChange_t* get_next_change_nts() + { + fastrtps::rtps::CacheChange_t* ret_change = nullptr; + + if (0 < writers_queue_.size()) + { + auto starting_it = next_writer_; // For avoid loops. + + do + { + ret_change = std::get<1>(*next_writer_).get_next_change(); + } while (nullptr == ret_change && starting_it != set_next_writer()); + } + + return ret_change; + } + + void add_interested_changes_to_queue_nts() + { + // This function should be called with mutex_ and interested_lock locked, because the queue is changed. + for (auto& queue : writers_queue_) + { + std::get<1>(queue).add_interested_changes_to_queue(); + } + } + + void set_bandwith_limitation( + uint32_t) const + { + } + + void trigger_bandwidth_limit_reset() const + { + } + +private: + + iterator find( + const fastrtps::rtps::RTPSWriter* writer) + { + return std::find_if(writers_queue_.begin(), writers_queue_.end(), + [writer](const element& current_writer) -> bool + { + return writer == std::get<0>(current_writer); + }); + } + + container writers_queue_; + iterator next_writer_; + +}; + +//! 
High priority scheduling +struct FlowControllerHighPrioritySchedule +{ + void register_writer( + fastrtps::rtps::RTPSWriter* writer) + { + assert(nullptr != writer); + int32_t priority = 10; + auto property = fastrtps::rtps::PropertyPolicyHelper::find_property( + writer->getAttributes().properties, "fastdds.sfc.priority"); + + if (nullptr != property) + { + char* ptr = nullptr; + priority = strtol(property->c_str(), &ptr, 10); + + if (property->c_str() != ptr) // A valid integer was read. + { + if (-10 > priority || 10 < priority) + { + priority = 10; + logError(RTPS_WRITER, + "Wrong value for fastdds.sfc.priority property. Range is [-10, 10]. Priority set to lowest (10)"); + } + } + else + { + priority = 10; + logError(RTPS_WRITER, + "Not numerical value for fastdds.sfc.priority property. Priority set to lowest (10)"); + } + } + + auto ret = priorities_.insert({writer, priority}); + (void)ret; + assert(ret.second); + + // Ensure the priority is created. + FlowQueue& queue = writers_queue_[priority]; + (void)queue; + } + + void unregister_writer( + fastrtps::rtps::RTPSWriter* writer) + { + auto it = priorities_.find(writer); + assert(it != priorities_.end()); + priorities_.erase(it); + } + + void work_done() const + { + // Do nothing + } + + void add_new_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + find_queue(writer).add_new_sample(change); + } + + void add_old_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + find_queue(writer).add_old_sample(change); + } + + fastrtps::rtps::CacheChange_t* get_next_change_nts() + { + fastrtps::rtps::CacheChange_t* ret_change = nullptr; + + if (0 < writers_queue_.size()) + { + for (auto it = writers_queue_.begin(); nullptr == ret_change && it != writers_queue_.end(); ++it) + { + ret_change = it->second.get_next_change(); + } + } + + return ret_change; + } + + void add_interested_changes_to_queue_nts() + { + // This function should be called 
with mutex_ and interested_lock locked, because the queue is changed. + for (auto& queue : writers_queue_) + { + queue.second.add_interested_changes_to_queue(); + } + } + + void set_bandwith_limitation( + uint32_t) const + { + } + + void trigger_bandwidth_limit_reset() const + { + } + +private: + + FlowQueue& find_queue( + fastrtps::rtps::RTPSWriter* writer) + { + // Find priority. + auto priority_it = priorities_.find(writer); + assert(priority_it != priorities_.end()); + auto queue_it = writers_queue_.find(priority_it->second); + assert(queue_it != writers_queue_.end()); + return queue_it->second; + } + + std::map writers_queue_; + + std::unordered_map priorities_; +}; + +//! Priority with reservation scheduling +struct FlowControllerPriorityWithReservationSchedule +{ + void register_writer( + fastrtps::rtps::RTPSWriter* writer) + { + assert(nullptr != writer); + int32_t priority = 10; + auto property = fastrtps::rtps::PropertyPolicyHelper::find_property( + writer->getAttributes().properties, "fastdds.sfc.priority"); + + if (nullptr != property) + { + char* ptr = nullptr; + priority = strtol(property->c_str(), &ptr, 10); + + if (property->c_str() != ptr) // A valid integer was read. + { + if (-10 > priority || 10 < priority) + { + priority = 10; + logError(RTPS_WRITER, + "Wrong value for fastdds.sfc.priority property. Range is [-10, 10]. Priority set to lowest (10)"); + } + } + else + { + priority = 10; + logError(RTPS_WRITER, + "Not numerical value for fastdds.sfc.priority property. Priority set to lowest (10)"); + } + } + + uint32_t reservation = 0; + property = fastrtps::rtps::PropertyPolicyHelper::find_property( + writer->getAttributes().properties, "fastdds.sfc.bandwidth_reservation"); + + if (nullptr != property) + { + char* ptr = nullptr; + reservation = strtoul(property->c_str(), &ptr, 10); + + if (property->c_str() != ptr) // A valid integer was read. 
+ { + if (100 < reservation) + { + reservation = 0; + logError(RTPS_WRITER, + "Wrong value for fastdds.sfc.bandwidth_reservation property. Range is [0, 100]. Reservation set to lowest (0)"); + } + } + else + { + reservation = 0; + logError(RTPS_WRITER, + "Not numerical value for fastdds.sfc.bandwidth_reservation property. Reservation set to lowest (0)"); + } + } + + // Calculate reservation in bytes. + uint32_t reservation_bytes = (0 == bandwidth_limit_? 0 : + ((bandwidth_limit_ * reservation) / 100)); + + auto ret = writers_queue_.emplace(writer, std::make_tuple(FlowQueue(), priority, reservation_bytes, 0u)); + (void)ret; + assert(ret.second); + + priorities_[priority].push_back(writer); + } + + void unregister_writer( + fastrtps::rtps::RTPSWriter* writer) + { + auto it = writers_queue_.find(writer); + assert(it != writers_queue_.end()); + int32_t priority = std::get<1>(it->second); + writers_queue_.erase(it); + auto priority_it = priorities_.find(priority); + assert(priority_it != priorities_.end()); + auto writer_it = std::find(priority_it->second.begin(), priority_it->second.end(), writer); + assert(writer_it != priority_it->second.end()); + priority_it->second.erase(writer_it); + } + + void work_done() + { + if (nullptr != writer_being_processed_) + { + assert(0 != size_being_processed_); + auto writer = writers_queue_.find(writer_being_processed_); + std::get<3>(writer->second) += size_being_processed_; + writer_being_processed_ = nullptr; + size_being_processed_ = 0; + } + } + + void add_new_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + // Find writer queue.. + auto it = writers_queue_.find(writer); + assert(it != writers_queue_.end()); + std::get<0>(it->second).add_new_sample(change); + } + + void add_old_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) + { + // Find writer queue.. 
+ auto it = writers_queue_.find(writer); + assert(it != writers_queue_.end()); + std::get<0>(it->second).add_old_sample(change); + } + + fastrtps::rtps::CacheChange_t* get_next_change_nts() + { + fastrtps::rtps::CacheChange_t* highest_priority = nullptr; + fastrtps::rtps::CacheChange_t* ret_change = nullptr; + + if (0 < writers_queue_.size()) + { + for (auto& priority : priorities_) + { + for (auto writer_it : priority.second) + { + auto writer = writers_queue_.find(writer_it); + fastrtps::rtps::CacheChange_t* change = std::get<0>(writer->second).get_next_change(); + + if (nullptr == highest_priority) + { + highest_priority = change; + } + + if (nullptr != change) + { + // Check if writer's next change can be processed because the writer's bandwidth reservation is + // enough. + uint32_t size_to_check = change->serializedPayload.length; + if (0 != change->getFragmentCount()) + { + size_to_check = change->getFragmentSize(); + } + + if (std::get<2>(writer->second) > (std::get<3>(writer->second) + size_to_check)) + { + ret_change = change; + writer_being_processed_ = writer_it; + size_being_processed_ = size_to_check; + break; + } + } + } + + if (nullptr != ret_change) + { + break; + } + } + } + + return (nullptr != ret_change ? ret_change : highest_priority); + } + + void add_interested_changes_to_queue_nts() + { + // This function should be called with mutex_ and interested_lock locked, because the queue is changed. 
+ for (auto& queue : writers_queue_) + { + std::get<0>(queue.second).add_interested_changes_to_queue(); + } + } + + void set_bandwith_limitation( + uint32_t limit) + { + bandwidth_limit_ = limit; + } + + void trigger_bandwidth_limit_reset() + { + for (auto& writer : writers_queue_) + { + std::get<3>(writer.second) = 0; + } + } + +private: + + using map_writers = std::unordered_map>; + + using map_priorities = std::map>; + + map_writers writers_queue_; + + map_priorities priorities_; + + uint32_t bandwidth_limit_ = 0; + + fastrtps::rtps::RTPSWriter* writer_being_processed_ = nullptr; + + uint32_t size_being_processed_ = 0; +}; + +template +class FlowControllerImpl : public FlowController +{ + using publish_mode = PublishMode; + using scheduler = SampleScheduling; + +public: + + FlowControllerImpl( + fastrtps::rtps::RTPSParticipantImpl* participant, + const FlowControllerDescriptor* descriptor + ) + : participant_(participant) + , async_mode(participant, descriptor) + { + uint32_t limitation = get_max_payload(); + + if (std::numeric_limits::max() != limitation) + { + sched.set_bandwith_limitation(limitation); + } + } + + virtual ~FlowControllerImpl() noexcept + { + } + + /*! + * Initializes the flow controller. + */ + void init() override + { + initialize_async_thread(); + } + + /*! + * Registers a writer. + * This object is only be able to manage a CacheChante_t if its writer was registered previously with this function. + * + * @param writer Pointer to the writer to be registered. Cannot be nullptr. + */ + void register_writer( + fastrtps::rtps::RTPSWriter* writer) override + { + std::unique_lock lock(mutex_); + auto ret = writers_.insert({ writer->getGuid(), writer}); + (void)ret; + assert(ret.second); + register_writer_impl(writer); + } + + /*! + * Unregister a writer. + * + * @param writer Pointer to the writer to be unregistered. Cannot be nullptr. 
+ */ + void unregister_writer( + fastrtps::rtps::RTPSWriter* writer) override + { + std::unique_lock lock(mutex_); + writers_.erase(writer->getGuid()); + unregister_writer_impl(writer); + } + + /* + * Adds the CacheChange_t to be managed by this object. + * The CacheChange_t has to be a new one, that is, it has to be added to the writer's history before this call. + * This function should be called by RTPSWriter::unsent_change_added_to_history(). + * This function has two specializations depending on template parameter PublishMode. + * + * @param Pointer to the writer which the added CacheChante_t is responsable. Cannot be nullptr. + * @param change Pointer to the new CacheChange_t to be managed by this object. Cannot be nullptr. + * @return true if sample could be added. false in other case. + */ + bool add_new_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) override + { + return add_new_sample_impl(writer, change, max_blocking_time); + } + + /*! + * Adds the CacheChante_t to be managed by this object. + * The CacheChange_t has to be an old one, that is, it is already in the writer's history and for some reason has to + * be sent again. + * + * @param Pointer to the writer which the added change is responsable. Cannot be nullptr. + * @param change Pointer to the old change to be managed by this object. Cannot be nullptr. + * @return true if sample could be added. false in other case. + */ + bool add_old_sample( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change) override + { + return add_old_sample_impl(writer, change, + std::chrono::steady_clock::now() + std::chrono::hours(24)); + } + + /*! + * If currently the CacheChange_t is managed by this object, remove it. + * This funcion should be called when a CacheChange_t is removed from the writer's history. 
+ * + * @param Pointer to the change which should be removed if it is currently managed by this object. + */ + void remove_change( + fastrtps::rtps::CacheChange_t* change) override + { + assert(nullptr != change); + remove_change_impl(change); + } + + uint32_t get_max_payload() override + { + return get_max_payload_impl(); + } + +private: + + /*! + * Initialize asynchronous thread. + */ + template + typename std::enable_if::value, void>::type + initialize_async_thread() + { + bool expected = false; + if (async_mode.running.compare_exchange_strong(expected, true)) + { + // Code for initializing the asynchronous thread. + async_mode.thread = std::thread(&FlowControllerImpl::run, this); + } + } + + /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. + * In this case the async thread doesn't need to be initialized. + */ + template + typename std::enable_if::value, void>::type + initialize_async_thread() + { + // Do nothing. + } + + template + typename std::enable_if::value, void>::type + register_writer_impl( + fastrtps::rtps::RTPSWriter* writer) + { + std::unique_lock in_lock(async_mode.changes_interested_mutex); + sched.register_writer(writer); + } + + template + typename std::enable_if::value, void>::type + register_writer_impl( + fastrtps::rtps::RTPSWriter*) + { + // Do nothing. + } + + template + typename std::enable_if::value, void>::type + unregister_writer_impl( + fastrtps::rtps::RTPSWriter* writer) + { + std::unique_lock in_lock(async_mode.changes_interested_mutex); + sched.unregister_writer(writer); + } + + template + typename std::enable_if::value, void>::type + unregister_writer_impl( + fastrtps::rtps::RTPSWriter*) + { + // Do nothing. + } + + /*! + * This function store internally the sample and wake up the async thread. + * + * @note Before calling this function, the change's writer mutex have to be locked. 
+ */ + template + typename std::enable_if::value, bool>::type + enqueue_new_sample_impl( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& /* TODO max_blocking_time*/) + { + assert(!change->writer_info.is_linked.load()); + // Sync delivery failed. Try to store for asynchronous delivery. + std::unique_lock lock(async_mode.changes_interested_mutex); + sched.add_new_sample(writer, change); + async_mode.cv.notify_one(); + + return true; + } + + /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. + * In this case there is no async mechanism. + */ + template + typename std::enable_if::value, bool>::type + constexpr enqueue_new_sample_impl( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t*, + const std::chrono::time_point&) const + { + // Do nothing. Return false. + return false; + } + + /*! + * This function tries to send the sample synchronously. + * That is, it uses the user's thread, which is the one calling this function, to send the sample. + * It calls new function `RTPSWriter::deliver_sample_nts()` for sending the sample. + * If this function fails (for example because non-blocking socket is full), this function stores internally the sample to + * try sending it again asynchronously. + */ + template + typename std::enable_if::value, bool>::type + add_new_sample_impl( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) + { + // This call should be made with writer's mutex locked. 
+ fastrtps::rtps::LocatorSelectorSender& locator_selector = writer->get_general_locator_selector(); + std::lock_guard lock(locator_selector); + fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector); + if (fastrtps::rtps::DeliveryRetCode::DELIVERED != + writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time)) + { + return enqueue_new_sample_impl(writer, change, max_blocking_time); + } + + return true; + } + + /*! + * This function stores internally the sample to send it asynchronously. + */ + template + typename std::enable_if::value, bool>::type + add_new_sample_impl( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) + { + return enqueue_new_sample_impl(writer, change, max_blocking_time); + } + + /*! + * This function store internally the sample and wake up the async thread. + * + * @note Before calling this function, the change's writer mutex have to be locked. + */ + template + typename std::enable_if::value, bool>::type + add_old_sample_impl( + fastrtps::rtps::RTPSWriter* writer, + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& /* TODO max_blocking_time*/) + { + if (!change->writer_info.is_linked.load()) + { + std::unique_lock lock(async_mode.changes_interested_mutex); + sched.add_old_sample(writer, change); + async_mode.cv.notify_one(); + + return true; + } + + return false; + } + + /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. + * In this case there is no async mechanism. + */ + template + typename std::enable_if::value, bool>::type + constexpr add_old_sample_impl( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t*, + const std::chrono::time_point&) const + { + return false; + } + + /*! + * This function store internally the sample and wake up the async thread. + * + * @note Before calling this function, the change's writer mutex have to be locked. 
+ */ + template + typename std::enable_if::value, void>::type + remove_change_impl( + fastrtps::rtps::CacheChange_t* change) + { + if (change->writer_info.is_linked.load()) + { + ++async_mode.writers_interested_in_remove; + std::unique_lock lock(mutex_); + std::unique_lock interested_lock(async_mode.changes_interested_mutex); + + // When blocked, both pointer are different than nullptr or equal. + assert((nullptr != change->writer_info.previous && + nullptr != change->writer_info.next) || + (nullptr == change->writer_info.previous && + nullptr == change->writer_info.next)); + if (change->writer_info.is_linked.load()) + { + + // Try to join previous node and next node. + change->writer_info.previous->writer_info.next = change->writer_info.next; + change->writer_info.next->writer_info.previous = change->writer_info.previous; + change->writer_info.previous = nullptr; + change->writer_info.next = nullptr; + change->writer_info.is_linked.store(false); + } + --async_mode.writers_interested_in_remove; + } + } + + /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. + * In this case there is no async mechanism. + */ + template + typename std::enable_if::value, void>::type + remove_change_impl( + fastrtps::rtps::CacheChange_t*) const + { + // Do nothing. + } + + /*! + * Function run by the asynchronous thread. + */ + void run() + { + while (async_mode.running) + { + // There are writers interested in removing a sample. + if (0 != async_mode.writers_interested_in_remove) + { + continue; + } + + std::unique_lock lock(mutex_); + fastrtps::rtps::CacheChange_t* change_to_process = nullptr; + + //Check if we have to sleep. + { + std::unique_lock in_lock(async_mode.changes_interested_mutex); + // Add interested changes into the queue. 
+ sched.add_interested_changes_to_queue_nts(); + + while (async_mode.running && + (async_mode.force_wait() || nullptr == (change_to_process = sched.get_next_change_nts()))) + { + // Release main mutex to allow registering/unregistering writers while this thread is waiting. + lock.unlock(); + bool ret = async_mode.wait(in_lock); + + in_lock.unlock(); + lock.lock(); + in_lock.lock(); + + if (ret) + { + sched.trigger_bandwidth_limit_reset(); + } + sched.add_interested_changes_to_queue_nts(); + } + } + + fastrtps::rtps::RTPSWriter* current_writer = nullptr; + while (nullptr != change_to_process) + { + // Fast check if next change will enter. + if (!async_mode.fast_check_is_there_slot_for_change(change_to_process)) + { + break; + } + + if (nullptr == current_writer || current_writer->getGuid() != change_to_process->writerGUID) + { + auto writer_it = writers_.find(change_to_process->writerGUID); + assert(writers_.end() != writer_it); + + current_writer = writer_it->second; + } + + if (!current_writer->getMutex().try_lock()) + { + break; + } + + fastrtps::rtps::LocatorSelectorSender& locator_selector = + current_writer->get_async_locator_selector(); + async_mode.group.sender(current_writer, &locator_selector); + locator_selector.lock(); + + // Remove previously from queue, because deliver_sample_nts could call FlowController::remove_sample() + // provoking a deadlock. 
+ fastrtps::rtps::CacheChange_t* previous = change_to_process->writer_info.previous; + fastrtps::rtps::CacheChange_t* next = change_to_process->writer_info.next; + previous->writer_info.next = next; + next->writer_info.previous = previous; + change_to_process->writer_info.previous = nullptr; + change_to_process->writer_info.next = nullptr; + change_to_process->writer_info.is_linked.store(false); + + fastrtps::rtps::DeliveryRetCode ret_delivery = current_writer->deliver_sample_nts( + change_to_process, async_mode.group, locator_selector, + std::chrono::steady_clock::now() + std::chrono::hours(24)); + + if (fastrtps::rtps::DeliveryRetCode::DELIVERED != ret_delivery) + { + // If delivery fails, put the change again in the queue. + change_to_process->writer_info.is_linked.store(true); + previous->writer_info.next = change_to_process; + next->writer_info.previous = change_to_process; + change_to_process->writer_info.previous = previous; + change_to_process->writer_info.next = next; + + async_mode.process_deliver_retcode(ret_delivery); + + locator_selector.unlock(); + current_writer->getMutex().unlock(); + // Unlock mutex_ and try again. + break; + } + + locator_selector.unlock(); + current_writer->getMutex().unlock(); + + sched.work_done(); + + if (0 != async_mode.writers_interested_in_remove) + { + // There are writers that want to remove samples. + break; + } + + // Add interested changes into the queue. 
+ { + std::unique_lock in_lock(async_mode.changes_interested_mutex); + sched.add_interested_changes_to_queue_nts(); + } + + change_to_process = sched.get_next_change_nts(); + } + + async_mode.group.sender(nullptr, nullptr); + } + } + + template + typename std::enable_if::value, uint32_t>::type + get_max_payload_impl() + { + return static_cast(async_mode.max_bytes_per_period); + } + + template + typename std::enable_if::value, uint32_t>::type + constexpr get_max_payload_impl() const + { + return std::numeric_limits::max(); + } + + std::mutex mutex_; + + fastrtps::rtps::RTPSParticipantImpl* participant_ = nullptr; + + std::map writers_; + + scheduler sched; + + // async_mode must be destroyed before sched. + publish_mode async_mode; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // _RTPS_FLOWCONTROL_FLOWCONTROLLERIMPL_HPP_ diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp new file mode 100644 index 00000000000..887aab4e08a --- /dev/null +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -0,0 +1,189 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "BlackboxTests.hpp" +#include "../types/HelloWorldPubSubTypes.h" + +namespace eprosima { +namespace fastdds { +namespace dds { + +using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; + +/** + * This test checks whether it is safe to delete not enabled DDS entities * + */ +TEST(DDSBasic, DeleteDisabledEntities) +{ + // Set DomainParticipantFactory to create disabled entities + DomainParticipantFactoryQos factory_qos; + factory_qos.entity_factory().autoenable_created_entities = false; + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + ASSERT_NE(nullptr, factory); + factory->set_qos(factory_qos); + DomainParticipantFactoryQos factory_qos_check; + ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->get_qos(factory_qos_check)); + ASSERT_EQ(false, factory_qos_check.entity_factory().autoenable_created_entities); + + // Create a disabled DomainParticipant, setting it to in turn create disable entities + DomainParticipantQos participant_qos; + participant_qos.entity_factory().autoenable_created_entities = false; + DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, participant_qos); + ASSERT_NE(nullptr, participant); + DomainParticipantQos participant_qos_check; + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->get_qos(participant_qos_check)); + ASSERT_EQ(false, participant_qos_check.entity_factory().autoenable_created_entities); + + // Create a disabled Publisher, setting it to in turn create disable entities + PublisherQos publisher_qos; + publisher_qos.entity_factory().autoenable_created_entities = false; + Publisher* publisher = participant->create_publisher(publisher_qos); + ASSERT_NE(nullptr, publisher); + PublisherQos publisher_qos_check; + ASSERT_EQ(ReturnCode_t::RETCODE_OK, 
publisher->get_qos(publisher_qos_check)); + ASSERT_EQ(false, publisher_qos_check.entity_factory().autoenable_created_entities); + + // Create a disabled Subscriber, setting it to in turn create disable entities + SubscriberQos subscriber_qos; + subscriber_qos.entity_factory().autoenable_created_entities = false; + Subscriber* subscriber = participant->create_subscriber(subscriber_qos); + ASSERT_NE(nullptr, subscriber); + SubscriberQos subscriber_qos_check; + ASSERT_EQ(ReturnCode_t::RETCODE_OK, subscriber->get_qos(subscriber_qos_check)); + ASSERT_EQ(false, subscriber_qos_check.entity_factory().autoenable_created_entities); + + // Register type + TypeSupport type_support; + type_support.reset(new HelloWorldPubSubType()); + type_support.register_type(participant); + ASSERT_NE(nullptr, type_support); + + // Create Topic + Topic* topic = participant->create_topic("HelloWorld", type_support.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(nullptr, topic); + + // Create a disabled DataWriter + DataWriter* datawriter = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); + ASSERT_NE(nullptr, datawriter); + + // Create a disabled DataReader + DataReader* datareader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); + ASSERT_NE(nullptr, datareader); + + // Delete all entities + publisher->delete_datawriter(datawriter); + subscriber->delete_datareader(datareader); + participant->delete_publisher(publisher); + participant->delete_subscriber(subscriber); + participant->delete_topic(topic); + factory->delete_participant(participant); +} + +/** + * This test checks a race condition when calling DomainParticipantImpl::create_instance_handle() + * from different threads simultaneously. This was resulting in a `free(): invalid pointer` crash + * when deleting publishers created this way, as there was a clash in their respective instance + * handles. Not only did the crash occur, but it was also reported by TSan. 
+ *
+ * The test spawns 200 threads, each creating a publisher and then waiting on a command from the
+ * main thread to delete them (so all of them are deleted at the same time).
+ */
+TEST(DDSBasic, MultithreadedPublisherCreation)
+{
+ /* Get factory */
+ DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
+ ASSERT_NE(nullptr, factory);
+
+ /* Create participant */
+ DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
+ ASSERT_NE(nullptr, participant);
+
+ /* Test synchronization variables */
+ std::mutex finish_mtx;
+ std::condition_variable finish_cv;
+ bool should_finish = false;
+
+ /* Function to create publishers, deleting them on command */
+ auto thread_run =
+ [participant, &finish_mtx, &finish_cv, &should_finish]()
+ {
+ /* Create publisher */
+ Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
+ ASSERT_NE(nullptr, publisher);
+
+ {
+ /* Wait for test completion request */
+ std::unique_lock lock(finish_mtx);
+ finish_cv.wait(lock, [&should_finish]()
+ {
+ return should_finish;
+ });
+ }
+
+ /* Delete publisher */
+ ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher));
+ };
+
+ {
+ /* Create threads */
+ std::vector threads;
+ for (size_t i = 0; i < 200; i++)
+ {
+ threads.push_back(std::thread(thread_run));
+ }
+
+ /* Command threads to delete their publishers */
+ {
+ std::lock_guard guard(finish_mtx);
+ should_finish = true;
+ finish_cv.notify_all();
+ }
+
+ /* Wait for all threads to join */
+ for (std::thread& thr : threads)
+ {
+ thr.join();
+ }
+ }
+
+ /* Clean up */
+ ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
+}
+
+} // namespace dds
+} // namespace fastdds
+} // namespace eprosima
diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp
index c5cbb04f232..9f66d2dd019 100644
--- 
a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp
+++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp
@@ -14,6 +14,19 @@
 #include "BlackboxTests.hpp"
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
 #include "RTPSAsSocketReader.hpp"
 #include "RTPSAsSocketWriter.hpp"
 #include "RTPSWithRegistrationReader.hpp"
@@ -445,6 +458,313 @@
 }
+
+
+TEST(RTPS, RTPSAsReliableVolatileTwoWritersConsecutives)
+{
+ RTPSWithRegistrationReader reader(TEST_TOPIC_NAME);
+ RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME);
+
+ reader.reliability(ReliabilityKind_t::RELIABLE).init();
+ EXPECT_TRUE(reader.isInitialized());
+
+ writer.reliability(ReliabilityKind_t::RELIABLE).init();
+ EXPECT_TRUE(writer.isInitialized());
+
+ // Wait for discovery.
+ writer.wait_discovery();
+ reader.wait_discovery();
+
+ auto complete_data = default_helloworld_data_generator();
+
+ reader.expected_data(complete_data);
+ reader.startReception();
+
+ // Send data
+ writer.send(complete_data);
+ EXPECT_TRUE(complete_data.empty());
+
+ reader.block_for_all();
+ reader.stopReception();
+
+ writer.destroy();
+
+ // Wait for undiscovery
+ reader.wait_undiscovery();
+
+ writer.reliability(ReliabilityKind_t::RELIABLE).init();
+
+ // Wait for discovery
+ writer.wait_discovery();
+ reader.wait_discovery();
+
+ complete_data = default_helloworld_data_generator();
+
+ reader.expected_data(complete_data, true);
+ reader.startReception();
+
+ writer.send(complete_data);
+ EXPECT_TRUE(complete_data.empty());
+
+ reader.block_for_all();
+
+ reader.destroy();
+ writer.destroy();
+}
+
+TEST(RTPS, RTPSAsReliableTransientLocalTwoWritersConsecutives)
+{
+ RTPSWithRegistrationReader reader(TEST_TOPIC_NAME);
+ RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME);
+ 
reader.reliability(ReliabilityKind_t::RELIABLE).durability(DurabilityKind_t::TRANSIENT_LOCAL).init();
+ EXPECT_TRUE(reader.isInitialized());
+
+ writer.reliability(ReliabilityKind_t::RELIABLE).init();
+
+ EXPECT_TRUE(writer.isInitialized());
+
+ // Wait for discovery.
+ writer.wait_discovery();
+ reader.wait_discovery();
+
+ auto complete_data = default_helloworld_data_generator();
+
+ reader.expected_data(complete_data);
+ reader.startReception();
+
+ // Send data
+ writer.send(complete_data);
+ EXPECT_TRUE(complete_data.empty());
+
+ reader.block_for_all();
+ reader.stopReception();
+
+ writer.destroy();
+
+ // Wait for undiscovery
+ reader.wait_undiscovery();
+
+ writer.reliability(ReliabilityKind_t::RELIABLE).init();
+
+ // Wait for discovery
+ writer.wait_discovery();
+ reader.wait_discovery();
+
+ complete_data = default_helloworld_data_generator();
+
+ reader.expected_data(complete_data, true);
+ reader.startReception();
+
+ writer.send(complete_data);
+ EXPECT_TRUE(complete_data.empty());
+
+ reader.block_for_all();
+
+ reader.destroy();
+ writer.destroy();
+}
+
+/**
+ * This test checks the addition of network interfaces at run-time.
+ *
+ * After launching the reader with the network interfaces enabled,
+ * the writer is launched with the transport simulating that there
+ * are no interfaces.
+ * No participant discovery occurs, nor is communication established.
+ *
+ * In a second step, the flag to simulate no interfaces is disabled and
+ * RTPSParticipant::update_attributes() called to add the "new" interfaces.
+ * Discovery is successful and communication is established. 
+ */ +TEST(RTPS, RTPSNetworkInterfaceChangesAtRunTime) +{ + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME); + + // reader is initialized with all the network interfaces + reader.reliability(ReliabilityKind_t::RELIABLE).durability(DurabilityKind_t::TRANSIENT_LOCAL).init(); + ASSERT_TRUE(reader.isInitialized()); + + // writer: launch without interfaces + test_UDPv4Transport::simulate_no_interfaces = true; + auto test_transport = std::make_shared(); + writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport).init(); + ASSERT_TRUE(writer.isInitialized()); + + // no discovery + writer.wait_discovery(std::chrono::seconds(3)); + reader.wait_discovery(std::chrono::seconds(3)); + EXPECT_EQ(writer.get_matched(), 0u); + EXPECT_EQ(reader.get_matched(), 0u); + + // send data + auto complete_data = default_helloworld_data_generator(); + size_t samples = complete_data.size(); + + reader.expected_data(complete_data, true); + reader.startReception(); + + writer.send(complete_data); + EXPECT_TRUE(complete_data.empty()); + + // no data received + reader.block_for_all(std::chrono::seconds(3)); + EXPECT_EQ(reader.getReceivedCount(), 0u); + + // enable interfaces + test_UDPv4Transport::simulate_no_interfaces = false; + writer.participant_update_attributes(); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(3)); + reader.wait_discovery(std::chrono::seconds(3)); + ASSERT_EQ(writer.get_matched(), 1u); + ASSERT_EQ(reader.get_matched(), 1u); + + // data received + reader.block_for_all(); + EXPECT_EQ(reader.getReceivedCount(), static_cast(samples)); + + reader.destroy(); + writer.destroy(); +} + +/** + * Regression test for checking that a not enabled RTPSParticipant can be removed + * + * https://github.com/eProsima/Fast-DDS/pull/2171 introduced this regression since with it + * the PDP is not enabled until calling BuiltinProtocols::enable(), which is called within + * 
RTPSParticipant::enable(). However, during RTPSDomain::removeRTPSParticipant(), there is a call
+ to BuiltinProtocols::stopRTPSParticipantAnnouncement(), which in turn calls
+ PDP::stopRTPSParticipantAnnouncement(). That function ends up accessing a timed event pointer,
+ which is only instantiated on PDP::enable(). Since the RTPSParticipant was not enabled,
+ BuiltinProtocols and in turn PDP are not either, meaning that it is not safe to call
+ PDP::stopRTPSParticipantAnnouncement() on a not enabled PDP.
+ *
+ * The test checks that the necessary guards are in place so that it is safe to call
+ * RTPSDomain::removeRTPSParticipant() on a not enabled RTPSParticipant.
+ */
+TEST(RTPS, RemoveDisabledParticipant)
+{
+ RTPSParticipantAttributes rtps_attr;
+ RTPSParticipant* rtps_participant = RTPSDomain::createParticipant(
+ (uint32_t)GET_PID() % 230, false, rtps_attr, nullptr);
+
+ ASSERT_NE(nullptr, rtps_participant);
+ ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
+}
+
+
+/**
+ * This test checks a race condition on initializing a writer's flow controller when creating
+ * several RTPSWriters in parallel: https://eprosima.easyredmine.com/issues/15905
+ *
+ * The test creates a participant with 4 different flow controllers, and then creates 200 threads
+ * which each create an RTPSWriter which uses one of the participant's flow controllers.
+ * The threads wait for a command coming from the main thread to delete the writers. This is to
+ * ensure that all threads are initialized prior to any of them starting to delete.
+ */ +TEST(RTPS, MultithreadedWriterCreation) +{ + /* Flow controller builder */ + using FlowControllerDescriptor_t = eprosima::fastdds::rtps::FlowControllerDescriptor; + using SchedulerPolicy_t = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy; + auto create_flow_controller = + [](const char* name, SchedulerPolicy_t scheduler, + int32_t max_bytes_per_period, + uint64_t period_ms) -> std::shared_ptr + { + std::shared_ptr descriptor = std::make_shared(); + descriptor->name = name; + descriptor->scheduler = scheduler; + descriptor->max_bytes_per_period = max_bytes_per_period; + descriptor->period_ms = period_ms; + return descriptor; + }; + + /* Create participant */ + RTPSParticipantAttributes rtps_attr; + // Create one flow controller of each kind to make things interesting + const char* flow_controller_name = "fifo_controller"; + rtps_attr.flow_controllers.push_back(create_flow_controller("high_priority_controller", + SchedulerPolicy_t::HIGH_PRIORITY, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller("priority_with_reservation_controller", + SchedulerPolicy_t::PRIORITY_WITH_RESERVATION, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller("round_robin_controller", + SchedulerPolicy_t::ROUND_ROBIN, 200, 10)); + rtps_attr.flow_controllers.push_back(create_flow_controller(flow_controller_name, SchedulerPolicy_t::FIFO, 200, + 10)); + RTPSParticipant* rtps_participant = RTPSDomain::createParticipant( + (uint32_t)GET_PID() % 230, false, rtps_attr, nullptr); + + /* Test sync variables */ + std::mutex finish_mtx; + std::condition_variable finish_cv; + bool should_finish = false; + + /* Lambda function to create a writer with a flow controller, and to destroy it at command */ + auto thread_run = [rtps_participant, flow_controller_name, &finish_mtx, &finish_cv, &should_finish]() + { + /* Create writer history */ + eprosima::fastrtps::rtps::HistoryAttributes hattr; + eprosima::fastrtps::rtps::WriterHistory* history = new 
eprosima::fastrtps::rtps::WriterHistory(hattr);
+ eprosima::fastrtps::TopicAttributes topic_attr;
+
+ /* Create writer with a flow controller */
+ eprosima::fastrtps::rtps::WriterAttributes writer_attr;
+ writer_attr.mode = RTPSWriterPublishMode::ASYNCHRONOUS_WRITER;
+ writer_attr.flow_controller_name = flow_controller_name;
+ eprosima::fastrtps::rtps::RTPSWriter* writer = eprosima::fastrtps::rtps::RTPSDomain::createRTPSWriter(
+ rtps_participant, writer_attr, history, nullptr);
+
+ /* Register writer in participant */
+ eprosima::fastrtps::WriterQos writer_qos;
+ ASSERT_EQ(rtps_participant->registerWriter(writer, topic_attr, writer_qos), true);
+
+ {
+ /* Wait for test completion request */
+ std::unique_lock lock(finish_mtx);
+ finish_cv.wait(lock, [&should_finish]()
+ {
+ return should_finish;
+ });
+ }
+
+ /* Remove writer */
+ ASSERT_TRUE(RTPSDomain::removeRTPSWriter(writer));
+ };
+
+ {
+ /* Create test threads */
+ constexpr size_t num_threads = 200;
+ std::vector threads;
+ for (size_t i = 0; i < num_threads; ++i)
+ {
+ threads.push_back(std::thread(thread_run));
+ }
+
+ /* Once all threads are created, we can start deleting them */
+ {
+ std::lock_guard guard(finish_mtx);
+ should_finish = true;
+ finish_cv.notify_all();
+ }
+
+ /* Wait until all threads join */
+ for (std::thread& thr : threads)
+ {
+ thr.join();
+ }
+ }
+
+ /* Clean up */
+ ASSERT_TRUE(RTPSDomain::removeRTPSParticipant(rtps_participant));
+ ASSERT_NE(nullptr, rtps_participant);
+ RTPSDomain::stopAll();
+}
+
+
 #ifdef INSTANTIATE_TEST_SUITE_P
 #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
 #else