From cc281a11ff0f0ee65e92e7bcad5b8cb83c4f8b96 Mon Sep 17 00:00:00 2001 From: Raul Sanchez-Mateos Lizano Date: Wed, 17 Jan 2024 14:06:40 +0100 Subject: [PATCH] Custom pools on DDS layer feature (#3755) (#4246) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Custom pools on DDS layer feature (#3755) * Custom Payload pools test implementation (#3719) * Refs #19024: Public API implementation Signed-off-by: JesusPoderoso * Refs #19024: Update versions.md Signed-off-by: JesusPoderoso * Refs #19023: Fix build issues Signed-off-by: JesusPoderoso * Refs #19023: Custom Payload pools test implementation Signed-off-by: Javier Santiago * Refs #19023: Update test to use public API Signed-off-by: JesusPoderoso * Refs #19023: Please linters Signed-off-by: JesusPoderoso * Refs #19023: Added delay between writing and checking payload request Signed-off-by: Javier Santiago --------- Signed-off-by: JesusPoderoso Signed-off-by: Javier Santiago Co-authored-by: JesusPoderoso Signed-off-by: Eduardo Ponz * Include custom pools impl (#3740) Signed-off-by: JesusPoderoso Signed-off-by: Eduardo Ponz * Refs #19024: Modified custom payload pool and datasharing interaction Signed-off-by: Javier Santiago Signed-off-by: Eduardo Ponz * Refs #19024. Correctly set payload owner on test pools. Signed-off-by: Miguel Company Signed-off-by: Eduardo Ponz --------- Signed-off-by: JesusPoderoso Signed-off-by: Javier Santiago Signed-off-by: Eduardo Ponz Signed-off-by: Miguel Company Co-authored-by: jsantiago-eProsima <90755661+jsantiago-eProsima@users.noreply.github.com> Co-authored-by: Javier Santiago Co-authored-by: Miguel Company * Add API to create writers and readers with a custom payload pool Signed-off-by: Raul Sanchez-Mateos * Use correct API in tests Signed-off-by: Raul Sanchez-Mateos * Fix Window warning about asio include Signed-off-by: Raul Sanchez-Mateos * Remove feature in forthcoming version file Signed-off-by: Raul Sanchez-Mateos * Add Asio include dir to datareader tests Signed-off-by: Raul Sanchez-Mateos --------- Signed-off-by: JesusPoderoso Signed-off-by: Javier Santiago Signed-off-by: Eduardo Ponz Signed-off-by: Miguel Company Signed-off-by: Raul Sanchez-Mateos Co-authored-by: Jesús Poderoso <120394830+JesusPoderoso@users.noreply.github.com> Co-authored-by: jsantiago-eProsima <90755661+jsantiago-eProsima@users.noreply.github.com> Co-authored-by: Javier Santiago Co-authored-by: Miguel Company --- include/fastdds/dds/publisher/Publisher.hpp | 34 ++++++ include/fastdds/dds/subscriber/Subscriber.hpp | 34 ++++++ src/cpp/fastdds/publisher/DataWriterImpl.cpp | 21 +++- src/cpp/fastdds/publisher/DataWriterImpl.hpp | 5 +- src/cpp/fastdds/publisher/Publisher.cpp | 24 +++- src/cpp/fastdds/publisher/PublisherImpl.cpp | 15 ++- src/cpp/fastdds/publisher/PublisherImpl.hpp | 9 +- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 31 ++++-- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 7 +- src/cpp/fastdds/subscriber/Subscriber.cpp | 24 +++- src/cpp/fastdds/subscriber/SubscriberImpl.cpp | 15 ++- src/cpp/fastdds/subscriber/SubscriberImpl.hpp | 9 +- .../fastdds/publisher/DataWriterImpl.hpp | 3 +- .../fastdds/publisher/PublisherImpl.hpp | 5 +- .../fastdds/subscriber/DataReaderImpl.hpp | 3 +- .../fastdds/subscriber/SubscriberImpl.hpp | 5 +- .../api/dds-pim/CustomPayloadPool.hpp | 104 ++++++++++++++++++ .../blackbox/common/DDSBlackboxTestsBasic.cpp | 62 +++++++++++ test/unittest/common/CustomPayloadPool.hpp | 104 ++++++++++++++++++ .../dds/publisher/DataWriterTests.cpp | 54 +++++++++ test/unittest/dds/subscriber/CMakeLists.txt | 4 + .../dds/subscriber/DataReaderTests.cpp | 69 ++++++++++++ .../fastdds/publisher/PublisherImpl.hpp | 5 +- 23 files changed, 604 insertions(+), 42 deletions(-) create mode 100644 test/blackbox/api/dds-pim/CustomPayloadPool.hpp create mode 100644 test/unittest/common/CustomPayloadPool.hpp diff --git a/include/fastdds/dds/publisher/Publisher.hpp b/include/fastdds/dds/publisher/Publisher.hpp index 8f6e1a40838..a1423b40a9c 100644 --- a/include/fastdds/dds/publisher/Publisher.hpp +++ b/include/fastdds/dds/publisher/Publisher.hpp @@ -163,6 +163,23 @@ class Publisher : public DomainEntity DataWriterListener* listener = nullptr, const StatusMask& mask = StatusMask::all()); + /** + * This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher. + * + * @param topic Topic the DataWriter will be listening + * @param qos QoS of the DataWriter. + * @param payload_pool IPayloadPool shared pointer that defines writer payload (default: nullptr). + * @param listener Pointer to the listener (default: nullptr). + * @param mask StatusMask that holds statuses the listener responds to (default: all). + * @return Pointer to the created DataWriter. nullptr if failed. + */ + RTPS_DllAPI DataWriter* create_datawriter_with_payload_pool( + Topic* topic, + const DataWriterQos& qos, + std::shared_ptr payload_pool, + DataWriterListener* listener = nullptr, + const StatusMask& mask = StatusMask::all()); + /** * This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher. * @@ -178,6 +195,23 @@ class Publisher : public DomainEntity DataWriterListener* listener = nullptr, const StatusMask& mask = StatusMask::all()); + /** + * This operation creates a DataWriter. The returned DataWriter will be attached and belongs to the Publisher. + * + * @param topic Topic the DataWriter will be listening + * @param profile_name DataWriter profile name. + * @param payload_pool IPayloadPool shared pointer that defines writer payload (default: nullptr). + * @param listener Pointer to the listener (default: nullptr). + * @param mask StatusMask that holds statuses the listener responds to (default: all). + * @return Pointer to the created DataWriter. nullptr if failed. + */ + RTPS_DllAPI DataWriter* create_datawriter_with_profile_with_payload_pool( + Topic* topic, + const std::string& profile_name, + std::shared_ptr payload_pool, + DataWriterListener* listener = nullptr, + const StatusMask& mask = StatusMask::all()); + /** * This operation deletes a DataWriter that belongs to the Publisher. * diff --git a/include/fastdds/dds/subscriber/Subscriber.hpp b/include/fastdds/dds/subscriber/Subscriber.hpp index 337ace92387..bb5b78c5d4c 100644 --- a/include/fastdds/dds/subscriber/Subscriber.hpp +++ b/include/fastdds/dds/subscriber/Subscriber.hpp @@ -170,6 +170,23 @@ class Subscriber : public DomainEntity DataReaderListener* listener = nullptr, const StatusMask& mask = StatusMask::all()); + /** + * This operation creates a DataReader. The returned DataReader will be attached and belong to the Subscriber. + * + * @param topic Topic the DataReader will be listening. + * @param reader_qos QoS of the DataReader. + * @param payload_pool IPayloadPool shared pointer that defines reader payload (default: nullptr). + * @param listener Pointer to the listener (default: nullptr) + * @param mask StatusMask that holds statuses the listener responds to (default: all). + * @return Pointer to the created DataReader. nullptr if failed. + */ + RTPS_DllAPI DataReader* create_datareader_with_payload_pool( + TopicDescription* topic, + const DataReaderQos& reader_qos, + std::shared_ptr payload_pool, + DataReaderListener* listener = nullptr, + const StatusMask& mask = StatusMask::all()); + /** * This operation creates a DataReader. The returned DataReader will be attached and belongs to the Subscriber. * @@ -185,6 +202,23 @@ class Subscriber : public DomainEntity DataReaderListener* listener = nullptr, const StatusMask& mask = StatusMask::all()); + /** + * This operation creates a DataReader. The returned DataReader will be attached and belongs to the Subscriber. + * + * @param topic Topic the DataReader will be listening. + * @param profile_name DataReader profile name. + * @param payload_pool IPayloadPool shared pointer that defines reader payload (default: nullptr). + * @param listener Pointer to the listener (default: nullptr) + * @param mask StatusMask that holds statuses the listener responds to (default: all). + * @return Pointer to the created DataReader. nullptr if failed. + */ + RTPS_DllAPI DataReader* create_datareader_with_profile_with_payload_pool( + TopicDescription* topic, + const std::string& profile_name, + std::shared_ptr payload_pool, + DataReaderListener* listener = nullptr, + const StatusMask& mask = StatusMask::all()); + /** * This operation deletes a DataReader that belongs to the Subscriber. * diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index c465b46bd8f..3ecbd1b8c50 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -146,7 +146,8 @@ DataWriterImpl::DataWriterImpl( TypeSupport type, Topic* topic, const DataWriterQos& qos, - DataWriterListener* listen) + DataWriterListener* listen, + std::shared_ptr payload_pool) : publisher_(p) , type_(type) , topic_(topic) @@ -174,6 +175,12 @@ DataWriterImpl::DataWriterImpl( fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes( EntityId_t::unknown(), publisher_->get_participant_impl()->id_counter(), endpoint_attributes, guid_.entityId); guid_.guidPrefix = publisher_->get_participant_impl()->guid().guidPrefix; + + if (payload_pool != nullptr) + { + is_custom_payload_pool_ = true; + payload_pool_ = payload_pool; + } } DataWriterImpl::DataWriterImpl( @@ -1981,7 +1988,7 @@ bool DataWriterImpl::release_payload_pool() bool result = true; - if (is_data_sharing_compatible_) + if (is_data_sharing_compatible_ || is_custom_payload_pool_) { // No-op } @@ -2036,6 +2043,11 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible( return ReturnCode_t::RETCODE_OK; break; case DataSharingKind::ON: + if (is_custom_payload_pool_) + { + EPROSIMA_LOG_ERROR(DATA_WRITER, "Custom payload pool detected. Cannot force Data sharing usage."); + return ReturnCode_t::RETCODE_INCONSISTENT_POLICY; + } #if HAVE_SECURITY if (has_security_enabled) { @@ -2061,6 +2073,11 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible( return ReturnCode_t::RETCODE_OK; break; case DataSharingKind::AUTO: + if (is_custom_payload_pool_) + { + EPROSIMA_LOG_INFO(DATA_WRITER, "Custom payload pool detected. Data Sharing disabled."); + return ReturnCode_t::RETCODE_OK; + } #if HAVE_SECURITY if (has_security_enabled) { diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 3399bfb7e35..3498b51740d 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -108,7 +108,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter TypeSupport type, Topic* topic, const DataWriterQos& qos, - DataWriterListener* listener = nullptr); + DataWriterListener* listener = nullptr, + std::shared_ptr payload_pool = nullptr); DataWriterImpl( PublisherImpl* p, @@ -488,6 +489,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter std::shared_ptr payload_pool_; + bool is_custom_payload_pool_ = false; + std::unique_ptr loans_; fastrtps::rtps::GUID_t guid_; diff --git a/src/cpp/fastdds/publisher/Publisher.cpp b/src/cpp/fastdds/publisher/Publisher.cpp index 3aded76fc39..3f3787c0b89 100644 --- a/src/cpp/fastdds/publisher/Publisher.cpp +++ b/src/cpp/fastdds/publisher/Publisher.cpp @@ -114,7 +114,17 @@ DataWriter* Publisher::create_datawriter( DataWriterListener* listener, const StatusMask& mask) { - return impl_->create_datawriter(topic, qos, listener, mask); + return impl_->create_datawriter(topic, qos, listener, mask, nullptr); +} + +DataWriter* Publisher::create_datawriter_with_payload_pool( + Topic* topic, + const DataWriterQos& qos, + std::shared_ptr payload_pool, + DataWriterListener* listener, + const StatusMask& mask) +{ + return impl_->create_datawriter(topic, qos, listener, mask, payload_pool); } DataWriter* Publisher::create_datawriter_with_profile( @@ -123,7 +133,17 @@ DataWriter* Publisher::create_datawriter_with_profile( DataWriterListener* listener, const StatusMask& mask) { - return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask); + return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask, nullptr); +} + +DataWriter* Publisher::create_datawriter_with_profile_with_payload_pool( + Topic* topic, + const std::string& profile_name, + std::shared_ptr payload_pool, + DataWriterListener* listener, + const StatusMask& mask) +{ + return impl_->create_datawriter_with_profile(topic, profile_name, listener, mask, payload_pool); } ReturnCode_t Publisher::delete_datawriter( diff --git a/src/cpp/fastdds/publisher/PublisherImpl.cpp b/src/cpp/fastdds/publisher/PublisherImpl.cpp index d99335a204a..bcdd2b11993 100644 --- a/src/cpp/fastdds/publisher/PublisherImpl.cpp +++ b/src/cpp/fastdds/publisher/PublisherImpl.cpp @@ -206,16 +206,18 @@ DataWriterImpl* PublisherImpl::create_datawriter_impl( const TypeSupport& type, Topic* topic, const DataWriterQos& qos, - DataWriterListener* listener) + DataWriterListener* listener, + std::shared_ptr payload_pool) { - return new DataWriterImpl(this, type, topic, qos, listener); + return new DataWriterImpl(this, type, topic, qos, listener, payload_pool); } DataWriter* PublisherImpl::create_datawriter( Topic* topic, const DataWriterQos& qos, DataWriterListener* listener, - const StatusMask& mask) + const StatusMask& mask, + std::shared_ptr payload_pool) { EPROSIMA_LOG_INFO(PUBLISHER, "CREATING WRITER IN TOPIC: " << topic->get_name()); //Look for the correct type registration @@ -234,7 +236,7 @@ DataWriter* PublisherImpl::create_datawriter( return nullptr; } - DataWriterImpl* impl = create_datawriter_impl(type_support, topic, qos, listener); + DataWriterImpl* impl = create_datawriter_impl(type_support, topic, qos, listener, payload_pool); return create_datawriter(topic, impl, mask); } @@ -269,7 +271,8 @@ DataWriter* PublisherImpl::create_datawriter_with_profile( Topic* topic, const std::string& profile_name, DataWriterListener* listener, - const StatusMask& mask) + const StatusMask& mask, + std::shared_ptr payload_pool) { // TODO (ILG): Change when we have full XML support for DDS QoS profiles PublisherAttributes attr; @@ -277,7 +280,7 @@ DataWriter* PublisherImpl::create_datawriter_with_profile( { DataWriterQos qos = default_datawriter_qos_; utils::set_qos_from_attributes(qos, attr); - return create_datawriter(topic, qos, listener, mask); + return create_datawriter(topic, qos, listener, mask, payload_pool); } return nullptr; diff --git a/src/cpp/fastdds/publisher/PublisherImpl.hpp b/src/cpp/fastdds/publisher/PublisherImpl.hpp index 7561023e059..183413cff49 100644 --- a/src/cpp/fastdds/publisher/PublisherImpl.hpp +++ b/src/cpp/fastdds/publisher/PublisherImpl.hpp @@ -105,13 +105,15 @@ class PublisherImpl Topic* topic, const DataWriterQos& qos, DataWriterListener* listener, - const StatusMask& mask = StatusMask::all()); + const StatusMask& mask = StatusMask::all(), + std::shared_ptr payload_pool = nullptr); DataWriter* create_datawriter_with_profile( Topic* topic, const std::string& profile_name, DataWriterListener* listener, - const StatusMask& mask = StatusMask::all()); + const StatusMask& mask = StatusMask::all(), + std::shared_ptr payload_pool = nullptr); ReturnCode_t delete_datawriter( const DataWriter* writer); @@ -255,7 +257,8 @@ class PublisherImpl const TypeSupport& type, Topic* topic, const DataWriterQos& qos, - DataWriterListener* listener); + DataWriterListener* listener, + std::shared_ptr payload_pool); static void set_qos( PublisherQos& to, diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index e9aca014123..700de22f92b 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -102,7 +102,8 @@ DataReaderImpl::DataReaderImpl( const TypeSupport& type, TopicDescription* topic, const DataReaderQos& qos, - DataReaderListener* listener) + DataReaderListener* listener, + std::shared_ptr payload_pool) : subscriber_(s) , type_(type) , topic_(topic) @@ -124,6 +125,12 @@ DataReaderImpl::DataReaderImpl( RTPSParticipantImpl::preprocess_endpoint_attributes( EntityId_t::unknown(), subscriber_->get_participant_impl()->id_counter(), endpoint_attributes, guid_.entityId); guid_.guidPrefix = subscriber_->get_participant_impl()->guid().guidPrefix; + + if (payload_pool != nullptr) + { + is_custom_payload_pool_ = true; + payload_pool_ = payload_pool; + } } ReturnCode_t DataReaderImpl::enable() @@ -1724,13 +1731,17 @@ std::shared_ptr DataReaderImpl::get_payload_pool() PoolConfig config = PoolConfig::from_history_attributes(history_.m_att); - if (!payload_pool_) + if (!sample_pool_) { - payload_pool_ = TopicPayloadPoolRegistry::get(topic_->get_impl()->get_rtps_topic_name(), config); sample_pool_ = std::make_shared(config, type_); } - - payload_pool_->reserve_history(config, true); + if (!is_custom_payload_pool_) + { + std::shared_ptr topic_payload_pool = TopicPayloadPoolRegistry::get( + topic_->get_impl()->get_rtps_topic_name(), config); + topic_payload_pool->reserve_history(config, true); + payload_pool_ = topic_payload_pool; + } return payload_pool_; } @@ -1738,8 +1749,14 @@ void DataReaderImpl::release_payload_pool() { assert(payload_pool_); - PoolConfig config = PoolConfig::from_history_attributes(history_.m_att); - payload_pool_->release_history(config, true); + if (!is_custom_payload_pool_) + { + PoolConfig config = PoolConfig::from_history_attributes(history_.m_att); + std::shared_ptr topic_payload_pool = + std::dynamic_pointer_cast(payload_pool_); + topic_payload_pool->release_history(config, true); + } + payload_pool_.reset(); } diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 7b261a69bbb..faed35ec4e0 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -102,7 +102,8 @@ class DataReaderImpl const TypeSupport& type, TopicDescription* topic, const DataReaderQos& qos, - DataReaderListener* listener = nullptr); + DataReaderListener* listener = nullptr, + std::shared_ptr payload_pool = nullptr); public: @@ -482,8 +483,10 @@ class DataReaderImpl DataReader* user_datareader_ = nullptr; - std::shared_ptr payload_pool_; std::shared_ptr sample_pool_; + std::shared_ptr payload_pool_; + + bool is_custom_payload_pool_ = false; detail::SampleInfoPool sample_info_pool_; detail::DataReaderLoanManager loan_manager_; diff --git a/src/cpp/fastdds/subscriber/Subscriber.cpp b/src/cpp/fastdds/subscriber/Subscriber.cpp index 6df62f9d5a4..96a71c1ca4a 100644 --- a/src/cpp/fastdds/subscriber/Subscriber.cpp +++ b/src/cpp/fastdds/subscriber/Subscriber.cpp @@ -108,7 +108,17 @@ DataReader* Subscriber::create_datareader( DataReaderListener* listener, const StatusMask& mask) { - return impl_->create_datareader(topic, reader_qos, listener, mask); + return impl_->create_datareader(topic, reader_qos, listener, mask, nullptr); +} + +DataReader* Subscriber::create_datareader_with_payload_pool( + TopicDescription* topic, + const DataReaderQos& reader_qos, + std::shared_ptr payload_pool, + DataReaderListener* listener, + const StatusMask& mask) +{ + return impl_->create_datareader(topic, reader_qos, listener, mask, payload_pool); } DataReader* Subscriber::create_datareader_with_profile( @@ -117,7 +127,17 @@ DataReader* Subscriber::create_datareader_with_profile( DataReaderListener* listener, const StatusMask& mask) { - return impl_->create_datareader_with_profile(topic, profile_name, listener, mask); + return impl_->create_datareader_with_profile(topic, profile_name, listener, mask, nullptr); +} + +DataReader* Subscriber::create_datareader_with_profile_with_payload_pool( + TopicDescription* topic, + const std::string& profile_name, + std::shared_ptr payload_pool, + DataReaderListener* listener, + const StatusMask& mask) +{ + return impl_->create_datareader_with_profile(topic, profile_name, listener, mask, payload_pool); } ReturnCode_t Subscriber::delete_datareader( diff --git a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp index fa68b1f5b5c..999185c46f8 100644 --- a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp +++ b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp @@ -175,16 +175,18 @@ DataReaderImpl* SubscriberImpl::create_datareader_impl( const TypeSupport& type, TopicDescription* topic, const DataReaderQos& qos, - DataReaderListener* listener) + DataReaderListener* listener, + std::shared_ptr payload_pool) { - return new DataReaderImpl(this, type, topic, qos, listener); + return new DataReaderImpl(this, type, topic, qos, listener, payload_pool); } DataReader* SubscriberImpl::create_datareader( TopicDescription* topic, const DataReaderQos& qos, DataReaderListener* listener, - const StatusMask& mask) + const StatusMask& mask, + std::shared_ptr payload_pool) { EPROSIMA_LOG_INFO(SUBSCRIBER, "CREATING SUBSCRIBER IN TOPIC: " << topic->get_name()); //Look for the correct type registration @@ -205,7 +207,7 @@ DataReader* SubscriberImpl::create_datareader( topic->get_impl()->reference(); - DataReaderImpl* impl = create_datareader_impl(type_support, topic, qos, listener); + DataReaderImpl* impl = create_datareader_impl(type_support, topic, qos, listener, payload_pool); DataReader* reader = new DataReader(impl, mask); impl->user_datareader_ = reader; @@ -230,7 +232,8 @@ DataReader* SubscriberImpl::create_datareader_with_profile( TopicDescription* topic, const std::string& profile_name, DataReaderListener* listener, - const StatusMask& mask) + const StatusMask& mask, + std::shared_ptr payload_pool) { // TODO (ILG): Change when we have full XML support for DDS QoS profiles SubscriberAttributes attr; @@ -238,7 +241,7 @@ DataReader* SubscriberImpl::create_datareader_with_profile( { DataReaderQos qos = default_datareader_qos_; utils::set_qos_from_attributes(qos, attr); - return create_datareader(topic, qos, listener, mask); + return create_datareader(topic, qos, listener, mask, payload_pool); } return nullptr; diff --git a/src/cpp/fastdds/subscriber/SubscriberImpl.hpp b/src/cpp/fastdds/subscriber/SubscriberImpl.hpp index fb6ba2df590..7f8212fa811 100644 --- a/src/cpp/fastdds/subscriber/SubscriberImpl.hpp +++ b/src/cpp/fastdds/subscriber/SubscriberImpl.hpp @@ -97,13 +97,15 @@ class SubscriberImpl TopicDescription* topic, const DataReaderQos& reader_qos, DataReaderListener* listener = nullptr, - const StatusMask& mask = StatusMask::all()); + const StatusMask& mask = StatusMask::all(), + std::shared_ptr payload_pool = nullptr); DataReader* create_datareader_with_profile( TopicDescription* topic, const std::string& profile_name, DataReaderListener* listener, - const StatusMask& mask = StatusMask::all()); + const StatusMask& mask = StatusMask::all(), + std::shared_ptr payload_pool = nullptr); ReturnCode_t delete_datareader( const DataReader* reader); @@ -298,7 +300,8 @@ class SubscriberImpl const TypeSupport& type, TopicDescription* topic, const DataReaderQos& qos, - DataReaderListener* listener); + DataReaderListener* listener, + std::shared_ptr payload_pool); }; } /* namespace dds */ diff --git a/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp index 498c3554efe..b91f9855dcf 100644 --- a/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/statistics/fastdds/publisher/DataWriterImpl.hpp @@ -58,8 +58,9 @@ class DataWriterImpl : public efd::DataWriterImpl efd::Topic* topic, const efd::DataWriterQos& qos, efd::DataWriterListener* listener, + std::shared_ptr payload_pool, std::shared_ptr stat_listener) - : BaseType(p, type, topic, qos, listener) + : BaseType(p, type, topic, qos, listener, payload_pool) , statistics_listener_(stat_listener) { } diff --git a/src/cpp/statistics/fastdds/publisher/PublisherImpl.hpp b/src/cpp/statistics/fastdds/publisher/PublisherImpl.hpp index ca51992f742..72fca819ff2 100644 --- a/src/cpp/statistics/fastdds/publisher/PublisherImpl.hpp +++ b/src/cpp/statistics/fastdds/publisher/PublisherImpl.hpp @@ -62,9 +62,10 @@ class PublisherImpl : public efd::PublisherImpl const efd::TypeSupport& type, efd::Topic* topic, const efd::DataWriterQos& qos, - efd::DataWriterListener* listener) override + efd::DataWriterListener* listener, + std::shared_ptr payload_pool) override { - return new DataWriterImpl(this, type, topic, qos, listener, statistics_listener_); + return new DataWriterImpl(this, type, topic, qos, listener, payload_pool, statistics_listener_); } private: diff --git a/src/cpp/statistics/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/statistics/fastdds/subscriber/DataReaderImpl.hpp index 5a78523aa4a..0454c08bb50 100644 --- a/src/cpp/statistics/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/statistics/fastdds/subscriber/DataReaderImpl.hpp @@ -48,8 +48,9 @@ class DataReaderImpl : public efd::DataReaderImpl efd::TopicDescription* topic, const efd::DataReaderQos& qos, efd::DataReaderListener* listener, + std::shared_ptr payload_pool, std::shared_ptr stat_listener) - : BaseType(s, type, topic, qos, listener) + : BaseType(s, type, topic, qos, listener, payload_pool) , statistics_listener_(stat_listener) { } diff --git a/src/cpp/statistics/fastdds/subscriber/SubscriberImpl.hpp b/src/cpp/statistics/fastdds/subscriber/SubscriberImpl.hpp index db68fb09b7b..3cdc5fd1ba1 100644 --- a/src/cpp/statistics/fastdds/subscriber/SubscriberImpl.hpp +++ b/src/cpp/statistics/fastdds/subscriber/SubscriberImpl.hpp @@ -53,9 +53,10 @@ class SubscriberImpl : public efd::SubscriberImpl const efd::TypeSupport& type, efd::TopicDescription* topic, const efd::DataReaderQos& qos, - efd::DataReaderListener* listener) override + efd::DataReaderListener* listener, + std::shared_ptr payload_pool) override { - return new DataReaderImpl(this, type, topic, qos, listener, statistics_listener_); + return new DataReaderImpl(this, type, topic, qos, listener, payload_pool, statistics_listener_); } private: diff --git a/test/blackbox/api/dds-pim/CustomPayloadPool.hpp b/test/blackbox/api/dds-pim/CustomPayloadPool.hpp new file mode 100644 index 00000000000..e4f00f2aa32 --- /dev/null +++ b/test/blackbox/api/dds-pim/CustomPayloadPool.hpp @@ -0,0 +1,104 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPool.hpp + */ + +#ifndef DDS_CUSTOM_PAYLOAD_POOL_HPP +#define DDS_CUSTOM_PAYLOAD_POOL_HPP + +#include +#include +#include +#include + +class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool +{ +public: + + ~CustomPayloadPool() = default; + + bool get_payload( + unsigned int size, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[size]; + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = size; + cache_change.serializedPayload.max_size = size; + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + ++requested_payload_count; + + return true; + } + + bool get_payload( + eprosima::fastrtps::rtps::SerializedPayload_t& data, + eprosima::fastrtps::rtps::IPayloadPool*& /*data_owner*/, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[data.length]; + + // Copy the data + memcpy(payload, data.data, data.length); + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = data.length; + cache_change.serializedPayload.max_size = data.length; + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + ++requested_payload_count; + + return true; + } + + bool release_payload( + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Ensure precondition + assert(this == cache_change.payload_owner()); + + // Dealloc the buffer of the payload + delete[] cache_change.serializedPayload.data; + + // Reset sizes and pointers + cache_change.serializedPayload.data = nullptr; + cache_change.serializedPayload.length = 0; + cache_change.serializedPayload.max_size = 0; + + // Reset the owner of the payload + cache_change.payload_owner(nullptr); + + ++returned_payload_count; + + return true; + } + + uint32_t requested_payload_count = 0; + uint32_t returned_payload_count = 0; + +}; + +#endif // DDS_CUSTOM_PAYLOAD_POOL_HPP diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index c4cd711ecd1..1510051e447 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -42,6 +42,7 @@ #include #include "BlackboxTests.hpp" +#include "../api/dds-pim/CustomPayloadPool.hpp" #include "../api/dds-pim/PubSubReader.hpp" #include "../api/dds-pim/PubSubWriter.hpp" #include "../api/dds-pim/PubSubWriterReader.hpp" @@ -696,6 +697,67 @@ TEST(DDSBasic, participant_ignore_local_endpoints_two_participants) EXPECT_EQ(reader.block_for_all(std::chrono::seconds(1)), 5); } +/** + * @test This test checks both the visibility of custom pool functions + * for DataReader and DataWriters while also testing their correct + * behavior + */ +TEST(DDSBasic, endpoint_custom_payload_pools) +{ + DomainParticipant* participant = + DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(participant, nullptr); + + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(subscriber, nullptr); + + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(publisher, nullptr); + + // Register type + TypeSupport type; + + type.reset(new StringTestPubSubType()); + type.register_type(participant); + ASSERT_NE(nullptr, type); + + type.register_type(participant); + + Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(topic, nullptr); + + // Next QoS config checks the default qos configuration, + // create_datareader() should not return nullptr. + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + + std::shared_ptr reader_payload_pool = std::make_shared(); + + std::shared_ptr writer_payload_pool = std::make_shared(); + + DataReader* data_reader = subscriber->create_datareader_with_payload_pool( + topic, reader_qos, reader_payload_pool, nullptr, StatusMask::all()); + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + + DataWriter* data_writer = publisher->create_datawriter_with_payload_pool( + topic, writer_qos, writer_payload_pool, nullptr, StatusMask::all()); + + ASSERT_NE(data_reader, nullptr); + ASSERT_NE(data_writer, nullptr); + + StringTest data; + data.message("Lorem Ipsum"); + + data_writer->write(&data, HANDLE_NIL); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + ASSERT_EQ(reader_payload_pool->requested_payload_count, 1u); + ASSERT_EQ(writer_payload_pool->requested_payload_count, 1u); + + participant->delete_contained_entities(); +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/unittest/common/CustomPayloadPool.hpp b/test/unittest/common/CustomPayloadPool.hpp new file mode 100644 index 00000000000..e4f00f2aa32 --- /dev/null +++ b/test/unittest/common/CustomPayloadPool.hpp @@ -0,0 +1,104 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomPayloadPool.hpp + */ + +#ifndef DDS_CUSTOM_PAYLOAD_POOL_HPP +#define DDS_CUSTOM_PAYLOAD_POOL_HPP + +#include +#include +#include +#include + +class CustomPayloadPool : public eprosima::fastrtps::rtps::IPayloadPool +{ +public: + + ~CustomPayloadPool() = default; + + bool get_payload( + unsigned int size, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[size]; + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = size; + cache_change.serializedPayload.max_size = size; + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + ++requested_payload_count; + + return true; + } + + bool get_payload( + eprosima::fastrtps::rtps::SerializedPayload_t& data, + eprosima::fastrtps::rtps::IPayloadPool*& /*data_owner*/, + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Reserve new memory for the payload buffer + unsigned char* payload = new unsigned char[data.length]; + + // Copy the data + memcpy(payload, data.data, data.length); + + // Assign the payload buffer to the CacheChange and update sizes + cache_change.serializedPayload.data = payload; + cache_change.serializedPayload.length = data.length; + cache_change.serializedPayload.max_size = data.length; + + // Tell the CacheChange who needs to release its payload + cache_change.payload_owner(this); + + ++requested_payload_count; + + return true; + } + + bool release_payload( + eprosima::fastrtps::rtps::CacheChange_t& cache_change) + { + // Ensure precondition + assert(this == cache_change.payload_owner()); + + // Dealloc the buffer of the payload + delete[] cache_change.serializedPayload.data; + + // Reset sizes and pointers + cache_change.serializedPayload.data = nullptr; + cache_change.serializedPayload.length = 0; + cache_change.serializedPayload.max_size = 0; + + // Reset the owner of the payload + cache_change.payload_owner(nullptr); + + ++returned_payload_count; + + return true; + } + + uint32_t requested_payload_count = 0; + uint32_t returned_payload_count = 0; + +}; + +#endif // DDS_CUSTOM_PAYLOAD_POOL_HPP diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index 49cb6e5b238..b5967b7b756 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -43,6 +43,10 @@ #include #include "../../logging/mock/MockConsumer.h" +#include "../../common/CustomPayloadPool.hpp" + +#include +#include namespace eprosima { namespace fastdds { @@ -1943,6 +1947,56 @@ TEST(DataWriterTests, InstancePolicyAllocationConsistencyKeyed) ASSERT_EQ(ReturnCode_t::RETCODE_OK, default_data_writer1->set_qos(qos2)); } + +/* + * This test checks the proper behavior of the custom payload pool DataReader overload. + */ +TEST(DataWriterTests, CustomPoolCreation) +{ + DomainParticipant* participant = + DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(participant, nullptr); + + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(subscriber, nullptr); + + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(publisher, nullptr); + + TypeSupport type(new TopicDataTypeMock()); + type.register_type(participant); + + Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(topic, nullptr); + + // Next QoS config checks the default qos configuration, + // create_datareader() should not return nullptr. + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + + std::shared_ptr payload_pool = std::make_shared(); + + DataReader* data_reader = subscriber->create_datareader(topic, reader_qos); + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + + DataWriter* data_writer = + publisher->create_datawriter_with_payload_pool( + topic, writer_qos, payload_pool, nullptr, StatusMask::all()); + + ASSERT_NE(data_writer, nullptr); + ASSERT_NE(data_reader, nullptr); + + FooType data; + + data_writer->write(&data, HANDLE_NIL); + + ASSERT_EQ(payload_pool->requested_payload_count, 1u); + + participant->delete_contained_entities(); + + DomainParticipantFactory::get_instance()->delete_participant(participant); +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/unittest/dds/subscriber/CMakeLists.txt b/test/unittest/dds/subscriber/CMakeLists.txt index c4a6ded9863..a718389a355 100644 --- a/test/unittest/dds/subscriber/CMakeLists.txt +++ b/test/unittest/dds/subscriber/CMakeLists.txt @@ -69,12 +69,16 @@ add_gtest(SubscriberTests SOURCES ${SUBSCRIBERTESTS_SOURCE}) add_executable(DataReaderTests ${DATAREADERTESTS_SOURCE}) target_compile_definitions(DataReaderTests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + ASIO_DISABLE_VISIBILITY $<$>,$>:__DEBUG> $<$:__INTERNALDEBUG> # Internal debug activated. ) target_include_directories(DataReaderTests PRIVATE ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include ${PROJECT_SOURCE_DIR}/src/cpp + ${Asio_INCLUDE_DIR} ) target_link_libraries(DataReaderTests fastrtps fastcdr foonathan_memory GTest::gmock diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 27ad50e268d..b14da29cb01 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include #include @@ -67,6 +69,21 @@ #include #include +#include "../../common/CustomPayloadPool.hpp" +#include "fastdds/dds/common/InstanceHandle.hpp" +#include "fastdds/dds/core/policy/QosPolicies.hpp" + +#include + +#if defined(__cplusplus_winrt) +#define GET_PID GetCurrentProcessId +#elif defined(_WIN32) +#include +#define GET_PID _getpid +#else +#define GET_PID getpid +#endif // if defined(_WIN32) + using namespace eprosima::fastdds::dds; using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastdds::rtps; @@ -3465,6 +3482,58 @@ TEST_F(DataReaderTests, InstancePolicyAllocationConsistencyKeyed) ASSERT_EQ(ReturnCode_t::RETCODE_OK, default_data_reader2->set_qos(qos2)); } +/* + * This test checks the proper behavior of the custom payload pool DataReader overload. + */ +TEST_F(DataReaderTests, CustomPoolCreation) +{ + DomainParticipant* participant = + DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(participant, nullptr); + + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(subscriber, nullptr); + + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(publisher, nullptr); + + TypeSupport type(new FooTypeSupport()); + type.register_type(participant); + + Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(topic, nullptr); + + // Next QoS config checks the default qos configuration, + // create_datareader() should not return nullptr. + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + + std::shared_ptr payload_pool = std::make_shared(); + + DataReader* data_reader = + subscriber->create_datareader_with_payload_pool( + topic, reader_qos, payload_pool, nullptr, StatusMask::all()); + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + + DataWriter* data_writer = publisher->create_datawriter(topic, writer_qos); + + FooType data; + data.index(0); + data.message()[0] = '\0'; + data.message()[1] = '\0'; + + data_writer->write(&data, HANDLE_NIL); + + ASSERT_EQ(payload_pool->requested_payload_count, 1u); + + ASSERT_NE(data_reader, nullptr); + + participant->delete_contained_entities(); + + DomainParticipantFactory::get_instance()->delete_participant(participant); +} + int main( int argc, char** argv) diff --git a/test/unittest/statistics/dds/StatisticsDomainParticipantMockTests/mock/statistics/fastdds/publisher/PublisherImpl.hpp b/test/unittest/statistics/dds/StatisticsDomainParticipantMockTests/mock/statistics/fastdds/publisher/PublisherImpl.hpp index 69479797f44..90cfd731f66 100644 --- a/test/unittest/statistics/dds/StatisticsDomainParticipantMockTests/mock/statistics/fastdds/publisher/PublisherImpl.hpp +++ b/test/unittest/statistics/dds/StatisticsDomainParticipantMockTests/mock/statistics/fastdds/publisher/PublisherImpl.hpp @@ -75,9 +75,10 @@ class PublisherImpl : public efd::PublisherImpl const efd::TypeSupport& type, efd::Topic* topic, const efd::DataWriterQos& qos, - efd::DataWriterListener* listener) override + efd::DataWriterListener* listener, + std::shared_ptr payload_pool) override { - return new DataWriterImpl(this, type, topic, qos, listener, statistics_listener_); + return new DataWriterImpl(this, type, topic, qos, listener, payload_pool, statistics_listener_); } efd::DataWriter* create_datawriter(