diff --git a/include/fastdds/dds/domain/DomainParticipant.hpp b/include/fastdds/dds/domain/DomainParticipant.hpp index 91766a6c9f7..5c3c851b0b6 100644 --- a/include/fastdds/dds/domain/DomainParticipant.hpp +++ b/include/fastdds/dds/domain/DomainParticipant.hpp @@ -784,6 +784,8 @@ class DomainParticipant : public Entity /** * @brief Getter for the resource event * + * @pre The DomainParticipant is enabled. + * * @return A reference to the resource event */ RTPS_DllAPI fastrtps::rtps::ResourceEvent& get_resource_event() const; diff --git a/include/fastdds/rtps/network/ReceiverResource.h b/include/fastdds/rtps/network/ReceiverResource.h index 7f4504d851d..6d9cb5ffbe0 100644 --- a/include/fastdds/rtps/network/ReceiverResource.h +++ b/include/fastdds/rtps/network/ReceiverResource.h @@ -15,9 +15,11 @@ #ifndef _FASTDDS_RTPS_RECEIVER_RESOURCE_H #define _FASTDDS_RTPS_RECEIVER_RESOURCE_H +#include #include -#include #include +#include + #include #include @@ -34,39 +36,46 @@ namespace rtps { */ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface { - //! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch. - //! In doing so, it guarantees the transport and channel are in a valid state for - //! this resource to exist. + //! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch. + //! In doing so, it guarantees the transport and channel are in a valid state for + //! this resource to exist. friend class NetworkFactory; public: + /** - * Method called by the transport when receiving data. - * @param data Pointer to the received data. - * @param size Number of bytes received. - * @param localLocator Locator identifying the local endpoint. - * @param remoteLocator Locator identifying the remote endpoint. - */ - virtual void OnDataReceived(const octet* data, const uint32_t size, - const Locator_t& localLocator, const Locator_t& remoteLocator) override; + * Method called by the transport when receiving data. + * @param data Pointer to the received data. + * @param size Number of bytes received. + * @param localLocator Locator identifying the local endpoint. + * @param remoteLocator Locator identifying the remote endpoint. + */ + virtual void OnDataReceived( + const octet* data, + const uint32_t size, + const Locator_t& localLocator, + const Locator_t& remoteLocator) override; /** * Reports whether this resource supports the given local locator (i.e., said locator * maps to the transport channel managed by this resource). */ - bool SupportsLocator(const Locator_t& localLocator); + bool SupportsLocator( + const Locator_t& localLocator); /** * Register a MessageReceiver object to be called upon reception of data. * @param receiver The message receiver to register. */ - void RegisterReceiver(MessageReceiver* receiver); + void RegisterReceiver( + MessageReceiver* receiver); /** - * Unregister a MessageReceiver object to be called upon reception of data. - * @param receiver The message receiver to unregister. - */ - void UnregisterReceiver(MessageReceiver* receiver); + * Unregister a MessageReceiver object to be called upon reception of data. + * @param receiver The message receiver to unregister. + */ + void UnregisterReceiver( + MessageReceiver* receiver); /** * Closes related ChannelResources. @@ -82,23 +91,32 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface * Resources can only be transfered through move semantics. Copy, assignment, and * construction outside of the factory are forbidden. */ - ReceiverResource(ReceiverResource&&); + ReceiverResource( + ReceiverResource&&); ~ReceiverResource() override; private: - ReceiverResource() = delete; - ReceiverResource(const ReceiverResource&) = delete; - ReceiverResource& operator=(const ReceiverResource&) = delete; - ReceiverResource(fastdds::rtps::TransportInterface&, const Locator_t&, uint32_t); + ReceiverResource() = delete; + ReceiverResource( + const ReceiverResource&) = delete; + ReceiverResource& operator =( + const ReceiverResource&) = delete; + + ReceiverResource( + fastdds::rtps::TransportInterface&, + const Locator_t&, + uint32_t); std::function Cleanup; std::function LocatorMapsToManagedChannel; bool mValid; // Post-construction validity check for the NetworkFactory std::mutex mtx; + std::condition_variable cv_; MessageReceiver* receiver; uint32_t max_message_size_; + int active_callbacks_; }; } // namespace rtps diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp index 43e45f16d09..614866a9cf7 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp @@ -200,17 +200,19 @@ DomainParticipantImpl::DomainParticipantImpl( void DomainParticipantImpl::disable() { - if (participant_) + DomainParticipant* participant = get_participant(); + if (participant) { - participant_->set_listener(nullptr); + participant->set_listener(nullptr); } rtps_listener_.participant_ = nullptr; // The function to disable the DomainParticipantImpl is called from // DomainParticipantFactory::delete_participant() and DomainParticipantFactory destructor. - if (rtps_participant_ != nullptr) + auto rtps_participant = get_rtps_participant(); + if (rtps_participant != nullptr) { - rtps_participant_->set_listener(nullptr); + rtps_participant->set_listener(nullptr); { std::lock_guard lock(mtx_pubs_); @@ -266,9 +268,10 @@ DomainParticipantImpl::~DomainParticipantImpl() topics_by_handle_.clear(); } - if (rtps_participant_ != nullptr) + auto rtps_participant = get_rtps_participant(); + if (rtps_participant != nullptr) { - RTPSDomain::removeRTPSParticipant(rtps_participant_); + RTPSDomain::removeRTPSParticipant(rtps_participant); } { @@ -276,6 +279,8 @@ DomainParticipantImpl::~DomainParticipantImpl() types_.clear(); } + std::lock_guard _(mtx_gs_); + if (participant_) { participant_->impl_ = nullptr; @@ -287,7 +292,7 @@ DomainParticipantImpl::~DomainParticipantImpl() ReturnCode_t DomainParticipantImpl::enable() { // Should not have been previously enabled - assert(rtps_participant_ == nullptr); + assert(get_rtps_participant() == nullptr); fastrtps::rtps::RTPSParticipantAttributes rtps_attr; set_attributes_from_qos(rtps_attr, qos_); @@ -313,13 +318,18 @@ ReturnCode_t DomainParticipantImpl::enable() } guid_ = part->getGuid(); - rtps_participant_ = part; - rtps_participant_->set_check_type_function( - [this](const std::string& type_name) -> bool - { - return find_type(type_name).get() != nullptr; - }); + { + std::lock_guard _(mtx_gs_); + + rtps_participant_ = part; + + part->set_check_type_function( + [this](const std::string& type_name) -> bool + { + return find_type(type_name).get() != nullptr; + }); + } if (qos_.entity_factory().autoenable_created_entities) { @@ -338,7 +348,7 @@ ReturnCode_t DomainParticipantImpl::enable() std::lock_guard lock(mtx_pubs_); for (auto pub : publishers_) { - pub.second->rtps_participant_ = rtps_participant_; + pub.second->rtps_participant_ = part; pub.second->user_publisher_->enable(); } } @@ -349,13 +359,13 @@ ReturnCode_t DomainParticipantImpl::enable() for (auto sub : subscribers_) { - sub.second->rtps_participant_ = rtps_participant_; + sub.second->rtps_participant_ = part; sub.second->user_subscriber_->enable(); } } } - rtps_participant_->enable(); + part->enable(); return ReturnCode_t::RETCODE_OK; } @@ -363,60 +373,75 @@ ReturnCode_t DomainParticipantImpl::enable() ReturnCode_t DomainParticipantImpl::set_qos( const DomainParticipantQos& qos) { - bool enabled = (rtps_participant_ != nullptr); - const DomainParticipantQos& qos_to_set = (&qos == &PARTICIPANT_QOS_DEFAULT) ? - DomainParticipantFactory::get_instance()->get_default_participant_qos() : qos; + bool enabled = false; + bool qos_should_be_updated = false; + fastrtps::rtps::RTPSParticipantAttributes patt; + fastrtps::rtps::RTPSParticipant* rtps_participant = nullptr; - if (&qos != &PARTICIPANT_QOS_DEFAULT) { - ReturnCode_t ret_val = check_qos(qos_to_set); - if (!ret_val) + std::lock_guard _(mtx_gs_); + + rtps_participant = rtps_participant_; + enabled = rtps_participant != nullptr; + const DomainParticipantQos& qos_to_set = (&qos == &PARTICIPANT_QOS_DEFAULT) ? + DomainParticipantFactory::get_instance()->get_default_participant_qos() : qos; + + if (&qos != &PARTICIPANT_QOS_DEFAULT) { - return ret_val; + ReturnCode_t ret_val = check_qos(qos_to_set); + if (!ret_val) + { + return ret_val; + } } - } - if (enabled && !can_qos_be_updated(qos_, qos_to_set)) - { - return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; - } - - bool qos_should_be_updated = set_qos(qos_, qos_to_set, !enabled); - if (enabled) - { - if (qos_should_be_updated) + if (enabled && !can_qos_be_updated(qos_, qos_to_set)) { - // Notify the participant that there is a QoS update - fastrtps::rtps::RTPSParticipantAttributes patt; - set_attributes_from_qos(patt, qos_); - rtps_participant_->update_attributes(patt); + return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; } - else + + qos_should_be_updated = set_qos(qos_, qos_to_set, !enabled); + if (enabled) { - // Trigger update of network interfaces by calling update_attributes - rtps_participant_->update_attributes(rtps_participant_->getRTPSParticipantAttributes()); + if (qos_should_be_updated) + { + // Notify the participant that there is a QoS update + set_attributes_from_qos(patt, qos_); + } + else + { + // Trigger update of network interfaces by calling update_attributes with current attributes + patt = rtps_participant->getRTPSParticipantAttributes(); + } } } + if (enabled) + { + rtps_participant->update_attributes(patt); + } + return ReturnCode_t::RETCODE_OK; } ReturnCode_t DomainParticipantImpl::get_qos( DomainParticipantQos& qos) const { + std::lock_guard _(mtx_gs_); qos = qos_; return ReturnCode_t::RETCODE_OK; } const DomainParticipantQos& DomainParticipantImpl::get_qos() const { + std::lock_guard _(mtx_gs_); return qos_; } ReturnCode_t DomainParticipantImpl::delete_publisher( const Publisher* pub) { - if (participant_ != pub->get_participant()) + if (get_participant() != pub->get_participant()) { return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } @@ -444,7 +469,7 @@ ReturnCode_t DomainParticipantImpl::delete_publisher( ReturnCode_t DomainParticipantImpl::delete_subscriber( const Subscriber* sub) { - if (participant_ != sub->get_participant()) + if (get_participant() != sub->get_participant()) { return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } @@ -578,7 +603,7 @@ ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic( return nullptr; } - if (related_topic->get_participant() != this->participant_) + if (related_topic->get_participant() != get_participant()) { logError(PARTICIPANT, "Creating ContentFilteredTopic with name " << name << ": related_topic not from this participant"); @@ -783,9 +808,8 @@ Publisher* DomainParticipantImpl::create_publisher( PublisherImpl* pubimpl = create_publisher_impl(qos, listener); Publisher* pub = new Publisher(pubimpl, mask); pubimpl->user_publisher_ = pub; - pubimpl->rtps_participant_ = rtps_participant_; - - bool enabled = rtps_participant_ != nullptr; + pubimpl->rtps_participant_ = get_rtps_participant(); + bool enabled = get_rtps_participant() != nullptr; // Create InstanceHandle for the new publisher InstanceHandle_t pub_handle; @@ -973,14 +997,15 @@ ReturnCode_t DomainParticipantImpl::delete_contained_entities() ReturnCode_t DomainParticipantImpl::assert_liveliness() { - if (rtps_participant_ == nullptr) + fastrtps::rtps::RTPSParticipant* rtps_participant = get_rtps_participant(); + if (rtps_participant == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } - if (rtps_participant_->wlp() != nullptr) + if (rtps_participant->wlp() != nullptr) { - if (rtps_participant_->wlp()->assert_liveliness_manual_by_participant()) + if (rtps_participant->wlp()->assert_liveliness_manual_by_participant()) { return ReturnCode_t::RETCODE_OK; } @@ -1233,18 +1258,9 @@ ReturnCode_t DomainParticipantImpl::get_current_time( return ReturnCode_t::RETCODE_OK; } -const DomainParticipant* DomainParticipantImpl::get_participant() const -{ - return participant_; -} - -DomainParticipant* DomainParticipantImpl::get_participant() -{ - return participant_; -} - std::vector DomainParticipantImpl::get_participant_names() const { + std::lock_guard _(mtx_gs_); return rtps_participant_ == nullptr ? std::vector {} : @@ -1268,11 +1284,11 @@ Subscriber* DomainParticipantImpl::create_subscriber( SubscriberImpl* subimpl = create_subscriber_impl(qos, listener); Subscriber* sub = new Subscriber(subimpl, mask); subimpl->user_subscriber_ = sub; - subimpl->rtps_participant_ = this->rtps_participant_; + subimpl->rtps_participant_ = get_rtps_participant(); // Create InstanceHandle for the new subscriber InstanceHandle_t sub_handle; - bool enabled = rtps_participant_ != nullptr; + bool enabled = get_rtps_participant() != nullptr; // Create InstanceHandle for the new subscriber create_instance_handle(sub_handle); @@ -1339,7 +1355,7 @@ Topic* DomainParticipantImpl::create_topic( return nullptr; } - bool enabled = rtps_participant_ != nullptr; + bool enabled = get_rtps_participant() != nullptr; std::lock_guard lock(mtx_topics_); @@ -1562,9 +1578,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::onParticipantDiscovery( RTPSParticipant*, ParticipantDiscoveryInfo&& info) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->on_participant_discovery(participant_->participant_, std::move(info)); + listener->on_participant_discovery(participant_->participant_, std::move(info)); } } @@ -1573,9 +1590,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::onParticipantAuthenticati RTPSParticipant*, ParticipantAuthenticationInfo&& info) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->onParticipantAuthentication(participant_->participant_, std::move(info)); + listener->onParticipantAuthentication(participant_->participant_, std::move(info)); } } @@ -1585,9 +1603,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::onReaderDiscovery( RTPSParticipant*, ReaderDiscoveryInfo&& info) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->on_subscriber_discovery(participant_->participant_, std::move(info)); + listener->on_subscriber_discovery(participant_->participant_, std::move(info)); } } @@ -1595,9 +1614,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::onWriterDiscovery( RTPSParticipant*, WriterDiscoveryInfo&& info) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->on_publisher_discovery(participant_->participant_, std::move(info)); + listener->on_publisher_discovery(participant_->participant_, std::move(info)); } } @@ -1609,9 +1629,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::on_type_discovery( const fastrtps::types::TypeObject* object, fastrtps::types::DynamicType_ptr dyn_type) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->on_type_discovery( + listener->on_type_discovery( participant_->participant_, request_sample_id, topic, @@ -1628,9 +1649,10 @@ void DomainParticipantImpl::MyRTPSParticipantListener::on_type_dependencies_repl const fastrtps::rtps::SampleIdentity& request_sample_id, const fastrtps::types::TypeIdentifierWithSizeSeq& dependencies) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + if (participant_ != nullptr && (listener = participant_->get_listener()) != nullptr) { - participant_->listener_->on_type_dependencies_reply( + listener->on_type_dependencies_reply( participant_->participant_, request_sample_id, dependencies); } @@ -1643,13 +1665,17 @@ void DomainParticipantImpl::MyRTPSParticipantListener::on_type_information_recei const fastrtps::string_255& type_name, const fastrtps::types::TypeInformation& type_information) { - if (participant_ != nullptr && participant_->listener_ != nullptr) + DomainParticipantListener* listener = nullptr; + DomainParticipant* participant = nullptr; + if (participant_ != nullptr + && (participant = participant_->get_participant()) != nullptr + && (listener = participant_->get_listener()) != nullptr) { if (type_information.complete().typeid_with_size().type_id()._d() > 0 || type_information.minimal().typeid_with_size().type_id()._d() > 0) { - participant_->listener_->on_type_information_received( - participant_->participant_, topic_name, type_name, type_information); + listener->on_type_information_received( + participant, topic_name, type_name, type_information); } } } @@ -1659,15 +1685,15 @@ bool DomainParticipantImpl::new_remote_endpoint_discovered( uint16_t endpointId, EndpointKind_t kind) { - if (rtps_participant_ != nullptr) + if (get_rtps_participant() != nullptr) { if (kind == fastrtps::rtps::WRITER) { - return rtps_participant_->newRemoteWriterDiscovered(partguid, static_cast(endpointId)); + return get_rtps_participant()->newRemoteWriterDiscovered(partguid, static_cast(endpointId)); } else { - return rtps_participant_->newRemoteReaderDiscovered(partguid, static_cast(endpointId)); + return get_rtps_participant()->newRemoteReaderDiscovered(partguid, static_cast(endpointId)); } } @@ -1676,19 +1702,26 @@ bool DomainParticipantImpl::new_remote_endpoint_discovered( ResourceEvent& DomainParticipantImpl::get_resource_event() const { - return rtps_participant_->get_resource_event(); + assert(nullptr != get_rtps_participant()); + return get_rtps_participant()->get_resource_event(); } fastrtps::rtps::SampleIdentity DomainParticipantImpl::get_type_dependencies( const fastrtps::types::TypeIdentifierSeq& in) const { - return rtps_participant_->typelookup_manager()->get_type_dependencies(in); + const fastrtps::rtps::RTPSParticipant* rtps_participant = get_rtps_participant(); + return nullptr != rtps_participant ? + rtps_participant->typelookup_manager()->get_type_dependencies(in) : + builtin::INVALID_SAMPLE_IDENTITY; } fastrtps::rtps::SampleIdentity DomainParticipantImpl::get_types( const fastrtps::types::TypeIdentifierSeq& in) const { - return rtps_participant_->typelookup_manager()->get_types(in); + const fastrtps::rtps::RTPSParticipant* rtps_participant = get_rtps_participant(); + return nullptr != rtps_participant ? + rtps_participant->typelookup_manager()->get_types(in) : + builtin::INVALID_SAMPLE_IDENTITY; } ReturnCode_t DomainParticipantImpl::register_remote_type( @@ -1698,7 +1731,7 @@ ReturnCode_t DomainParticipantImpl::register_remote_type( { using namespace fastrtps::types; - if (rtps_participant_ == nullptr) + if (get_rtps_participant() == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } @@ -1739,7 +1772,7 @@ ReturnCode_t DomainParticipantImpl::register_remote_type( return register_dynamic_type(dyn); } } - else if (rtps_participant_->typelookup_manager() != nullptr) + else if (get_rtps_participant()->typelookup_manager() != nullptr) { TypeIdentifierSeq dependencies; TypeIdentifierSeq retrieve_objects; @@ -2014,7 +2047,7 @@ ReturnCode_t DomainParticipantImpl::register_dynamic_type( fastrtps::types::DynamicType_ptr dyn_type) { TypeSupport type(new fastrtps::types::DynamicPubSubType(dyn_type)); - return participant_->register_type(type); + return get_participant()->register_type(type); } void DomainParticipantImpl::remove_parent_request( @@ -2320,9 +2353,9 @@ void DomainParticipantImpl::create_instance_handle( DomainParticipantListener* DomainParticipantImpl::get_listener_for( const StatusMask& status) { - if (participant_->get_status_mask().is_active(status)) + if (get_participant()->get_status_mask().is_active(status)) { - return listener_; + return get_listener(); } return nullptr; } diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp index 2f1cd373977..6c91d196618 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp @@ -109,12 +109,14 @@ class DomainParticipantImpl ReturnCode_t set_listener( DomainParticipantListener* listener) { + std::lock_guard _(mtx_gs_); listener_ = listener; return ReturnCode_t::RETCODE_OK; } - const DomainParticipantListener* get_listener() const + DomainParticipantListener* get_listener() const { + std::lock_guard _(mtx_gs_); return listener_; } @@ -395,17 +397,27 @@ class DomainParticipantImpl ReturnCode_t get_current_time( fastrtps::Time_t& current_time) const; - const DomainParticipant* get_participant() const; + const DomainParticipant* get_participant() const + { + std::lock_guard _(mtx_gs_); + return participant_; + } - DomainParticipant* get_participant(); + DomainParticipant* get_participant() + { + std::lock_guard _(mtx_gs_); + return participant_; + } - const fastrtps::rtps::RTPSParticipant* rtps_participant() const + const fastrtps::rtps::RTPSParticipant* get_rtps_participant() const { + std::lock_guard _(mtx_gs_); return rtps_participant_; } - fastrtps::rtps::RTPSParticipant* rtps_participant() + fastrtps::rtps::RTPSParticipant* get_rtps_participant() { + std::lock_guard _(mtx_gs_); return rtps_participant_; } @@ -497,6 +509,9 @@ class DomainParticipantImpl //!Participant Listener DomainParticipantListener* listener_; + //! getter/setter mutex + mutable std::mutex mtx_gs_; + //!Publisher maps std::map publishers_; std::map publishers_by_handle_; diff --git a/src/cpp/fastdds/publisher/PublisherImpl.cpp b/src/cpp/fastdds/publisher/PublisherImpl.cpp index 29c6d179be7..03fbd9bc59b 100644 --- a/src/cpp/fastdds/publisher/PublisherImpl.cpp +++ b/src/cpp/fastdds/publisher/PublisherImpl.cpp @@ -110,7 +110,7 @@ PublisherImpl::PublisherImpl( , listener_(listen) , publisher_listener_(this) , user_publisher_(nullptr) - , rtps_participant_(p->rtps_participant()) + , rtps_participant_(p->get_rtps_participant()) , default_datawriter_qos_(DATAWRITER_QOS_DEFAULT) { PublisherAttributes pub_attr; diff --git a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp index d545a4f73cb..0677cb45712 100644 --- a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp +++ b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp @@ -108,7 +108,7 @@ SubscriberImpl::SubscriberImpl( , listener_(listen) , subscriber_listener_(this) , user_subscriber_(nullptr) - , rtps_participant_(p->rtps_participant()) + , rtps_participant_(p->get_rtps_participant()) , default_datareader_qos_(DATAREADER_QOS_DEFAULT) { SubscriberAttributes sub_attr; diff --git a/src/cpp/rtps/network/ReceiverResource.cpp b/src/cpp/rtps/network/ReceiverResource.cpp index 8f91084876e..dc76716a11c 100644 --- a/src/cpp/rtps/network/ReceiverResource.cpp +++ b/src/cpp/rtps/network/ReceiverResource.cpp @@ -34,8 +34,10 @@ ReceiverResource::ReceiverResource( , LocatorMapsToManagedChannel(nullptr) , mValid(false) , mtx() + , cv_() , receiver(nullptr) , max_message_size_(max_recv_buffer_size) + , active_callbacks_(0) { // Internal channel is opened and assigned to this resource. mValid = transport.OpenInputChannel(locator, this, max_message_size_); @@ -58,6 +60,8 @@ ReceiverResource::ReceiverResource( ReceiverResource::ReceiverResource( ReceiverResource&& rValueResource) { + std::lock_guard _(mtx); + Cleanup.swap(rValueResource.Cleanup); LocatorMapsToManagedChannel.swap(rValueResource.LocatorMapsToManagedChannel); receiver = rValueResource.receiver; @@ -65,6 +69,8 @@ ReceiverResource::ReceiverResource( mValid = rValueResource.mValid; rValueResource.mValid = false; max_message_size_ = rValueResource.max_message_size_; + active_callbacks_ = rValueResource.active_callbacks_; + rValueResource.active_callbacks_ = 0; } bool ReceiverResource::SupportsLocator( @@ -80,7 +86,8 @@ bool ReceiverResource::SupportsLocator( void ReceiverResource::RegisterReceiver( MessageReceiver* rcv) { - std::unique_lock lock(mtx); + std::lock_guard _(mtx); + if (receiver == nullptr) { receiver = rcv; @@ -90,7 +97,8 @@ void ReceiverResource::RegisterReceiver( void ReceiverResource::UnregisterReceiver( MessageReceiver* rcv) { - std::unique_lock lock(mtx); + std::lock_guard _(mtx); + if (receiver == rcv) { receiver = nullptr; @@ -105,11 +113,14 @@ void ReceiverResource::OnDataReceived( { (void)localLocator; - std::unique_lock lock(mtx); + std::lock_guard _(mtx); + MessageReceiver* rcv = receiver; - if (rcv != nullptr) + if (rcv != nullptr && active_callbacks_ >= 0) { + ++active_callbacks_; + CDRMessage_t msg(0); msg.wraps = true; msg.buffer = const_cast(data); @@ -119,8 +130,13 @@ void ReceiverResource::OnDataReceived( // TODO: Should we unlock in case UnregisterReceiver is called from callback ? rcv->processCDRMsg(remoteLocator, localLocator, &msg); - } + // allow disabling + if (--active_callbacks_ == 0) + { + cv_.notify_one(); + } + } } void ReceiverResource::disable() @@ -129,6 +145,15 @@ void ReceiverResource::disable() { Cleanup(); } + + // wait until all callbacks are finished + std::unique_lock lock(mtx); + cv_.wait(lock, [this] + { + return active_callbacks_ <= 0; + }); + // no more callbacks + active_callbacks_ = -1; } ReceiverResource::~ReceiverResource() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index c15665894b7..69901207ce0 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -448,13 +448,9 @@ void RTPSParticipantImpl::enable() void RTPSParticipantImpl::disable() { - if (nullptr == mp_builtinProtocols) - { - return; - } - - // Ensure that other participants will not accidentally discover this one - stopRTPSParticipantAnnouncement(); + // Disabling event thread also disables participant announcement, so there is no need to call + // stopRTPSParticipantAnnouncement() + mp_event_thr.stop_thread(); // Disable Retries on Transports m_network_Factory.Shutdown(); @@ -468,10 +464,11 @@ void RTPSParticipantImpl::disable() deleteAllUserEndpoints(); - mp_event_thr.stop_thread(); - - delete(mp_builtinProtocols); - mp_builtinProtocols = nullptr; + if (nullptr != mp_builtinProtocols) + { + delete(mp_builtinProtocols); + mp_builtinProtocols = nullptr; + } } const std::vector& RTPSParticipantImpl::getAllWriters() const diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index d90018b1399..21c75d8c518 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -321,6 +321,7 @@ class RTPSParticipantImpl */ inline RTPSParticipantListener* getListener() { + std::lock_guard _(*getParticipantMutex()); return mp_participantListener; } @@ -331,6 +332,7 @@ class RTPSParticipantImpl void set_listener( RTPSParticipantListener* listener) { + std::lock_guard _(*getParticipantMutex()); mp_participantListener = listener; } diff --git a/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp b/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp index c3d2ca8b5b7..c170201adbf 100644 --- a/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp +++ b/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp @@ -502,7 +502,7 @@ class DomainParticipantImpl return participant_; } - fastrtps::rtps::RTPSParticipant* rtps_participant() + fastrtps::rtps::RTPSParticipant* get_rtps_participant() { return rtps_participant_; } diff --git a/test/mock/rtps/RTPSParticipant/fastdds/rtps/participant/RTPSParticipant.h b/test/mock/rtps/RTPSParticipant/fastdds/rtps/participant/RTPSParticipant.h index 79637c70ad8..54f3eaf110c 100644 --- a/test/mock/rtps/RTPSParticipant/fastdds/rtps/participant/RTPSParticipant.h +++ b/test/mock/rtps/RTPSParticipant/fastdds/rtps/participant/RTPSParticipant.h @@ -131,7 +131,7 @@ class RTPS_DllAPI RTPSParticipant const GUID_t& pguid, int16_t userDefinedId)); - ResourceEvent& get_resource_event() + ResourceEvent& get_resource_event() const { return mp_event_thr; } @@ -194,7 +194,7 @@ class RTPS_DllAPI RTPSParticipant RTPSParticipantListener* listener_; const GUID_t m_guid; - ResourceEvent mp_event_thr; + mutable ResourceEvent mp_event_thr; RTPSParticipantAttributes attributes_; }; diff --git a/test/unittest/dds/participant/ParticipantTests.cpp b/test/unittest/dds/participant/ParticipantTests.cpp index e0b4b619bc7..47584eb46aa 100644 --- a/test/unittest/dds/participant/ParticipantTests.cpp +++ b/test/unittest/dds/participant/ParticipantTests.cpp @@ -617,7 +617,7 @@ void get_rtps_attributes( ASSERT_NE(nullptr, participant_test); const DomainParticipantImpl* participant_impl = participant_test->get_impl(); ASSERT_NE(nullptr, participant_impl); - att = participant_impl->rtps_participant()->getRTPSParticipantAttributes(); + att = participant_impl->get_rtps_participant()->getRTPSParticipantAttributes(); } void helper_wait_for_at_least_entries(