Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21137] RTPS WriterHistory refactor #4966

Merged
merged 45 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a091963
Refs #21082. Remove templated version of `new_change`.
MiguelCompany Jun 11, 2024
296f256
Refs #21082. Call simple version of `new_change` in rtps examples.
MiguelCompany Jun 13, 2024
9a5f50b
Refs #21082. Call simple version of `new_change` in `TypeLookupManager`.
MiguelCompany Jun 13, 2024
a7f73d6
Refs #21082. Refactor on EDP.
MiguelCompany Jun 13, 2024
813d8d2
Refs #21082. Refactor on PDP.
MiguelCompany Jun 14, 2024
93df4f5
Refs #21082. Refactor on SecurityManager.
MiguelCompany Jun 14, 2024
2999c0d
Refs #21082. Refactor on unit tests.
MiguelCompany Jun 14, 2024
1e08fac
Refs #21082. Refactor on WLP.
MiguelCompany Jun 14, 2024
0af5811
Refs #21082. Refactor on MonitorService.
MiguelCompany Jun 14, 2024
65ee24b
Refs #21082. Refactor on blackbox tests.
MiguelCompany Jun 14, 2024
1d5eaba
Refs #21082. Fix blackbox tests.
MiguelCompany Jun 17, 2024
88c43f5
Refs #21082. Remove `History::do_reserve_cache`.
MiguelCompany Jun 14, 2024
0a5c27e
Refs #21082. Remove `new_change` overload receiving `std::function`.
MiguelCompany Jun 14, 2024
6c195d2
Refs #21082. Remove `is_pool_initialized`.
MiguelCompany Jun 14, 2024
42368b6
Refs #21082. WriterHistory keeps a change pool.
MiguelCompany Jun 14, 2024
9d60289
Refs #21082. Implementation of `release_change` moved to WriterHistory.
MiguelCompany Jun 14, 2024
5ceda32
Refs #21082. Remove `release_change` from `RTPSWriter`.
MiguelCompany Jun 14, 2024
da8c9cf
Refs #21082. Add `create_change` to `WriterHistory`.
MiguelCompany Jun 17, 2024
d4bc402
Refs #21082. Change calls from `new_change` to `create_change`.
MiguelCompany Jun 17, 2024
161334f
Refs #21082. Remove `new_change` from `RTPSWriter`.
MiguelCompany Jun 17, 2024
7a80559
Refs #21082. Remove `remove_older_changes` from `RTPSWriter`.
MiguelCompany Jun 17, 2024
27fb7ac
Refs #21082. Add payload pool to `WriterHistory`.
MiguelCompany Jun 17, 2024
be46017
Refs #21082. Add new `create_change` overload.
MiguelCompany Jun 17, 2024
163f458
Refs #21082. Using new `create_change` overload where relevant.
MiguelCompany Jun 17, 2024
a433f24
Refs #21082. Refactor on `IPersistenceService`.
MiguelCompany Jun 17, 2024
7a0eced
Refs #21082. Refactor on `DataWriterHistory`.
MiguelCompany Jun 17, 2024
6af2861
Refs #21082. Several methods to create writers removed.
MiguelCompany Jun 17, 2024
75fcd15
Refs #21082. Move data sharing pool initialization to DataWriterImpl.
MiguelCompany Jun 18, 2024
dfa7b83
Refs #21082. Refactor creation of rtps writer.
MiguelCompany Jun 18, 2024
91e0e68
Refs #21082. Refactor PersistentWriter.
MiguelCompany Jun 18, 2024
9e4f645
Refs #21082. Remove constructors taking pools.
MiguelCompany Jun 18, 2024
7b30668
Refs #21082. RTPSWriter does not handle pools.
MiguelCompany Jun 18, 2024
10241f1
Refs #21082. Add pool getters to WriterHistory.
MiguelCompany Jun 18, 2024
72cee5b
Refs #21082. Remove pool references on StatexxxWriter.
MiguelCompany Jun 18, 2024
835b145
Refs #21082. Move pools from `Endpoint` to `BaseReader`.
MiguelCompany Jun 18, 2024
aa3ae1a
Refs #21082. Avoid accessing history attributes.
MiguelCompany Jun 18, 2024
aec5ad9
Refs #21082. Fixes on SecurityManager.
MiguelCompany Jun 18, 2024
84c2c78
Refs #21082. Fix build after rebase.
MiguelCompany Jun 19, 2024
f66a095
Refs #21082. Please linters.
MiguelCompany Jun 19, 2024
ced028c
Refs #21082. Fix release order in `WriterHistory` destructor.
MiguelCompany Jun 19, 2024
c79ba3c
Refs #21082. Move datasharing pool initialization to `RTPSWriter`.
MiguelCompany Jun 19, 2024
1eebb46
Refs #21082. Fix negative tests.
MiguelCompany Jun 19, 2024
fe9a1af
Refs #21137. Apply review suggestions.
MiguelCompany Jun 25, 2024
f614bb3
Refs #21137. Apply suggestions from code review
MiguelCompany Jun 26, 2024
89c4df5
Refs #21137. Add note to versions.md
EduPonz Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions examples/cpp/rtps/AsSocket/TestWriterSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ void TestWriterSocket::run(
{
for (int i = 0; i < nmsgs; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
#if defined(_WIN32)
ch->serializedPayload.length =
sprintf_s((char*)ch->serializedPayload.data, 255, "My example string %d", i) + 1;
Expand Down
12 changes: 3 additions & 9 deletions examples/cpp/rtps/Persistent/TestWriterPersistent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,12 @@ void TestWriterPersistent::run(

for (int i = 0; i < samples; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
if (!ch) // In the case history is full, remove some old changes
{
std::cout << "cleaning history...";
mp_writer->remove_older_changes(20);
ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
mp_history->remove_min_change();
ch = mp_history->create_change(255, ALIVE);
}

#if defined(_WIN32)
Expand Down
12 changes: 3 additions & 9 deletions examples/cpp/rtps/Registered/TestWriterRegistered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ void TestWriterRegistered::run(

for (int i = 0; i < samples; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
if (!ch) // In the case history is full, remove some old changes
{
std::cout << "cleaning history...";
mp_writer->remove_older_changes(20);
ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
mp_history->remove_min_change();
ch = mp_history->create_change(255, ALIVE);
}

#if defined(_WIN32)
Expand Down
11 changes: 0 additions & 11 deletions include/fastdds/rtps/Endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ class Endpoint

virtual ~Endpoint()
{
// As releasing the change pool will delete the cache changes it owns,
// the payload pool may be called to release their payloads, so we should
// ensure that the payload pool is destroyed after the change pool.
change_pool_.reset();
payload_pool_.reset();
}

public:
Expand Down Expand Up @@ -120,12 +115,6 @@ class Endpoint
//!Endpoint Mutex
mutable RecursiveTimedMutex mp_mutex;

//!Pool of serialized payloads.
std::shared_ptr<IPayloadPool> payload_pool_;

//!Pool of cache changes.
std::shared_ptr<IChangePool> change_pool_;

//!Fixed size of payloads
uint32_t fixed_payload_size_ = 0;

Expand Down
83 changes: 11 additions & 72 deletions include/fastdds/rtps/RTPSDomain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,92 +113,32 @@ class RTPSDomain

/**
* Create a RTPSWriter in a participant.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param change_pool Shared pointer to the IChangePool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param change_pool Shared pointer to the IChangePool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant.
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
Expand All @@ -208,7 +148,6 @@ class RTPSDomain
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

Expand Down
4 changes: 0 additions & 4 deletions include/fastdds/rtps/history/History.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,6 @@ class History
//!Print the seqNum of the changes in the History (for debuggisi, mng purposes).
void print_changes_seqNum2();

FASTDDS_EXPORTED_API virtual bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) = 0;

FASTDDS_EXPORTED_API virtual void do_release_cache(
CacheChange_t* ch) = 0;

Expand Down
4 changes: 0 additions & 4 deletions include/fastdds/rtps/history/ReaderHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ class ReaderHistory : public History

protected:

FASTDDS_EXPORTED_API bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) override;

FASTDDS_EXPORTED_API void do_release_cache(
CacheChange_t* ch) override;

Expand Down
115 changes: 106 additions & 9 deletions include/fastdds/rtps/history/WriterHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,23 @@
#ifndef FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP
#define FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP

#include <fastdds/rtps/history/History.hpp>
#include <cstdint>
#include <memory>

#include <fastdds/fastdds_dll.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/common/CacheChange.hpp>
#include <fastdds/rtps/common/ChangeKind_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/history/History.hpp>
#include <fastdds/rtps/history/IChangePool.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

class HistoryAttributes;
class RTPSWriter;
class WriteParams;

Expand All @@ -48,12 +58,81 @@ class WriterHistory : public rtps::History
public:

/**
* Constructor of the WriterHistory.
* @brief Construct a WriterHistory.
*
* @param att Attributes configuring the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att);

/**
* @brief Construct a WriterHistory with a custom payload pool.
*
* @param att Attributes configuring the WriterHistory.
* @param payload_pool Pool of payloads to be used by the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool);

/**
* @brief Construct a WriterHistory with custom payload and change pools.
*
* @param att Attributes configuring the WriterHistory.
* @param payload_pool Pool of payloads to be used by the WriterHistory.
* @param change_pool Pool of changes to be used by the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att);
const HistoryAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool);

FASTDDS_EXPORTED_API virtual ~WriterHistory() override;

/**
* @brief Get the payload pool used by this history.
*
* @return Reference to the payload pool used by this history.
*/
FASTDDS_EXPORTED_API const std::shared_ptr<IPayloadPool>& get_payload_pool() const;

/**
* @brief Get the change pool used by this history.
*
* @return Reference to the change pool used by this history.
*/
FASTDDS_EXPORTED_API const std::shared_ptr<IChangePool>& get_change_pool() const;

/**
* @brief Create a new CacheChange_t object.
*
* @param change_kind Kind of the change.
* @param handle InstanceHandle_t of the change.
*
* @return Pointer to the new CacheChange_t object.
*
* @pre A writer has been associated with this history
*/
FASTDDS_EXPORTED_API CacheChange_t* create_change(
ChangeKind_t change_kind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);

/**
* @brief Create a new CacheChange_t object with a specific payload size.
*
* @param payload_size Size of the payload.
* @param change_kind Kind of the change.
* @param handle InstanceHandle_t of the change.
*
* @return Pointer to the new CacheChange_t object.
*
* @pre A writer has been associated with this history
*/
FASTDDS_EXPORTED_API CacheChange_t* create_change(
uint32_t payload_size,
ChangeKind_t change_kind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);

/**
* Add a CacheChange_t to the WriterHistory.
* @param a_change Pointer to the CacheChange_t to be added.
Expand Down Expand Up @@ -141,11 +220,24 @@ class WriterHistory : public rtps::History
return m_lastCacheChangeSeqNum + 1;
}

protected:
/**
* Release a change when it is not being used anymore.
*
* @param ch Pointer to the cache change to be released.
*
* @returns whether the operation succeeded or not
*
* @pre
* @li A writer has been associated with this history
* @li @c ch is not @c nullptr
* @li @c ch points to a cache change obtained from a call to @c this->create_change
*
* @post memory pointed to by @c ch is not accessed
*/
FASTDDS_EXPORTED_API bool release_change(
CacheChange_t* ch);

FASTDDS_EXPORTED_API bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) override;
protected:

FASTDDS_EXPORTED_API void do_release_cache(
CacheChange_t* ch) override;
Expand Down Expand Up @@ -211,9 +303,9 @@ class WriterHistory : public rtps::History
}

//!Last CacheChange Sequence Number added to the History.
SequenceNumber_t m_lastCacheChangeSeqNum;
SequenceNumber_t m_lastCacheChangeSeqNum {};
//!Pointer to the associated RTPSWriter;
RTPSWriter* mp_writer;
RTPSWriter* mp_writer = nullptr;

uint32_t high_mark_for_frag_ = 0;

Expand Down Expand Up @@ -248,6 +340,11 @@ class WriterHistory : public rtps::History

void set_fragments(
CacheChange_t* change);

/// Reference to the change pool used by this history.
std::shared_ptr<IChangePool> change_pool_;
/// Reference to the payload pool used by this history.
std::shared_ptr<IPayloadPool> payload_pool_;
};

} // namespace rtps
Expand Down
Loading
Loading