Skip to content

Commit

Permalink
Fixing datarace on listener callbacks (#2889)
Browse files Browse the repository at this point in the history
* Refs 15250. Updating ReceiverResource closure

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Protect listener's getter & setter

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Fixing participant's disable()

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Protecting DomainParticipantImpl getters and setters

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Moving get_rtps_participant() definition to a header to avoid linking error on ParticipantTests.
The test binary links to fastrtps which doesn't export the method thus it must rely on a header definition.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Fixing potential deadlock

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 15250. Make DDS layer delegate into the user the resposability of having the listener alive (as RTPS layer does)

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs #15250. Addressed review comments.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15250. Addressed more review comments.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15250. Precondition on get_resource_event.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelBarro and MiguelCompany authored Aug 2, 2022
1 parent 75c536b commit 203e957
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 138 deletions.
2 changes: 2 additions & 0 deletions include/fastdds/dds/domain/DomainParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ class DomainParticipant : public Entity
/**
* @brief Getter for the resource event
*
* @pre The DomainParticipant is enabled.
*
* @return A reference to the resource event
*/
RTPS_DllAPI fastrtps::rtps::ResourceEvent& get_resource_event() const;
Expand Down
64 changes: 41 additions & 23 deletions include/fastdds/rtps/network/ReceiverResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#ifndef _FASTDDS_RTPS_RECEIVER_RESOURCE_H
#define _FASTDDS_RTPS_RECEIVER_RESOURCE_H

#include <condition_variable>
#include <functional>
#include <vector>
#include <memory>
#include <vector>

#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/TransportInterface.h>

Expand All @@ -34,39 +36,46 @@ namespace rtps {
*/
class ReceiverResource : public fastdds::rtps::TransportReceiverInterface
{
//! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch.
//! In doing so, it guarantees the transport and channel are in a valid state for
//! this resource to exist.
//! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch.
//! In doing so, it guarantees the transport and channel are in a valid state for
//! this resource to exist.
friend class NetworkFactory;

public:

/**
* Method called by the transport when receiving data.
* @param data Pointer to the received data.
* @param size Number of bytes received.
* @param localLocator Locator identifying the local endpoint.
* @param remoteLocator Locator identifying the remote endpoint.
*/
virtual void OnDataReceived(const octet* data, const uint32_t size,
const Locator_t& localLocator, const Locator_t& remoteLocator) override;
* Method called by the transport when receiving data.
* @param data Pointer to the received data.
* @param size Number of bytes received.
* @param localLocator Locator identifying the local endpoint.
* @param remoteLocator Locator identifying the remote endpoint.
*/
virtual void OnDataReceived(
const octet* data,
const uint32_t size,
const Locator_t& localLocator,
const Locator_t& remoteLocator) override;

/**
* Reports whether this resource supports the given local locator (i.e., said locator
* maps to the transport channel managed by this resource).
*/
bool SupportsLocator(const Locator_t& localLocator);
bool SupportsLocator(
const Locator_t& localLocator);

/**
* Register a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to register.
*/
void RegisterReceiver(MessageReceiver* receiver);
void RegisterReceiver(
MessageReceiver* receiver);

/**
* Unregister a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to unregister.
*/
void UnregisterReceiver(MessageReceiver* receiver);
* Unregister a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to unregister.
*/
void UnregisterReceiver(
MessageReceiver* receiver);

/**
* Closes related ChannelResources.
Expand All @@ -82,23 +91,32 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface
* Resources can only be transfered through move semantics. Copy, assignment, and
* construction outside of the factory are forbidden.
*/
ReceiverResource(ReceiverResource&&);
ReceiverResource(
ReceiverResource&&);

~ReceiverResource() override;

private:
ReceiverResource() = delete;
ReceiverResource(const ReceiverResource&) = delete;
ReceiverResource& operator=(const ReceiverResource&) = delete;

ReceiverResource(fastdds::rtps::TransportInterface&, const Locator_t&, uint32_t);
ReceiverResource() = delete;
ReceiverResource(
const ReceiverResource&) = delete;
ReceiverResource& operator =(
const ReceiverResource&) = delete;

ReceiverResource(
fastdds::rtps::TransportInterface&,
const Locator_t&,
uint32_t);
std::function<void()> Cleanup;
std::function<bool(const Locator_t&)> LocatorMapsToManagedChannel;
bool mValid; // Post-construction validity check for the NetworkFactory

std::mutex mtx;
std::condition_variable cv_;
MessageReceiver* receiver;
uint32_t max_message_size_;
int active_callbacks_;
};

} // namespace rtps
Expand Down
Loading

0 comments on commit 203e957

Please sign in to comment.