Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15250] Fixing datarace on listener callbacks #2889

Merged
merged 10 commits into from
Aug 2, 2022
2 changes: 2 additions & 0 deletions include/fastdds/dds/domain/DomainParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ class DomainParticipant : public Entity
/**
* @brief Getter for the resource event
*
* @pre The DomainParticipant is enabled.
*
* @return A reference to the resource event
*/
RTPS_DllAPI fastrtps::rtps::ResourceEvent& get_resource_event() const;
Expand Down
64 changes: 41 additions & 23 deletions include/fastdds/rtps/network/ReceiverResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#ifndef _FASTDDS_RTPS_RECEIVER_RESOURCE_H
#define _FASTDDS_RTPS_RECEIVER_RESOURCE_H

#include <condition_variable>
#include <functional>
#include <vector>
#include <memory>
#include <vector>

#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/TransportInterface.h>

Expand All @@ -34,39 +36,46 @@ namespace rtps {
*/
class ReceiverResource : public fastdds::rtps::TransportReceiverInterface
{
//! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch.
//! In doing so, it guarantees the transport and channel are in a valid state for
//! this resource to exist.
//! Only NetworkFactory is ever allowed to construct a ReceiverResource from scratch.
//! In doing so, it guarantees the transport and channel are in a valid state for
//! this resource to exist.
friend class NetworkFactory;

public:

/**
* Method called by the transport when receiving data.
* @param data Pointer to the received data.
* @param size Number of bytes received.
* @param localLocator Locator identifying the local endpoint.
* @param remoteLocator Locator identifying the remote endpoint.
*/
virtual void OnDataReceived(const octet* data, const uint32_t size,
const Locator_t& localLocator, const Locator_t& remoteLocator) override;
* Method called by the transport when receiving data.
* @param data Pointer to the received data.
* @param size Number of bytes received.
* @param localLocator Locator identifying the local endpoint.
* @param remoteLocator Locator identifying the remote endpoint.
*/
virtual void OnDataReceived(
const octet* data,
const uint32_t size,
const Locator_t& localLocator,
const Locator_t& remoteLocator) override;

/**
* Reports whether this resource supports the given local locator (i.e., said locator
* maps to the transport channel managed by this resource).
*/
bool SupportsLocator(const Locator_t& localLocator);
bool SupportsLocator(
const Locator_t& localLocator);

/**
* Register a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to register.
*/
void RegisterReceiver(MessageReceiver* receiver);
void RegisterReceiver(
MessageReceiver* receiver);

/**
* Unregister a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to unregister.
*/
void UnregisterReceiver(MessageReceiver* receiver);
* Unregister a MessageReceiver object to be called upon reception of data.
* @param receiver The message receiver to unregister.
*/
void UnregisterReceiver(
MessageReceiver* receiver);

/**
* Closes related ChannelResources.
Expand All @@ -82,23 +91,32 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface
* Resources can only be transfered through move semantics. Copy, assignment, and
* construction outside of the factory are forbidden.
*/
ReceiverResource(ReceiverResource&&);
ReceiverResource(
ReceiverResource&&);

~ReceiverResource() override;

private:
ReceiverResource() = delete;
ReceiverResource(const ReceiverResource&) = delete;
ReceiverResource& operator=(const ReceiverResource&) = delete;

ReceiverResource(fastdds::rtps::TransportInterface&, const Locator_t&, uint32_t);
ReceiverResource() = delete;
ReceiverResource(
const ReceiverResource&) = delete;
ReceiverResource& operator =(
const ReceiverResource&) = delete;

ReceiverResource(
fastdds::rtps::TransportInterface&,
const Locator_t&,
uint32_t);
std::function<void()> Cleanup;
std::function<bool(const Locator_t&)> LocatorMapsToManagedChannel;
bool mValid; // Post-construction validity check for the NetworkFactory

std::mutex mtx;
std::condition_variable cv_;
MessageReceiver* receiver;
uint32_t max_message_size_;
int active_callbacks_;
};

} // namespace rtps
Expand Down
Loading