diff --git a/include/fastdds/rtps/builtin/data/BuiltinEndpoints.hpp b/include/fastdds/rtps/builtin/data/BuiltinEndpoints.hpp new file mode 100644 index 00000000000..4c9408e79e1 --- /dev/null +++ b/include/fastdds/rtps/builtin/data/BuiltinEndpoints.hpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file BuiltinEndpoints.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DATA__BUILTINENDPOINTS_HPP +#define FASTDDS_RTPS_BUILTIN_DATA__BUILTINENDPOINTS_HPP + +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER (0x00000001 << 0) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR (0x00000001 << 1) +#define DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER (0x00000001 << 2) +#define DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR (0x00000001 << 3) +#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER (0x00000001 << 4) +#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR (0x00000001 << 5) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_PROXY_ANNOUNCER (0x00000001 << 6) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_PROXY_DETECTOR (0x00000001 << 7) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_STATE_ANNOUNCER (0x00000001 << 8) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_STATE_DETECTOR (0x00000001 << 9) +#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER (0x00000001 << 10) +#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER (0x00000001 << 11) +#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_WRITER (0x00000001 << 12) +#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_READER (0x00000001 << 13) +#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_WRITER (0x00000001 << 14) +#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_READER (0x00000001 << 15) +#define DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER (0x00000001 << 16) +#define DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR (0x00000001 << 17) +#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER (0x00000001 << 18) +#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR (0x00000001 << 19) +#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_WRITER (0x00000001 << 20) +#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_READER (0x00000001 << 21) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER (0x00000001 << 26) +#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR (0x00000001 << 27) + +#endif // FASTDDS_RTPS_BUILTIN_DATA__BUILTINENDPOINTS_HPP diff --git a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h index cee9574049f..5602944ed2d 100644 --- a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h +++ b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -38,31 +39,6 @@ #define BUILTIN_PARTICIPANT_DATA_MAX_SIZE 100 #define TYPELOOKUP_DATA_MAX_SIZE 5000 -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER (0x00000001 << 0) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR (0x00000001 << 1) -#define DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER (0x00000001 << 2) -#define DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR (0x00000001 << 3) -#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER (0x00000001 << 4) -#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR (0x00000001 << 5) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_PROXY_ANNOUNCER (0x00000001 << 6) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_PROXY_DETECTOR (0x00000001 << 7) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_STATE_ANNOUNCER (0x00000001 << 8) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_STATE_DETECTOR (0x00000001 << 9) -#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER (0x00000001 << 10) -#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER (0x00000001 << 11) -#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_WRITER (0x00000001 << 12) -#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_READER (0x00000001 << 13) -#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_WRITER (0x00000001 << 14) -#define BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_READER (0x00000001 << 15) -#define DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER (0x00000001 << 16) -#define DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR (0x00000001 << 17) -#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER (0x00000001 << 18) -#define DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR (0x00000001 << 19) -#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_WRITER (0x00000001 << 20) -#define BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_READER (0x00000001 << 21) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER (0x00000001 << 26) -#define DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR (0x00000001 << 27) - namespace eprosima { namespace fastrtps { namespace rtps { diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDP.h b/include/fastdds/rtps/builtin/discovery/participant/PDP.h index fffc0c1979a..ee384b4d317 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDP.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDP.h @@ -22,8 +22,9 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include -#include #include +#include +#include #include #include @@ -40,6 +41,7 @@ namespace fastdds { namespace rtps { class PDPServerListener; +class PDPEndpoints; } // namespace rtps } // namespace fastdds @@ -128,7 +130,7 @@ class PDP virtual void announceParticipantState( bool new_change, bool dispose = false, - WriteParams& wparams = WriteParams::WRITE_PARAM_DEFAULT); + WriteParams& wparams = WriteParams::WRITE_PARAM_DEFAULT) = 0; //!Stop the RTPSParticipantAnnouncement (only used in tests). virtual void stopParticipantAnnouncement(); @@ -384,10 +386,8 @@ class PDP RTPSParticipantImpl* mp_RTPSParticipant; //!Discovery attributes. BuiltinAttributes m_discovery; - //!Pointer to the PDPWriter. - RTPSWriter* mp_PDPWriter; - //!Pointer to the PDPReader. - RTPSReader* mp_PDPReader; + //!Builtin PDP endpoints + std::unique_ptr builtin_endpoints_; //!Pointer to the EDP object. EDP* mp_EDP; //!Number of participant proxy data objects created @@ -408,14 +408,6 @@ class PDP std::atomic_bool m_hasChangedLocalPDP; //!Listener for the SPDP messages. ReaderListener* mp_listener; - //!WriterHistory - WriterHistory* mp_PDPWriterHistory; - //!Writer payload pool - std::shared_ptr writer_payload_pool_; - //!Reader History - ReaderHistory* mp_PDPReaderHistory; - //!Reader payload pool - std::shared_ptr reader_payload_pool_; //! ProxyPool for temporary reader proxies ProxyPool temp_reader_proxies_; //! ProxyPool for temporary writer proxies @@ -454,6 +446,26 @@ class PDP const GUID_t& participant_guid, InstanceHandle_t& key); + /** + * Force the sending of our local DPD to all remote RTPSParticipants and multicast Locators. + * @param writer RTPSWriter to use for sending the announcement + * @param history history where the change should be added + * @param new_change If true a new change (with new seqNum) is created and sent;If false the last change is re-sent + * @param dispose sets change kind to NOT_ALIVE_DISPOSED_UNREGISTERED + * @param wparams allows to identify the change + */ + void announceParticipantState( + RTPSWriter& writer, + WriterHistory& history, + bool new_change, + bool dispose = false, + WriteParams& wparams = WriteParams::WRITE_PARAM_DEFAULT); + + /** + * Called after creating the builtin endpoints to update the metatraffic unicast locators of BuiltinProtocols + */ + virtual void update_builtin_locators() = 0; + private: //!TimedEvent to periodically resend the local RTPSParticipant information. diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h b/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h index e7fc9065875..40ae3fcbe54 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h @@ -54,7 +54,8 @@ class PDPSimple : public PDP * @param part Pointer to the RTPSParticipant. * @return True on success */ - bool init(RTPSParticipantImpl* part) override; + bool init( + RTPSParticipantImpl* part) override; /** * Creates an initializes a new participant proxy from a DATA(p) raw info @@ -63,8 +64,8 @@ class PDPSimple : public PDP * @return new ParticipantProxyData * or nullptr on failure */ ParticipantProxyData* createParticipantProxyData( - const ParticipantProxyData& p, - const GUID_t& writer_guid) override; + const ParticipantProxyData& p, + const GUID_t& writer_guid) override; /** * Some PDP classes require EDP matching with update PDP DATAs like EDPStatic @@ -79,28 +80,31 @@ class PDPSimple : public PDP * @param wparams allows to identify the change */ void announceParticipantState( - bool new_change, - bool dispose = false, - WriteParams& wparams = WriteParams::WRITE_PARAM_DEFAULT) override; + bool new_change, + bool dispose = false, + WriteParams& wparams = WriteParams::WRITE_PARAM_DEFAULT) override; /** * This method assigns remote endpoints to the builtin endpoints defined in this protocol. It also calls * the corresponding methods in EDP and WLP. * @param pdata Pointer to the ParticipantProxyData object. */ - void assignRemoteEndpoints(ParticipantProxyData* pdata) override; + void assignRemoteEndpoints( + ParticipantProxyData* pdata) override; /** * Remove remote endpoints from the participant discovery protocol * @param pdata Pointer to the ParticipantProxyData to remove */ - void removeRemoteEndpoints(ParticipantProxyData * pdata) override; + void removeRemoteEndpoints( + ParticipantProxyData* pdata) override; /** * This method notifies EDP and WLP of the existence of a new participant. * @param pdata */ - void notifyAboveRemoteEndpoints(const ParticipantProxyData& pdata) override; + void notifyAboveRemoteEndpoints( + const ParticipantProxyData& pdata) override; /** * Activate a new Remote Endpoint that has been statically discovered. @@ -113,10 +117,12 @@ class PDPSimple : public PDP int16_t userDefinedId, EndpointKind_t kind); + void update_builtin_locators() override; private: - void initializeParticipantProxyData(ParticipantProxyData* participant_data) override; + void initializeParticipantProxyData( + ParticipantProxyData* participant_data) override; /** * Create the SPDP Writer and Reader @@ -130,5 +136,5 @@ class PDPSimple : public PDP } /* namespace fastrtps */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif //_FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_PDPSIMPLE_H_ diff --git a/src/cpp/rtps/builtin/BuiltinReader.hpp b/src/cpp/rtps/builtin/BuiltinReader.hpp new file mode 100644 index 00000000000..38bf295946b --- /dev/null +++ b/src/cpp/rtps/builtin/BuiltinReader.hpp @@ -0,0 +1,84 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file BuiltinReader.hpp + */ + +#ifndef RTPS_BUILTIN__BUILTINREADER_HPP_ +#define RTPS_BUILTIN__BUILTINREADER_HPP_ + +#include + +#include + +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Keeps data of a builtin reader + */ +template +struct BuiltinReader +{ + ~BuiltinReader() + { + release(); + } + + void release() + { + if (history_) + { + auto cfg = fastrtps::rtps::PoolConfig::from_history_attributes(history_->m_att); + history_.reset(); + if (payload_pool_) + { + payload_pool_->release_history(cfg, true); + } + } + } + + void remove_from_history( + const fastrtps::rtps::InstanceHandle_t& key) + { + history_->getMutex()->lock(); + for (auto it = history_->changesBegin(); it != history_->changesEnd(); ++it) + { + if ((*it)->instanceHandle == key) + { + history_->remove_change(*it); + break; + } + } + history_->getMutex()->unlock(); + } + + //! Payload pool for the topic + std::shared_ptr payload_pool_; + //! History for the builtin reader + std::unique_ptr history_; + //! Builtin RTPS reader + TReader* reader_ = nullptr; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // RTPS_BUILTIN__BUILTINREADER_HPP_ diff --git a/src/cpp/rtps/builtin/BuiltinWriter.hpp b/src/cpp/rtps/builtin/BuiltinWriter.hpp new file mode 100644 index 00000000000..36d55f08ab4 --- /dev/null +++ b/src/cpp/rtps/builtin/BuiltinWriter.hpp @@ -0,0 +1,69 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file BuiltinWriter.hpp + */ + +#ifndef RTPS_BUILTIN__BUILTINWRITER_HPP_ +#define RTPS_BUILTIN__BUILTINWRITER_HPP_ + +#include + +#include + +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Keeps data of a builtin writer + */ +template +struct BuiltinWriter +{ + ~BuiltinWriter() + { + release(); + } + + void release() + { + if (history_) + { + auto cfg = fastrtps::rtps::PoolConfig::from_history_attributes(history_->m_att); + history_.reset(); + if (payload_pool_) + { + payload_pool_->release_history(cfg, false); + } + } + } + + //! Payload pool for the topic + std::shared_ptr payload_pool_; + //! History for the builtin writer + std::unique_ptr history_; + //! Builtin RTPS writer + TWriter* writer_ = nullptr; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // RTPS_BUILTIN__BUILTINWRITER_HPP_ diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPServer.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPServer.cpp index 4ab23014b7b..b0bf6770acc 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPServer.cpp @@ -267,7 +267,7 @@ bool EDPServer::removeLocalReader( else { // If the database doesn't take the ownership, then return the CacheChante_t to the pool. - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); } return true; } @@ -327,7 +327,7 @@ bool EDPServer::removeLocalWriter( else { // If the database doesn't take the ownership, then return the CacheChante_t to the pool. - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); } return true; } @@ -374,14 +374,14 @@ bool EDPServer::processLocalWriterProxyData( else { // If the database doesn't take the ownership, then return the CacheChante_t to the pool. - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); } // Return whether the DATA(w) was generated correctly return ret_val; } // Return the change to the pool and return false - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); return false; } @@ -424,14 +424,14 @@ bool EDPServer::processLocalReaderProxyData( else { // If the database doesn't take the ownership, then return the CacheChante_t to the pool. - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); } // Return whether the DATA(w) was generated correctly return ret_val; } // Return the change to the pool and return false - get_pdp()->mp_PDPWriter->release_change(change); + get_pdp()->release_change_from_writer(change); return false; } diff --git a/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp new file mode 100644 index 00000000000..2cf2e5dd63a --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp @@ -0,0 +1,91 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DiscoveryServerPDPEndpoints.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_DS__DISCOVERYSERVERPDPENDPOINTS_HPP_ +#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_DS__DISCOVERYSERVERPDPENDPOINTS_HPP_ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Container for the builtin endpoints of non-secure PDPClient and PDPServer + */ +struct DiscoveryServerPDPEndpoints : public PDPEndpoints +{ + ~DiscoveryServerPDPEndpoints() override = default; + + fastrtps::rtps::BuiltinEndpointSet_t builtin_endpoints() const override + { + return DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; + } + + bool enable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + return participant->enableReader(reader.reader_); + } + + void disable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->disableReader(reader.reader_); + } + + void delete_pdp_endpoints( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->deleteUserEndpoint(writer.writer_->getGuid()); + participant->deleteUserEndpoint(reader.reader_->getGuid()); + } + + void remove_from_pdp_reader_history( + const fastrtps::rtps::InstanceHandle_t& remote_participant) override + { + reader.remove_from_history(remote_participant); + } + + void remove_from_pdp_reader_history( + fastrtps::rtps::CacheChange_t* change) override + { + reader.history_->remove_change(change); + } + + //! Builtin Simple PDP reader + BuiltinReader reader; + + //! Builtin Simple PDP writer + BuiltinWriter writer; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_DS__DISCOVERYSERVERPDPENDPOINTS_HPP_ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 34343e12964..49bd8f7ffff 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -57,6 +57,7 @@ #include +#include #include #include @@ -83,8 +84,6 @@ PDP::PDP ( const RTPSParticipantAllocationAttributes& allocation) : mp_builtin(built) , mp_RTPSParticipant(nullptr) - , mp_PDPWriter(nullptr) - , mp_PDPReader(nullptr) , mp_EDP(nullptr) , participant_proxies_number_(allocation.participants.initial) , participant_proxies_(allocation.participants) @@ -95,8 +94,6 @@ PDP::PDP ( , writer_proxies_pool_(allocation.total_writers()) , m_hasChangedLocalPDP(true) , mp_listener(nullptr) - , mp_PDPWriterHistory(nullptr) - , mp_PDPReaderHistory(nullptr) , temp_reader_proxies_({ allocation.locators.max_unicast_locators, allocation.locators.max_multicast_locators, @@ -133,32 +130,16 @@ PDP::PDP ( PDP::~PDP() { delete resend_participant_info_event_; - mp_RTPSParticipant->disableReader(mp_PDPReader); - delete mp_EDP; - mp_RTPSParticipant->deleteUserEndpoint(mp_PDPWriter->getGuid()); - mp_RTPSParticipant->deleteUserEndpoint(mp_PDPReader->getGuid()); - if (mp_PDPWriterHistory) - { - PoolConfig cfg = PoolConfig::from_history_attributes(mp_PDPWriterHistory->m_att); - delete mp_PDPWriterHistory; - if (writer_payload_pool_) - { - writer_payload_pool_->release_history(cfg, false); - } - } + builtin_endpoints_->disable_pdp_readers(mp_RTPSParticipant); - if (mp_PDPReaderHistory) - { - PoolConfig cfg = PoolConfig::from_history_attributes(mp_PDPReaderHistory->m_att); - delete mp_PDPReaderHistory; - if (reader_payload_pool_) - { - reader_payload_pool_->release_history(cfg, true); - } - } + delete mp_EDP; + + builtin_endpoints_->delete_pdp_endpoints(mp_RTPSParticipant); + builtin_endpoints_.reset(); delete mp_listener; + mp_listener = nullptr; for (ParticipantProxyData* it : participant_proxies_) { @@ -248,13 +229,9 @@ void PDP::initializeParticipantProxyData( //set_VendorId_eProsima(participant_data->m_VendorId); participant_data->m_VendorId = c_VendorId_eProsima; - participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; - participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; + // TODO: participant_data->m_availableBuiltinEndpoints |= mp_builtin->available_builtin_endpoints(); -#if HAVE_SECURITY - participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER; - participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR; -#endif // if HAVE_SECURITY + participant_data->m_availableBuiltinEndpoints |= builtin_endpoints_->builtin_endpoints(); if (attributes.builtin.use_WriterLivelinessProtocol) { @@ -262,8 +239,11 @@ void PDP::initializeParticipantProxyData( participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER; #if HAVE_SECURITY - participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_WRITER; - participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_READER; + if (mp_RTPSParticipant->is_secure()) + { + participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_WRITER; + participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_READER; + } #endif // if HAVE_SECURITY } @@ -280,7 +260,10 @@ void PDP::initializeParticipantProxyData( } #if HAVE_SECURITY - participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints(); + if (mp_RTPSParticipant->is_secure()) + { + participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints(); + } #endif // if HAVE_SECURITY if (announce_locators) @@ -350,23 +333,23 @@ void PDP::initializeParticipantProxyData( participant_data->m_userData = attributes.userData; #if HAVE_SECURITY - IdentityToken* identity_token = nullptr; - if (mp_RTPSParticipant->security_manager().get_identity_token(&identity_token) && identity_token != nullptr) + if (mp_RTPSParticipant->is_secure()) { - participant_data->identity_token_ = std::move(*identity_token); - mp_RTPSParticipant->security_manager().return_identity_token(identity_token); - } + IdentityToken* identity_token = nullptr; + if (mp_RTPSParticipant->security_manager().get_identity_token(&identity_token) && identity_token != nullptr) + { + participant_data->identity_token_ = std::move(*identity_token); + mp_RTPSParticipant->security_manager().return_identity_token(identity_token); + } - PermissionsToken* permissions_token = nullptr; - if (mp_RTPSParticipant->security_manager().get_permissions_token(&permissions_token) - && permissions_token != nullptr) - { - participant_data->permissions_token_ = std::move(*permissions_token); - mp_RTPSParticipant->security_manager().return_permissions_token(permissions_token); - } + PermissionsToken* permissions_token = nullptr; + if (mp_RTPSParticipant->security_manager().get_permissions_token(&permissions_token) + && permissions_token != nullptr) + { + participant_data->permissions_token_ = std::move(*permissions_token); + mp_RTPSParticipant->security_manager().return_permissions_token(permissions_token); + } - if (mp_RTPSParticipant->is_secure()) - { const security::ParticipantSecurityAttributes& sec_attrs = mp_RTPSParticipant->security_attributes(); participant_data->security_attributes_ = sec_attrs.mask(); participant_data->plugin_security_attributes_ = sec_attrs.plugin_participant_attributes; @@ -395,7 +378,7 @@ bool PDP::initPDP( return false; } //UPDATE METATRAFFIC. - mp_builtin->updateMetatrafficLocators(this->mp_PDPReader->getAttributes().unicastLocatorList); + update_builtin_locators(); mp_mutex->lock(); ParticipantProxyData* pdata = add_participant_proxy_data(mp_RTPSParticipant->getGuid(), false, nullptr); @@ -445,10 +428,12 @@ bool PDP::enable() getRTPSParticipant()->on_entity_discovery(mp_RTPSParticipant->getGuid(), get_participant_proxy_data(mp_RTPSParticipant->getGuid().guidPrefix)->m_properties); - return mp_RTPSParticipant->enableReader(mp_PDPReader); + return builtin_endpoints_->enable_pdp_readers(mp_RTPSParticipant); } void PDP::announceParticipantState( + RTPSWriter& writer, + WriterHistory& history, bool new_change, bool dispose, WriteParams& wparams) @@ -468,12 +453,12 @@ void PDP::announceParticipantState( ParticipantProxyData proxy_data_copy(*local_participant_data); this->mp_mutex->unlock(); - if (mp_PDPWriterHistory->getHistorySize() > 0) + if (history.getHistorySize() > 0) { - mp_PDPWriterHistory->remove_min_change(); + history.remove_min_change(); } uint32_t cdr_size = proxy_data_copy.get_serialized_size(true); - change = mp_PDPWriter->new_change( + change = writer.new_change( [cdr_size]() -> uint32_t { return cdr_size; @@ -496,7 +481,7 @@ void PDP::announceParticipantState( { change->serializedPayload.length = (uint16_t)aux_msg.length; - mp_PDPWriterHistory->add_change(change, wparams); + history.add_change(change, wparams); } else { @@ -512,12 +497,12 @@ void PDP::announceParticipantState( ParticipantProxyData proxy_data_copy(*getLocalParticipantProxyData()); this->mp_mutex->unlock(); - if (mp_PDPWriterHistory->getHistorySize() > 0) + if (history.getHistorySize() > 0) { - mp_PDPWriterHistory->remove_min_change(); + history.remove_min_change(); } uint32_t cdr_size = proxy_data_copy.get_serialized_size(true); - change = mp_PDPWriter->new_change([cdr_size]() -> uint32_t + change = writer.new_change([cdr_size]() -> uint32_t { return cdr_size; }, @@ -539,7 +524,7 @@ void PDP::announceParticipantState( { change->serializedPayload.length = (uint16_t)aux_msg.length; - mp_PDPWriterHistory->add_change(change, wparams); + history.add_change(change, wparams); } else { @@ -548,7 +533,6 @@ void PDP::announceParticipantState( } } } - } void PDP::stopParticipantAnnouncement() @@ -1020,17 +1004,7 @@ bool PDP::remove_remote_participant( mp_builtin->mp_participantImpl->security_manager().remove_participant(*pdata); #endif // if HAVE_SECURITY - this->mp_PDPReaderHistory->getMutex()->lock(); - for (std::vector::iterator it = this->mp_PDPReaderHistory->changesBegin(); - it != this->mp_PDPReaderHistory->changesEnd(); ++it) - { - if ((*it)->instanceHandle == pdata->m_key) - { - this->mp_PDPReaderHistory->remove_change(*it); - break; - } - } - this->mp_PDPReaderHistory->getMutex()->unlock(); + builtin_endpoints_->remove_from_pdp_reader_history(pdata->m_key); auto listener = mp_RTPSParticipant->getListener(); if (listener != nullptr) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 13d0fed407d..48bfd49dfae 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -188,17 +189,26 @@ ParticipantProxyData* PDPClient::createParticipantProxyData( return pdata; } +void PDPClient::update_builtin_locators() +{ + auto endpoints = static_cast(builtin_endpoints_.get()); + mp_builtin->updateMetatrafficLocators(endpoints->reader.reader_->getAttributes().unicastLocatorList); +} + bool PDPClient::createPDPEndpoints() { EPROSIMA_LOG_INFO(RTPS_PDP, "Beginning PDPClient Endpoints creation"); const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); + auto endpoints = new fastdds::rtps::DiscoveryServerPDPEndpoints(); + builtin_endpoints_.reset(endpoints); + HistoryAttributes hatt; hatt.payloadMaxSize = mp_builtin->m_att.readerPayloadSize; hatt.initialReservedCaches = pdp_initial_reserved_caches; hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy; - mp_PDPReaderHistory = new ReaderHistory(hatt); + endpoints->reader.history_.reset(new ReaderHistory(hatt)); ReaderAttributes ratt; ratt.expectsInlineQos = false; @@ -214,12 +224,16 @@ bool PDPClient::createPDPEndpoints() mp_listener = new PDPListener(this); - if (mp_RTPSParticipant->createReader(&mp_PDPReader, ratt, mp_PDPReaderHistory, mp_listener, + RTPSReader* reader = nullptr; + if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints->reader.history_.get(), mp_listener, c_EntityId_SPDPReader, true, false)) { - //#if HAVE_SECURITY - // mp_RTPSParticipant->set_endpoint_rtps_protection_supports(rout, false); - //#endif + endpoints->reader.reader_ = dynamic_cast(reader); + +#if HAVE_SECURITY + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); +#endif // if HAVE_SECURITY + // Initial peer list doesn't make sense in server scenario. Client should match its server list { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); @@ -233,17 +247,16 @@ bool PDPClient::createPDPEndpoints() else { EPROSIMA_LOG_ERROR(RTPS_PDP, "PDPClient Reader creation failed"); - delete(mp_PDPReaderHistory); - mp_PDPReaderHistory = nullptr; - delete(mp_listener); + delete mp_listener; mp_listener = nullptr; + endpoints->reader.release(); return false; } hatt.payloadMaxSize = mp_builtin->m_att.writerPayloadSize; hatt.initialReservedCaches = pdp_initial_reserved_caches; hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy; - mp_PDPWriterHistory = new WriterHistory(hatt); + endpoints->writer.history_.reset(new WriterHistory(hatt)); WriterAttributes watt; watt.endpoint.endpointKind = WRITER; @@ -263,12 +276,16 @@ bool PDPClient::createPDPEndpoints() watt.mode = ASYNCHRONOUS_WRITER; } - if (mp_RTPSParticipant->createWriter(&mp_PDPWriter, watt, mp_PDPWriterHistory, nullptr, + RTPSWriter* wout = nullptr; + if (mp_RTPSParticipant->createWriter(&wout, watt, endpoints->writer.history_.get(), nullptr, c_EntityId_SPDPWriter, true)) { - //#if HAVE_SECURITY - // mp_RTPSParticipant->set_endpoint_rtps_protection_supports(wout, false); - //#endif + endpoints->writer.writer_ = dynamic_cast(wout); + +#if HAVE_SECURITY + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(wout, false); +#endif // if HAVE_SECURITY + { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); @@ -281,8 +298,7 @@ bool PDPClient::createPDPEndpoints() else { EPROSIMA_LOG_ERROR(RTPS_PDP, "PDPClient Writer creation failed"); - delete(mp_PDPWriterHistory); - mp_PDPWriterHistory = nullptr; + endpoints->writer.release(); return false; } EPROSIMA_LOG_INFO(RTPS_PDP, "PDPClient Endpoints creation finished"); @@ -326,6 +342,8 @@ void PDPClient::removeRemoteEndpoints( // EDP endpoints have been already unmatch by the associated listener assert(!mp_EDP->areRemoteEndpointsMatched(pdata)); + auto endpoints = static_cast(builtin_endpoints_.get()); + bool is_server = false; { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); @@ -358,7 +376,7 @@ void PDPClient::removeRemoteEndpoints( wguid.guidPrefix = pdata->m_guid.guidPrefix; wguid.entityId = c_EntityId_SPDPWriter; - mp_PDPReader->matched_writer_remove(wguid); + endpoints->reader.reader_->matched_writer_remove(wguid); // rematch but discarding any previous state of the server // because we know the server shutdown intencionally @@ -371,7 +389,7 @@ void PDPClient::removeRemoteEndpoints( temp_writer_data->set_remote_locators(pdata->metatraffic_locators, network, true); temp_writer_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; temp_writer_data->m_qos.m_durability.kind = TRANSIENT_DURABILITY_QOS; - mp_PDPReader->matched_writer_add(*temp_writer_data); + endpoints->reader.reader_->matched_writer_add(*temp_writer_data); } auxendp = endp; @@ -382,7 +400,7 @@ void PDPClient::removeRemoteEndpoints( GUID_t rguid; rguid.guidPrefix = pdata->m_guid.guidPrefix; rguid.entityId = c_EntityId_SPDPReader; - mp_PDPWriter->matched_reader_remove(rguid); + endpoints->writer.writer_->matched_reader_remove(rguid); auto temp_reader_data = get_temporary_reader_proxies_pool().get(); @@ -392,21 +410,23 @@ void PDPClient::removeRemoteEndpoints( temp_reader_data->set_remote_locators(pdata->metatraffic_locators, network, true); temp_reader_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - mp_PDPWriter->matched_reader_add(*temp_reader_data); + endpoints->writer.writer_->matched_reader_add(*temp_reader_data); } } } bool PDPClient::all_servers_acknowledge_PDP() { + auto endpoints = static_cast(builtin_endpoints_.get()); + // check if already initialized - assert(mp_PDPWriterHistory && mp_PDPWriter); + assert(endpoints->writer.history_ && endpoints->writer.writer_); // get a reference to client proxy data CacheChange_t* pPD; - if (mp_PDPWriterHistory->get_min_change(&pPD)) + if (endpoints->writer.history_->get_min_change(&pPD)) { - return mp_PDPWriter->is_acked_by_all(pPD); + return endpoints->writer.writer_->is_acked_by_all(pPD); } else { @@ -420,9 +440,9 @@ bool PDPClient::all_servers_acknowledge_PDP() bool PDPClient::is_all_servers_PDPdata_updated() { // Assess all server DATA has been received - fastrtps::rtps::StatefulReader* pR = dynamic_cast(mp_PDPReader); - assert(pR); - return pR->isInCleanState(); + auto endpoints = static_cast(builtin_endpoints_.get()); + assert(endpoints->reader.reader_); + return endpoints->reader.reader_->isInCleanState(); } void PDPClient::announceParticipantState( @@ -432,6 +452,10 @@ void PDPClient::announceParticipantState( { if (enabled_) { + auto endpoints = static_cast(builtin_endpoints_.get()); + fastrtps::rtps::StatefulWriter& writer = *(endpoints->writer.writer_); + WriterHistory& history = *endpoints->writer.history_; + /* Protect writer sequence number. Make sure in order to prevent AB BA deadlock that the PDP mutex is systematically lock before the writer one (if needed): @@ -443,12 +467,12 @@ void PDPClient::announceParticipantState( std::lock_guard lock(*getMutex()); - std::lock_guard wlock(mp_PDPWriter->getMutex()); + std::lock_guard wlock(writer.getMutex()); WriteParams wp; SampleIdentity local; - local.writer_guid(mp_PDPWriter->getGuid()); - local.sequence_number(mp_PDPWriterHistory->next_sequence_number()); + local.writer_guid(writer.getGuid()); + local.sequence_number(history.next_sequence_number()); wp.sample_identity(local); wp.related_sample_identity(local); @@ -459,12 +483,9 @@ void PDPClient::announceParticipantState( // note here we can no longer receive and DATA or ACKNACK from clients. // In order to avoid that we send the message directly as in the standard stateless PDP - fastrtps::rtps::StatefulWriter* pW = dynamic_cast(mp_PDPWriter); - assert(pW); - CacheChange_t* change = nullptr; - if ((change = pW->new_change( + if ((change = writer.new_change( [this]() -> uint32_t { return mp_builtin->m_att.writerPayloadSize; @@ -472,7 +493,7 @@ void PDPClient::announceParticipantState( NOT_ALIVE_DISPOSED_UNREGISTERED, getLocalParticipantProxyData()->m_key))) { // update the sequence number - change->sequenceNumber = mp_PDPWriterHistory->next_sequence_number(); + change->sequenceNumber = history.next_sequence_number(); change->write_params = wp; std::vector remote_readers; @@ -505,7 +526,7 @@ void PDPClient::announceParticipantState( } DirectMessageSender sender(getRTPSParticipant(), &remote_readers, &locators); - RTPSMessageGroup group(getRTPSParticipant(), mp_PDPWriter, &sender); + RTPSMessageGroup group(getRTPSParticipant(), &writer, &sender); if (!group.add_data(*change, false)) { EPROSIMA_LOG_ERROR(RTPS_PDP, "Error sending announcement from client to servers"); @@ -513,17 +534,17 @@ void PDPClient::announceParticipantState( } // free change - mp_PDPWriter->release_change(change); + writer.release_change(change); } else { - PDP::announceParticipantState(new_change, dispose, wp); + PDP::announceParticipantState(writer, history, new_change, dispose, wp); if (!new_change) { // retrieve the participant discovery data CacheChange_t* pPD; - if (mp_PDPWriterHistory->get_min_change(&pPD)) + if (history.get_min_change(&pPD)) { std::vector remote_readers; LocatorList locators; @@ -543,7 +564,7 @@ void PDPClient::announceParticipantState( } DirectMessageSender sender(getRTPSParticipant(), &remote_readers, &locators); - RTPSMessageGroup group(getRTPSParticipant(), mp_PDPWriter, &sender); + RTPSMessageGroup group(getRTPSParticipant(), &writer, &sender); if (!group.add_data(*pPD, false)) { @@ -590,7 +611,8 @@ bool PDPClient::match_servers_EDP_endpoints() void PDPClient::update_remote_servers_list() { - if (!mp_PDPReader || !mp_PDPWriter) + auto endpoints = static_cast(builtin_endpoints_.get()); + if (!endpoints->reader.reader_ || !endpoints->writer.writer_) { EPROSIMA_LOG_ERROR(SERVER_CLIENT_DISCOVERY, "Cannot update server list within an uninitialized Client"); return; @@ -600,12 +622,12 @@ void PDPClient::update_remote_servers_list() for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if (!mp_PDPReader->matched_writer_is_matched(it.GetPDPWriter())) + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); } - if (!mp_PDPWriter->matched_reader_is_matched(it.GetPDPReader())) + if (!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { match_pdp_reader_nts_(it); } @@ -616,6 +638,7 @@ void PDPClient::update_remote_servers_list() void PDPClient::match_pdp_writer_nts_( const eprosima::fastdds::rtps::RemoteServerAttributes& server_att) { + auto endpoints = static_cast(builtin_endpoints_.get()); const NetworkFactory& network = mp_RTPSParticipant->network_factory(); auto temp_writer_data = get_temporary_writer_proxies_pool().get(); @@ -625,12 +648,13 @@ void PDPClient::match_pdp_writer_nts_( temp_writer_data->set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network); temp_writer_data->m_qos.m_durability.kind = TRANSIENT_DURABILITY_QOS; temp_writer_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - mp_PDPReader->matched_writer_add(*temp_writer_data); + endpoints->reader.reader_->matched_writer_add(*temp_writer_data); } void PDPClient::match_pdp_reader_nts_( const eprosima::fastdds::rtps::RemoteServerAttributes& server_att) { + auto endpoints = static_cast(builtin_endpoints_.get()); const NetworkFactory& network = mp_RTPSParticipant->network_factory(); auto temp_reader_data = get_temporary_reader_proxies_pool().get(); @@ -640,7 +664,7 @@ void PDPClient::match_pdp_reader_nts_( temp_reader_data->set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network); temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; temp_reader_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - mp_PDPWriter->matched_reader_add(*temp_reader_data); + endpoints->writer.writer_->matched_reader_add(*temp_reader_data); } const std::string& ros_discovery_server_env() @@ -990,16 +1014,18 @@ bool PDPClient::remove_remote_participant( return true; } + auto endpoints = static_cast(builtin_endpoints_.get()); + // Erase Proxies created before having the Participant GUID_t wguid; wguid.guidPrefix = partGUID.guidPrefix; wguid.entityId = c_EntityId_SPDPWriter; - mp_PDPReader->matched_writer_remove(wguid); + endpoints->reader.reader_->matched_writer_remove(wguid); GUID_t rguid; rguid.guidPrefix = partGUID.guidPrefix; rguid.entityId = c_EntityId_SPDPReader; - mp_PDPWriter->matched_reader_remove(rguid); + endpoints->writer.writer_->matched_reader_remove(rguid); update_remote_servers_list(); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h index aaaafd4f248..454c17534d8 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h @@ -145,6 +145,8 @@ class PDPClient : public PDP protected: + void update_builtin_locators() override; + /** * Manually match the local PDP reader with the PDP writer of a given server. The function is * not thread safe (nts) in the sense that it does not take the PDP mutex. It does however take diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp new file mode 100644 index 00000000000..a50d2bad6d1 --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp @@ -0,0 +1,79 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PDPEndpoints.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT__PDPENDPOINTS_HPP_ +#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT__PDPENDPOINTS_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Models that the base PDP class can have a different container for its builtin endpoints. + * Each subclass of PDP might contain a different subclass of PDPEndpoints. + */ +class PDPEndpoints +{ +public: + + // Designed for inheritance, so destructor must be virtual + virtual ~PDPEndpoints() = default; + + /** + * Returns a mask with the list of builtin endpoints contained by this class. + * + * @return The list of builtin endpoints to announce. + */ + virtual fastrtps::rtps::BuiltinEndpointSet_t builtin_endpoints() const = 0; + + virtual bool enable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) = 0; + virtual void disable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) = 0; + virtual void delete_pdp_endpoints( + fastrtps::rtps::RTPSParticipantImpl* participant) = 0; + virtual void remove_from_pdp_reader_history( + const fastrtps::rtps::InstanceHandle_t& remote_participant) = 0; + virtual void remove_from_pdp_reader_history( + fastrtps::rtps::CacheChange_t* change) = 0; + +protected: + + // Cannot be directly constructed + PDPEndpoints() = default; + + // Non-copyable, non-moveable + PDPEndpoints( + const PDPEndpoints&) = delete; + PDPEndpoints( + PDPEndpoints&&) = delete; + PDPEndpoints& operator =( + const PDPEndpoints&) = delete; + PDPEndpoints& operator =( + PDPEndpoints&&) = delete; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT__PDPENDPOINTS_HPP_ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 0b7b3c2a0e3..af20b93f739 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -64,7 +65,7 @@ void PDPListener::onNewCacheChangeAdded( if (!this->get_key(change)) { EPROSIMA_LOG_WARNING(RTPS_PDP, "Problem getting the key of the change, removing"); - parent_pdp_->mp_PDPReaderHistory->remove_change(change); + parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); return; } } @@ -79,7 +80,7 @@ void PDPListener::onNewCacheChangeAdded( if (guid == parent_pdp_->getRTPSParticipant()->getGuid()) { EPROSIMA_LOG_INFO(RTPS_PDP, "Message from own RTPSParticipant, removing"); - parent_pdp_->mp_PDPReaderHistory->remove_change(change); + parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); return; } @@ -216,7 +217,7 @@ void PDPListener::onNewCacheChangeAdded( } //Remove change form history. - parent_pdp_->mp_PDPReaderHistory->remove_change(change); + parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); } bool PDPListener::get_key( diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index a34e4b997c6..972e821e88a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -38,6 +38,7 @@ #include #include +#include #include #include @@ -152,7 +153,7 @@ bool PDPServer::init( // Activate listeners EDPServer* edp = static_cast(mp_EDP); - getRTPSParticipant()->enableReader(mp_PDPReader); + builtin_endpoints_->enable_pdp_readers(getRTPSParticipant()); getRTPSParticipant()->enableReader(edp->subscriptions_reader_.first); getRTPSParticipant()->enableReader(edp->publications_reader_.first); @@ -226,12 +227,21 @@ ParticipantProxyData* PDPServer::createParticipantProxyData( return pdata; } +void PDPServer::update_builtin_locators() +{ + auto endpoints = static_cast(builtin_endpoints_.get()); + mp_builtin->updateMetatrafficLocators(endpoints->reader.reader_->getAttributes().unicastLocatorList); +} + bool PDPServer::createPDPEndpoints() { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "Beginning PDPServer Endpoints creation"); const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); + auto endpoints = new fastdds::rtps::DiscoveryServerPDPEndpoints(); + builtin_endpoints_.reset(endpoints); + /*********************************** * PDP READER ***********************************/ @@ -240,7 +250,7 @@ bool PDPServer::createPDPEndpoints() hatt.payloadMaxSize = mp_builtin->m_att.readerPayloadSize; hatt.initialReservedCaches = pdp_initial_reserved_caches; hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy; - mp_PDPReaderHistory = new ReaderHistory(hatt); + endpoints->reader.history_.reset(new ReaderHistory(hatt)); // PDP Reader Attributes ReaderAttributes ratt; @@ -266,11 +276,18 @@ bool PDPServer::createPDPEndpoints() mp_listener = new PDPServerListener(this); // Create PDP Reader - if (mp_RTPSParticipant->createReader(&mp_PDPReader, ratt, mp_PDPReaderHistory, + RTPSReader* reader = nullptr; + if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints->reader.history_.get(), mp_listener, c_EntityId_SPDPReader, true, false)) { + endpoints->reader.reader_ = dynamic_cast(reader); + // Enable unknown clients to reach this reader - mp_PDPReader->enableMessagesFromUnkownWriters(true); + reader->enableMessagesFromUnkownWriters(true); + +#if HAVE_SECURITY + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); +#endif // if HAVE_SECURITY { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); @@ -286,10 +303,9 @@ bool PDPServer::createPDPEndpoints() else { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "PDPServer Reader creation failed"); - delete(mp_PDPReaderHistory); - mp_PDPReaderHistory = nullptr; - delete(mp_listener); + delete mp_listener; mp_listener = nullptr; + endpoints->reader.release(); return false; } @@ -301,7 +317,7 @@ bool PDPServer::createPDPEndpoints() hatt.payloadMaxSize = mp_builtin->m_att.writerPayloadSize; hatt.initialReservedCaches = pdp_initial_reserved_caches; hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy; - mp_PDPWriterHistory = new WriterHistory(hatt); + endpoints->writer.history_.reset(new WriterHistory(hatt)); // PDP Writer Attributes WriterAttributes watt; @@ -329,14 +345,21 @@ bool PDPServer::createPDPEndpoints() watt.mode = ASYNCHRONOUS_WRITER; // Create PDP Writer - if (mp_RTPSParticipant->createWriter(&mp_PDPWriter, watt, mp_PDPWriterHistory, + RTPSWriter* wout = nullptr; + if (mp_RTPSParticipant->createWriter(&wout, watt, endpoints->writer.history_.get(), nullptr, c_EntityId_SPDPWriter, true)) { + endpoints->writer.writer_ = dynamic_cast(wout); + +#if HAVE_SECURITY + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(wout, false); +#endif // if HAVE_SECURITY + // Set pdp filter to writer IReaderDataFilter* pdp_filter = static_cast*>(&discovery_db_); - mp_PDPWriter->reader_data_filter(pdp_filter); + wout->reader_data_filter(pdp_filter); // Enable separate sending so the filter can be called for each change and reader proxy - mp_PDPWriter->set_separate_sending(true); + wout->set_separate_sending(true); { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); @@ -351,12 +374,11 @@ bool PDPServer::createPDPEndpoints() else { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "PDPServer Writer creation failed"); - delete(mp_PDPWriterHistory); - mp_PDPWriterHistory = nullptr; + endpoints->writer.release(); return false; } // TODO check if this should be done here or before this point in creation - mp_PDPWriterHistory->remove_all_changes(); + endpoints->writer.history_->remove_all_changes(); EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "PDPServer Endpoints creation finished"); return true; @@ -402,6 +424,7 @@ void PDPServer::assignRemoteEndpoints( { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "Assigning remote endpoint for RTPSParticipant: " << pdata->m_guid.guidPrefix); + auto endpoints = static_cast(builtin_endpoints_.get()); const NetworkFactory& network = mp_RTPSParticipant->network_factory(); uint32_t endp = pdata->m_availableBuiltinEndpoints; bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast || @@ -421,7 +444,7 @@ void PDPServer::assignRemoteEndpoints( temp_writer_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); temp_writer_data->m_qos.m_reliability.kind = dds::RELIABLE_RELIABILITY_QOS; temp_writer_data->m_qos.m_durability.kind = dds::TRANSIENT_LOCAL_DURABILITY_QOS; - mp_PDPReader->matched_writer_add(*temp_writer_data); + endpoints->reader.reader_->matched_writer_add(*temp_writer_data); } else { @@ -443,7 +466,7 @@ void PDPServer::assignRemoteEndpoints( temp_reader_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); temp_reader_data->m_qos.m_reliability.kind = dds::RELIABLE_RELIABILITY_QOS; temp_reader_data->m_qos.m_durability.kind = dds::TRANSIENT_LOCAL_DURABILITY_QOS; - mp_PDPWriter->matched_reader_add(*temp_reader_data); + endpoints->writer.writer_->matched_reader_add(*temp_reader_data); } else { @@ -477,10 +500,12 @@ void PDPServer::removeRemoteEndpoints( EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "For RTPSParticipant: " << pdata->m_guid); uint32_t endp = pdata->m_availableBuiltinEndpoints; + auto endpoints = static_cast(builtin_endpoints_.get()); + if (endp & DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER) { GUID_t writer_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPWriter); - mp_PDPReader->matched_writer_remove(writer_guid); + endpoints->reader.reader_->matched_writer_remove(writer_guid); } else { @@ -492,7 +517,7 @@ void PDPServer::removeRemoteEndpoints( if (endp & DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR) { GUID_t reader_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPReader); - mp_PDPWriter->matched_reader_remove(reader_guid); + endpoints->writer.writer_->matched_reader_remove(reader_guid); } else { @@ -558,9 +583,6 @@ void PDPServer::announceParticipantState( "Announcing Server " << mp_RTPSParticipant->getGuid() << " (new change: " << new_change << ")"); CacheChange_t* change = nullptr; - StatefulWriter* pW = dynamic_cast(mp_PDPWriter); - assert(pW); - /* Protect writer sequence number. Make sure in order to prevent AB BA deadlock that the PDP mutex is systematically locked before the writer one (if needed): @@ -572,7 +594,13 @@ void PDPServer::announceParticipantState( getMutex()->lock(); - std::lock_guard wlock(pW->getMutex()); + auto endpoints = static_cast(builtin_endpoints_.get()); + assert(endpoints->writer.writer_); + + fastrtps::rtps::StatefulWriter& writer = *(endpoints->writer.writer_); + WriterHistory& history = *endpoints->writer.history_; + + std::lock_guard wlock(writer.getMutex()); if (!dispose) { @@ -584,10 +612,10 @@ void PDPServer::announceParticipantState( // Prepare identity WriteParams wp; - SequenceNumber_t sn = mp_PDPWriterHistory->next_sequence_number(); + SequenceNumber_t sn = history.next_sequence_number(); { SampleIdentity local; - local.writer_guid(mp_PDPWriter->getGuid()); + local.writer_guid(writer.getGuid()); local.sequence_number(sn); wp.sample_identity(local); wp.related_sample_identity(local); @@ -597,7 +625,7 @@ void PDPServer::announceParticipantState( getMutex()->unlock(); uint32_t cdr_size = proxy_data_copy.get_serialized_size(true); - change = mp_PDPWriter->new_change( + change = writer.new_change( [cdr_size]() -> uint32_t { return cdr_size; @@ -646,7 +674,7 @@ void PDPServer::announceParticipantState( } // Add our change to PDPWriterHistory - mp_PDPWriterHistory->add_change(change, wp); + history.add_change(change, wp); change->write_params = wp; // Update the database with our own data @@ -662,7 +690,7 @@ void PDPServer::announceParticipantState( // Already there, dispose EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "DiscoveryDatabase already initialized with local DATA(p) on creation"); - mp_PDPWriter->release_change(change); + writer.release_change(change); } } // Doesn't make sense to send the DATA directly if it hasn't been introduced in the history yet (missing @@ -698,10 +726,10 @@ void PDPServer::announceParticipantState( // Prepare identity WriteParams wp; - SequenceNumber_t sn = mp_PDPWriterHistory->next_sequence_number(); + SequenceNumber_t sn = history.next_sequence_number(); { SampleIdentity local; - local.writer_guid(mp_PDPWriter->getGuid()); + local.writer_guid(writer.getGuid()); local.sequence_number(sn); wp.sample_identity(local); wp.related_sample_identity(local); @@ -710,7 +738,7 @@ void PDPServer::announceParticipantState( // Unlock PDP mutex since it's no longer needed. getMutex()->unlock(); - change = pW->new_change( + change = writer.new_change( [cdr_size]() -> uint32_t { return cdr_size; @@ -734,7 +762,7 @@ void PDPServer::announceParticipantState( { // Dispose if already there // It may happen if the participant is not removed fast enough - mp_PDPWriter->release_change(change); + writer.release_change(change); return; } } @@ -778,6 +806,8 @@ bool PDPServer::remove_remote_participant( const GUID_t& partGUID, ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) { + auto endpoints = static_cast(builtin_endpoints_.get()); + // Notify the DiscoveryDataBase on lease duration removal because the listener // has already notified the database in all other cases if (ParticipantDiscoveryInfo::DROPPED_PARTICIPANT == reason) @@ -786,14 +816,14 @@ bool PDPServer::remove_remote_participant( // TODO check in standard if DROP payload is always 0 // We create the drop from Reader to make release simplier - mp_PDPReader->reserveCache(&pC, mp_builtin->m_att.writerPayloadSize); + endpoints->reader.reader_->reserveCache(&pC, mp_builtin->m_att.writerPayloadSize); // We must create the corresponding DATA(p[UD]) if (nullptr != pC) { pC->instanceHandle = partGUID; pC->kind = NOT_ALIVE_DISPOSED_UNREGISTERED; - pC->writerGUID = mp_PDPWriter->getGuid(); + pC->writerGUID = endpoints->writer.writer_->getGuid(); // Reset the internal CacheChange_t union. pC->writer_info.next = nullptr; pC->writer_info.previous = nullptr; @@ -802,8 +832,8 @@ bool PDPServer::remove_remote_participant( // Use this server identity in order to hint clients it's a lease duration demise WriteParams& wp = pC->write_params; SampleIdentity local; - local.writer_guid(mp_PDPWriter->getGuid()); - local.sequence_number(mp_PDPWriterHistory->next_sequence_number()); + local.writer_guid(endpoints->writer.writer_->getGuid()); + local.sequence_number(endpoints->writer.history_->next_sequence_number()); wp.sample_identity(local); wp.related_sample_identity(local); @@ -819,7 +849,7 @@ bool PDPServer::remove_remote_participant( else { // if the database doesn't take the ownership remove - mp_PDPReader->releaseCache(pC); + endpoints->reader.reader_->releaseCache(pC); } } } @@ -908,7 +938,8 @@ bool PDPServer::server_update_routine() void PDPServer::update_remote_servers_list() { - if (!mp_PDPReader || !mp_PDPWriter) + auto endpoints = static_cast(builtin_endpoints_.get()); + if (!endpoints->reader.reader_ || !endpoints->writer.writer_) { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Cannot update server list within an uninitialized Server"); return; @@ -920,12 +951,12 @@ void PDPServer::update_remote_servers_list() for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if (!mp_PDPReader->matched_writer_is_matched(it.GetPDPWriter())) + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); } - if (!mp_PDPWriter->matched_reader_is_matched(it.GetPDPReader())) + if (!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { match_pdp_reader_nts_(it); } @@ -941,6 +972,8 @@ bool PDPServer::process_writers_acknowledgements() { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_writers_acknowledgements start"); + auto endpoints = static_cast(builtin_endpoints_.get()); + // Execute first ACK for endpoints because PDP acked changes relevance in EDP, // which can result in false positives in EDP acknowledgements. @@ -952,8 +985,7 @@ bool PDPServer::process_writers_acknowledgements() pending |= process_history_acknowledgement(edp->publications_writer_.first, edp->publications_writer_.second); /* PDP Writer's History */ - pending |= process_history_acknowledgement( - static_cast(mp_PDPWriter), mp_PDPWriterHistory); + pending |= process_history_acknowledgement(endpoints->writer.writer_, endpoints->writer.history_.get()); return pending; } @@ -1045,7 +1077,8 @@ History::iterator PDPServer::process_change_acknowledgement( bool PDPServer::process_disposals() { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_disposals start"); - // EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_disposals start"); + + auto endpoints = static_cast(builtin_endpoints_.get()); EDPServer* edp = static_cast(mp_EDP); fastrtps::rtps::WriterHistory* pubs_history = edp->publications_writer_.second; fastrtps::rtps::WriterHistory* subs_history = edp->subscriptions_writer_.second; @@ -1062,20 +1095,20 @@ bool PDPServer::process_disposals() // Get the identity of the participant from which the change came. fastrtps::rtps::GuidPrefix_t change_guid_prefix = discovery_db_.guid_from_change(change).guidPrefix; - change->writerGUID.guidPrefix = mp_PDPWriter->getGuid().guidPrefix; + change->writerGUID.guidPrefix = endpoints->writer.writer_->getGuid().guidPrefix; // DATA(Up) case if (discovery_db_.is_participant(change)) { // Lock PDP writer - std::unique_lock lock(mp_PDPWriter->getMutex()); + std::unique_lock lock(endpoints->writer.writer_->getMutex()); // Remove all DATA(p) with the same sample identity as the DATA(Up) from PDP writer's history. - remove_related_alive_from_history_nts(mp_PDPWriterHistory, change_guid_prefix); + remove_related_alive_from_history_nts(endpoints->writer.history_.get(), change_guid_prefix); // Add DATA(Up) to PDP writer's history eprosima::fastrtps::rtps::WriteParams wp = change->write_params; - mp_PDPWriterHistory->add_change(change, wp); + endpoints->writer.history_->add_change(change, wp); } // DATA(Uw) case else if (discovery_db_.is_writer(change)) @@ -1135,6 +1168,7 @@ void PDPServer::process_changes_release_( const std::vector& changes) { // We will need the EDP publications/subscriptions writers, readers, and histories + auto endpoints = static_cast(builtin_endpoints_.get()); EDPServer* edp = static_cast(mp_EDP); // For each change to erase, first try to erase in case is in writer history and then it releases it @@ -1143,15 +1177,15 @@ void PDPServer::process_changes_release_( // Check if change owner is this participant. In that case, the change comes from a writer pool (PDP, EDP // publications or EDP subscriptions) // We compare the instance handle, as the only changes from our own server are its owns - if (discovery_db().guid_from_change(ch) == mp_PDPWriter->getGuid()) + if (discovery_db().guid_from_change(ch) == endpoints->writer.writer_->getGuid()) { if (discovery_db_.is_participant(ch)) { // The change must return to the pool even if not present in the history // Normally Data(Up) will not be in history except in Own Server destruction - if (!remove_change_from_writer_history(mp_PDPWriter, mp_PDPWriterHistory, ch)) + if (!remove_change_from_writer_history(endpoints->writer.writer_, endpoints->writer.history_.get(), ch)) { - mp_PDPWriter->release_change(ch); + endpoints->writer.writer_->release_change(ch); } } else if (discovery_db_.is_writer(ch)) @@ -1193,8 +1227,12 @@ void PDPServer::process_changes_release_( // take it out upon reading it in the listeners if (discovery_db_.is_participant(ch)) { - remove_change_from_writer_history(mp_PDPWriter, mp_PDPWriterHistory, ch, false); - mp_PDPReader->releaseCache(ch); + remove_change_from_writer_history( + endpoints->writer.writer_, + endpoints->writer.history_.get(), + ch, + false); + endpoints->reader.reader_->releaseCache(ch); } else if (discovery_db_.is_writer(ch)) { @@ -1278,7 +1316,8 @@ bool PDPServer::process_to_send_lists() { // Process pdp_to_send_ EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "Processing pdp_to_send"); - process_to_send_list(discovery_db_.pdp_to_send(), mp_PDPWriter, mp_PDPWriterHistory); + auto endpoints = static_cast(builtin_endpoints_.get()); + process_to_send_list(discovery_db_.pdp_to_send(), endpoints->writer.writer_, endpoints->writer.history_.get()); } else { @@ -1366,15 +1405,16 @@ bool PDPServer::remove_change_from_history_nts( bool PDPServer::pending_ack() { + auto endpoints = static_cast(builtin_endpoints_.get()); EDPServer* edp = static_cast(mp_EDP); bool ret = (!discovery_db_.server_acked_by_all() || - mp_PDPWriterHistory->getHistorySize() > 1 || + endpoints->writer.history_->getHistorySize() > 1 || edp->publications_writer_.second->getHistorySize() > 0 || edp->subscriptions_writer_.second->getHistorySize() > 0); - EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "PDP writer history length " << mp_PDPWriterHistory->getHistorySize()); + EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "PDP writer history length " << endpoints->writer.history_->getHistorySize()); EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, - "is server " << mp_PDPWriter->getGuid() << " acked by all? " << + "is server " << endpoints->writer.writer_->getGuid() << " acked by all? " << discovery_db_.server_acked_by_all()); EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "Are there pending changes? " << ret); return ret; @@ -1401,7 +1441,9 @@ eprosima::fastrtps::rtps::ResourceEvent& PDPServer::get_resource_event_thread() bool PDPServer::all_servers_acknowledge_pdp() { // check if already initialized - assert(mp_PDPWriterHistory && mp_PDPWriter); + auto endpoints = static_cast(builtin_endpoints_.get()); + static_cast(endpoints); + assert(endpoints->writer.history_ && endpoints->writer.writer_); return discovery_db_.server_acked_by_my_servers(); } @@ -1446,17 +1488,17 @@ void PDPServer::send_announcement( return; } + auto endpoints = static_cast(builtin_endpoints_.get()); DirectMessageSender sender(getRTPSParticipant(), &remote_readers, &locators); - RTPSMessageGroup group(getRTPSParticipant(), mp_PDPWriter, &sender); + RTPSMessageGroup group(getRTPSParticipant(), endpoints->writer.writer_, &sender); if (dispose) { - fastrtps::rtps::StatefulWriter* writer = static_cast(mp_PDPWriter); - writer->fastrtps::rtps::StatefulWriter::incrementHBCount(); + endpoints->writer.writer_->fastrtps::rtps::StatefulWriter::incrementHBCount(); group.add_heartbeat( change->sequenceNumber, change->sequenceNumber, - writer->getHeartbeatCount(), + endpoints->writer.writer_->getHeartbeatCount(), true, false); } @@ -1519,7 +1561,8 @@ bool PDPServer::process_backup_discovery_database_restore( EDPServerSUBListener* edp_sub_listener = static_cast(edp->subscriptions_listener_); // These mutexes are necessary to send messages to the listeners - std::unique_lock lock(mp_PDPReader->getMutex()); + auto endpoints = static_cast(builtin_endpoints_.get()); + std::unique_lock lock(endpoints->reader.reader_->getMutex()); std::unique_lock lock_edpp(edp->publications_reader_.first->getMutex()); std::unique_lock lock_edps(edp->subscriptions_reader_.first->getMutex()); @@ -1539,7 +1582,7 @@ bool PDPServer::process_backup_discovery_database_restore( std::istringstream(it.value()["change"]["sample_identity"].get()) >> sample_identity_aux; // Reserve memory for new change. There will not be changes from own server - if (!mp_PDPReader->reserveCache(&change_aux, length)) + if (!endpoints->reader.reader_->reserveCache(&change_aux, length)) { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error creating CacheChange"); // TODO release changes and exit @@ -1555,12 +1598,12 @@ bool PDPServer::process_backup_discovery_database_restore( // If the change was read as is_local we must pass it to listener with his own writer_guid if (it.value()["is_local"].get() && change_aux->write_params.sample_identity().writer_guid().guidPrefix != - mp_PDPWriter->getGuid().guidPrefix && + endpoints->writer.writer_->getGuid().guidPrefix && change_aux->kind == fastrtps::rtps::ALIVE) { change_aux->writerGUID = change_aux->write_params.sample_identity().writer_guid(); change_aux->sequenceNumber = change_aux->write_params.sample_identity().sequence_number(); - mp_listener->onNewCacheChangeAdded(mp_PDPReader, change_aux); + mp_listener->onNewCacheChangeAdded(endpoints->reader.reader_, change_aux); } } @@ -1596,7 +1639,7 @@ bool PDPServer::process_backup_discovery_database_restore( // should store in DDB if it is local even for endpoints // call listener to create proxy info for other entities different than server if (change_aux->write_params.sample_identity().writer_guid().guidPrefix != - mp_PDPWriter->getGuid().guidPrefix + endpoints->writer.writer_->getGuid().guidPrefix && change_aux->kind == fastrtps::rtps::ALIVE && it.value()["topic"] != discovery_db().virtual_topic()) { @@ -1632,7 +1675,7 @@ bool PDPServer::process_backup_discovery_database_restore( // call listener to create proxy info for other entities different than server if (change_aux->write_params.sample_identity().writer_guid().guidPrefix != - mp_PDPWriter->getGuid().guidPrefix + endpoints->writer.writer_->getGuid().guidPrefix && change_aux->kind == fastrtps::rtps::ALIVE && it.value()["topic"] != discovery_db().virtual_topic()) { @@ -1663,7 +1706,7 @@ bool PDPServer::process_backup_restore_queue( // EDPServerPUBListener* edp_pub_listener = static_cast(edp->publications_listener_); // EDPServerSUBListener* edp_sub_listener = static_cast(edp->subscriptions_listener_); - // std::unique_lock lock(mp_PDPReader->getMutex()); + // std::unique_lock lock(endpoints->reader.reader_->getMutex()); // std::unique_lock lock_edpp(edp->publications_reader_.first->getMutex()); // std::unique_lock lock_edps(edp->subscriptions_reader_.first->getMutex()); @@ -1680,11 +1723,11 @@ bool PDPServer::process_backup_restore_queue( // (std::istringstream) json_change["instance_handle"].get() >> instance_handle_aux; // // Belongs to own server - // if (sample_identity_aux.writer_guid() == mp_PDPWriter->getGuid()) + // if (sample_identity_aux.writer_guid() == endpoints->writer.writer_->getGuid()) // { // if (discovery_db_.is_participant(iHandle2GUID(instance_handle_aux))) // { - // if (!mp_PDPWriterHistory->reserve_Cache(&change_aux, length)) + // if (!endpoints->writer.history_->reserve_Cache(&change_aux, length)) // [this]() -> uint32_t // { // return mp_PDP->builtin_attributes().readerPayloadSize; @@ -1697,7 +1740,7 @@ bool PDPServer::process_backup_restore_queue( // else // { // ddb::from_json(json_change, *change_aux); - // mp_listener->onNewCacheChangeAdded(mp_PDPReader, change_aux); + // mp_listener->onNewCacheChangeAdded(endpoints->reader.reader_, change_aux); // } // } @@ -1733,7 +1776,7 @@ bool PDPServer::process_backup_restore_queue( // { // if (discovery_db_.is_participant(iHandle2GUID(instance_handle_aux))) // { - // if (!mp_PDPReaderHistory->reserve_Cache(&change_aux, length)) + // if (!endpoints->reader.history_->reserve_Cache(&change_aux, length)) // { // EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error creating CacheChange"); // // TODO release changes and exit @@ -1741,7 +1784,7 @@ bool PDPServer::process_backup_restore_queue( // else // { // ddb::from_json(json_change, *change_aux); - // mp_listener->onNewCacheChangeAdded(mp_PDPReader, change_aux); + // mp_listener->onNewCacheChangeAdded(endpoints->reader.reader_, change_aux); // } // } @@ -1805,6 +1848,7 @@ void PDPServer::process_backup_store() void PDPServer::match_pdp_writer_nts_( const eprosima::fastdds::rtps::RemoteServerAttributes& server_att) { + auto endpoints = static_cast(builtin_endpoints_.get()); const NetworkFactory& network = mp_RTPSParticipant->network_factory(); auto temp_writer_data = get_temporary_writer_proxies_pool().get(); @@ -1814,12 +1858,13 @@ void PDPServer::match_pdp_writer_nts_( temp_writer_data->set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network); temp_writer_data->m_qos.m_durability.durabilityKind(durability_); temp_writer_data->m_qos.m_reliability.kind = dds::RELIABLE_RELIABILITY_QOS; - mp_PDPReader->matched_writer_add(*temp_writer_data); + endpoints->reader.reader_->matched_writer_add(*temp_writer_data); } void PDPServer::match_pdp_reader_nts_( const eprosima::fastdds::rtps::RemoteServerAttributes& server_att) { + auto endpoints = static_cast(builtin_endpoints_.get()); const NetworkFactory& network = mp_RTPSParticipant->network_factory(); auto temp_reader_data = get_temporary_reader_proxies_pool().get(); @@ -1829,7 +1874,14 @@ void PDPServer::match_pdp_reader_nts_( temp_reader_data->set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network); temp_reader_data->m_qos.m_durability.kind = dds::TRANSIENT_LOCAL_DURABILITY_QOS; temp_reader_data->m_qos.m_reliability.kind = dds::RELIABLE_RELIABILITY_QOS; - mp_PDPWriter->matched_reader_add(*temp_reader_data); + endpoints->writer.writer_->matched_reader_add(*temp_reader_data); +} + +void PDPServer::release_change_from_writer( + eprosima::fastrtps::rtps::CacheChange_t* change) +{ + auto endpoints = static_cast(builtin_endpoints_.get()); + endpoints->writer.writer_->release_change(change); } } // namespace rtps diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp index 05b65d4d817..bfe73fd446c 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp @@ -187,6 +187,8 @@ class PDPServer : public fastrtps::rtps::PDP protected: + void update_builtin_locators() override; + /* * Get Pointer to the server resource event thread. */ @@ -294,6 +296,14 @@ class PDPServer : public fastrtps::rtps::PDP void match_pdp_reader_nts_( const eprosima::fastdds::rtps::RemoteServerAttributes& server_att); + /** + * Release a change from the history of the PDP writer. + * + * @param change The CacheChange_t to be released. + */ + void release_change_from_writer( + eprosima::fastrtps::rtps::CacheChange_t* change); + private: //! Server thread diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp index 85368e56a48..ff0082a80d7 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -61,10 +62,11 @@ void PDPServerListener::onNewCacheChangeAdded( " --------------------"); EPROSIMA_LOG_INFO(RTPS_PDP_LISTENER, "PDP Server Message received: " << change_in->instanceHandle); + auto endpoints = static_cast(pdp_server()->builtin_endpoints_.get()); // Get PDP reader history - auto pdp_history = pdp_server()->mp_PDPReaderHistory; + auto pdp_history = endpoints->reader.history_.get(); // Get PDP reader to release change - auto pdp_reader = pdp_server()->mp_PDPReader; + auto pdp_reader = endpoints->reader.reader_; bool routine_should_be_awake = false; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index a2fa029dbcb..cfc1087faa4 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -42,6 +42,7 @@ #include #include +#include #include #include @@ -67,6 +68,12 @@ PDPSimple::~PDPSimple() { } +void PDPSimple::update_builtin_locators() +{ + auto endpoints = static_cast(builtin_endpoints_.get()); + mp_builtin->updateMetatrafficLocators(endpoints->reader.reader_->getAttributes().unicastLocatorList); +} + void PDPSimple::initializeParticipantProxyData( ParticipantProxyData* participant_data) { @@ -223,20 +230,15 @@ void PDPSimple::announceParticipantState( { if (enabled_) { - PDP::announceParticipantState(new_change, dispose, wp); + auto endpoints = static_cast(builtin_endpoints_.get()); + StatelessWriter& writer = *(endpoints->writer.writer_); + WriterHistory& history = *(endpoints->writer.history_); + + PDP::announceParticipantState(writer, history, new_change, dispose, wp); if (!(dispose || new_change)) { - StatelessWriter* pW = dynamic_cast(mp_PDPWriter); - - if (pW != nullptr) - { - pW->unsent_changes_reset(); - } - else - { - EPROSIMA_LOG_ERROR(RTPS_PDP, "Using PDPSimple protocol with a reliable writer"); - } + writer.unsent_changes_reset(); } } } @@ -247,11 +249,15 @@ bool PDPSimple::createPDPEndpoints() const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); const RTPSParticipantAllocationAttributes& allocation = pattr.allocation; + const BuiltinAttributes& builtin_att = mp_builtin->m_att; + + auto endpoints = new fastdds::rtps::SimplePDPEndpoints(); + builtin_endpoints_.reset(endpoints); //SPDP BUILTIN RTPSParticipant READER HistoryAttributes hatt; - hatt.payloadMaxSize = mp_builtin->m_att.readerPayloadSize; - hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy; + hatt.payloadMaxSize = builtin_att.readerPayloadSize; + hatt.memoryPolicy = builtin_att.readerHistoryMemoryPolicy; hatt.initialReservedCaches = 25; if (allocation.participants.initial > 0) { @@ -263,10 +269,11 @@ bool PDPSimple::createPDPEndpoints() } PoolConfig reader_pool_cfg = PoolConfig::from_history_attributes(hatt); - reader_payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", reader_pool_cfg); - reader_payload_pool_->reserve_history(reader_pool_cfg, true); + endpoints->reader.payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", reader_pool_cfg); + endpoints->reader.payload_pool_->reserve_history(reader_pool_cfg, true); + + endpoints->reader.history_.reset(new ReaderHistory(hatt)); - mp_PDPReaderHistory = new ReaderHistory(hatt); ReaderAttributes ratt; ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList; ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList; @@ -277,22 +284,22 @@ bool PDPSimple::createPDPEndpoints() ratt.endpoint.reliabilityKind = BEST_EFFORT; ratt.matched_writers_allocation = allocation.participants; mp_listener = new PDPListener(this); - if (mp_RTPSParticipant->createReader(&mp_PDPReader, ratt, reader_payload_pool_, mp_PDPReaderHistory, mp_listener, - c_EntityId_SPDPReader, true, false)) + RTPSReader* reader = nullptr; + if (mp_RTPSParticipant->createReader(&reader, ratt, + endpoints->reader.payload_pool_, endpoints->reader.history_.get(), + mp_listener, c_EntityId_SPDPReader, true, false)) { + endpoints->reader.reader_ = dynamic_cast(reader); #if HAVE_SECURITY - mp_RTPSParticipant->set_endpoint_rtps_protection_supports(mp_PDPReader, false); + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); #endif // if HAVE_SECURITY } else { EPROSIMA_LOG_ERROR(RTPS_PDP, "SimplePDP Reader creation failed"); - delete mp_PDPReaderHistory; - mp_PDPReaderHistory = nullptr; delete mp_listener; mp_listener = nullptr; - reader_payload_pool_->release_history(reader_pool_cfg, true); - reader_payload_pool_.reset(); + endpoints->reader.release(); return false; } @@ -303,10 +310,10 @@ bool PDPSimple::createPDPEndpoints() hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy; PoolConfig writer_pool_cfg = PoolConfig::from_history_attributes(hatt); - writer_payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", writer_pool_cfg); - writer_payload_pool_->reserve_history(writer_pool_cfg, false); + endpoints->writer.payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", writer_pool_cfg); + endpoints->writer.payload_pool_->reserve_history(writer_pool_cfg, false); - mp_PDPWriterHistory = new WriterHistory(hatt); + endpoints->writer.history_.reset(new WriterHistory(hatt)); WriterAttributes watt; watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators; watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; @@ -322,15 +329,16 @@ bool PDPSimple::createPDPEndpoints() watt.mode = ASYNCHRONOUS_WRITER; } - RTPSWriter* wout; - if (mp_RTPSParticipant->createWriter(&wout, watt, writer_payload_pool_, mp_PDPWriterHistory, nullptr, + RTPSWriter* wout = nullptr; + if (mp_RTPSParticipant->createWriter(&wout, watt, endpoints->writer.payload_pool_, endpoints->writer.history_.get(), + nullptr, c_EntityId_SPDPWriter, true)) { + endpoints->writer.writer_ = dynamic_cast(wout); #if HAVE_SECURITY mp_RTPSParticipant->set_endpoint_rtps_protection_supports(wout, false); #endif // if HAVE_SECURITY - mp_PDPWriter = wout; - if (mp_PDPWriter != nullptr) + if (endpoints->writer.writer_ != nullptr) { const NetworkFactory& network = mp_RTPSParticipant->network_factory(); LocatorList_t fixed_locators; @@ -342,16 +350,13 @@ bool PDPSimple::createPDPEndpoints() fixed_locators.push_back(local_locator); } } - dynamic_cast(wout)->set_fixed_locators(fixed_locators); + endpoints->writer.writer_->set_fixed_locators(fixed_locators); } } else { EPROSIMA_LOG_ERROR(RTPS_PDP, "SimplePDP Writer creation failed"); - delete mp_PDPWriterHistory; - mp_PDPWriterHistory = nullptr; - writer_payload_pool_->release_history(writer_pool_cfg, false); - writer_payload_pool_.reset(); + endpoints->writer.release(); return false; } EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Endpoints creation finished"); @@ -363,6 +368,8 @@ void PDPSimple::assignRemoteEndpoints( { EPROSIMA_LOG_INFO(RTPS_PDP, "For RTPSParticipant: " << pdata->m_guid.guidPrefix); + auto endpoints = static_cast(builtin_endpoints_.get()); + const NetworkFactory& network = mp_RTPSParticipant->network_factory(); uint32_t endp = pdata->m_availableBuiltinEndpoints; uint32_t auxendp = endp; @@ -381,7 +388,7 @@ void PDPSimple::assignRemoteEndpoints( temp_writer_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); temp_writer_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - mp_PDPReader->matched_writer_add(*temp_writer_data); + endpoints->reader.reader_->matched_writer_add(*temp_writer_data); } auxendp = endp; auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; @@ -396,9 +403,9 @@ void PDPSimple::assignRemoteEndpoints( temp_reader_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); temp_reader_data->m_qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS; temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - mp_PDPWriter->matched_reader_add(*temp_reader_data); + endpoints->writer.writer_->matched_reader_add(*temp_reader_data); - StatelessWriter* pW = dynamic_cast(mp_PDPWriter); + StatelessWriter* pW = endpoints->writer.writer_; if (pW != nullptr) { @@ -423,20 +430,23 @@ void PDPSimple::removeRemoteEndpoints( ParticipantProxyData* pdata) { EPROSIMA_LOG_INFO(RTPS_PDP, "For RTPSParticipant: " << pdata->m_guid); + + auto endpoints = static_cast(builtin_endpoints_.get()); + uint32_t endp = pdata->m_availableBuiltinEndpoints; uint32_t auxendp = endp; auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; if (auxendp != 0) { GUID_t writer_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPWriter); - mp_PDPReader->matched_writer_remove(writer_guid); + endpoints->reader.reader_->matched_writer_remove(writer_guid); } auxendp = endp; auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; if (auxendp != 0) { GUID_t reader_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPReader); - mp_PDPWriter->matched_reader_remove(reader_guid); + endpoints->writer.writer_->matched_reader_remove(reader_guid); } } diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp new file mode 100644 index 00000000000..ba09cfe566a --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp @@ -0,0 +1,91 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SimplePDPEndpoints.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTS_HPP_ +#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTS_HPP_ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Container for the builtin endpoints of PDPSimple + */ +struct SimplePDPEndpoints : public PDPEndpoints +{ + ~SimplePDPEndpoints() override = default; + + fastrtps::rtps::BuiltinEndpointSet_t builtin_endpoints() const override + { + return DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; + } + + bool enable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + return participant->enableReader(reader.reader_); + } + + void disable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->disableReader(reader.reader_); + } + + void delete_pdp_endpoints( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->deleteUserEndpoint(writer.writer_->getGuid()); + participant->deleteUserEndpoint(reader.reader_->getGuid()); + } + + void remove_from_pdp_reader_history( + const fastrtps::rtps::InstanceHandle_t& remote_participant) override + { + reader.remove_from_history(remote_participant); + } + + void remove_from_pdp_reader_history( + fastrtps::rtps::CacheChange_t* change) override + { + reader.history_->remove_change(change); + } + + //! Builtin Simple PDP reader + BuiltinReader reader; + + //! Builtin Simple PDP writer + BuiltinWriter writer; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTS_HPP_