Skip to content

Commit

Permalink
Fix issues in Dynamic Network Interfaces (#5282)
Browse files Browse the repository at this point in the history
* Refs #21690. Parse `--rescan` argument on communication applications.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Implement rescan mechanism.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add docker infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add CMake infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Ensure same domain and topic name are used.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add `--loops` argument to publisher.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Publisher exits after publishing all samples.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve subscriber script.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Make publisher wait subscriber.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Possible fix.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Clear locators before recalculating them.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Move local participant proxy update to PDP.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve new method's logic.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Include what you use.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add empty method to update endpoint locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add implementation for `update_endpoint_locators_if_default_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Compare against old default locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Update locators in attributes.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Avoid early return on `PDP::local_participant_attributes_update_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Apply suggestions.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
MiguelCompany authored Oct 4, 2024
1 parent 6bb7e9a commit 91bd7c8
Show file tree
Hide file tree
Showing 21 changed files with 513 additions and 69 deletions.
117 changes: 117 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#endif // if HAVE_SECURITY
#include <utils/shared_mutex.hpp>
#include <utils/TimeConversion.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <rtps/reader/BaseReader.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -1698,6 +1700,121 @@ void PDP::add_builtin_security_attributes(

#endif // HAVE_SECURITY

void PDP::local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts)
{
// Update user data
auto participant_data = getLocalParticipantProxyData();
participant_data->m_userData.data_vec(new_atts.userData);

// If we are intraprocess only, we do not need to update locators
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();
if (announce_locators)
{
// Clear all locators
participant_data->metatraffic_locators.unicast.clear();
participant_data->metatraffic_locators.multicast.clear();
participant_data->default_locators.unicast.clear();
participant_data->default_locators.multicast.clear();

// Update default locators
for (const Locator_t& loc : new_atts.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : new_atts.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}

// Update metatraffic locators
for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(locator);
}
if (!new_atts.builtin.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(locator);
}
}

fastdds::rtps::network::external_locators::add_external_locators(*participant_data,
new_atts.builtin.metatraffic_external_unicast_locators,
new_atts.default_external_unicast_locators);
}
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts)
{
// Check if default locators have changed
const auto& old_default_unicast = old_atts.defaultUnicastLocatorList;
const auto& old_default_multicast = old_atts.defaultMulticastLocatorList;
const auto& new_default_unicast = new_atts.defaultUnicastLocatorList;
const auto& new_default_multicast = new_atts.defaultMulticastLocatorList;

// Early return if there is no change in default unicast locators
if ((old_default_unicast == new_default_unicast) &&
(old_default_multicast == new_default_multicast))
{
return;
}

// Update proxies of endpoints with default configured locators
EDP* edp = get_edp();
for (BaseWriter* writer : writers)
{
if ((old_default_multicast == writer->getAttributes().multicastLocatorList) &&
(old_default_unicast == writer->getAttributes().unicastLocatorList))
{
writer->getAttributes().multicastLocatorList = new_default_multicast;
writer->getAttributes().unicastLocatorList = new_default_unicast;

WriterProxyData* wdata = nullptr;
GUID_t participant_guid;
wdata = addWriterProxyData(writer->getGuid(), participant_guid,
[](WriterProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(wdata != nullptr);
edp->process_writer_proxy_data(writer, wdata);
}
}
for (BaseReader* reader : readers)
{
if ((old_default_multicast == reader->getAttributes().multicastLocatorList) &&
(old_default_unicast == reader->getAttributes().unicastLocatorList))
{
reader->getAttributes().multicastLocatorList = new_default_multicast;
reader->getAttributes().unicastLocatorList = new_default_unicast;

ReaderProxyData* rdata = nullptr;
GUID_t participant_guid;
rdata = addReaderProxyData(reader->getGuid(), participant_guid,
[](ReaderProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(rdata != nullptr);
edp->process_reader_proxy_data(reader, rdata);
}
}
}

} /* namespace rtps */
} /* namespace fastdds */
} /* namespace eprosima */
38 changes: 27 additions & 11 deletions src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/core/Time_t.hpp>
#include <fastdds/dds/core/policy/ParameterTypes.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/rtps/attributes/ReaderAttributes.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
#include <fastdds/rtps/common/CDRMessage_t.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/Types.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.hpp>
Expand Down Expand Up @@ -65,19 +79,11 @@ class TypeIdentifier;

namespace rtps {

class PDPServerListener;
class PDPEndpoints;

} // namespace rtps
} // namespace fastdds

namespace fastdds {
namespace rtps {

class RTPSWriter;
class RTPSReader;
class BaseWriter;
class BaseReader;
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -87,6 +93,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down Expand Up @@ -484,6 +491,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

#endif // FASTDDS_STATISTICS

virtual void local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts);

virtual void update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

protected:

//!Pointer to the builtin protocols object.
Expand Down
23 changes: 6 additions & 17 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_metatraffic_locators_)
{
LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList;
temp_atts.builtin.metatrafficUnicastLocatorList.clear();
get_default_metatraffic_locators(temp_atts);
if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList))
{
Expand All @@ -1455,6 +1456,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_default_locators_)
{
LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList;
temp_atts.defaultUnicastLocatorList.clear();
get_default_unicast_locators(temp_atts);
if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList))
{
Expand Down Expand Up @@ -1529,25 +1531,12 @@ void RTPSParticipantImpl::update_attributes(

{
std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex());
pdp->local_participant_attributes_update_nts(temp_atts);

// Update user data
auto local_participant_proxy_data = pdp->getLocalParticipantProxyData();
local_participant_proxy_data->m_userData.data_vec(temp_atts.userData);

// Update metatraffic locators
for (auto locator : temp_atts.builtin.metatrafficMulticastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator);
}
for (auto locator : temp_atts.builtin.metatrafficUnicastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator);
}

// Update default locators
for (auto locator : temp_atts.defaultUnicastLocatorList)
if (local_interfaces_changed && internal_default_locators_)
{
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
std::lock_guard<shared_mutex> _(endpoints_list_mutex);
pdp->update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts);
}

if (local_interfaces_changed)
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,7 @@ if(Python3_Interpreter_FOUND)
endif()

endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -196,7 +196,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -207,7 +207,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
28 changes: 26 additions & 2 deletions test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds;
* --seed <int>
* --wait <int>
* --samples <int>
* --loops <int>
* --interval <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
* --rescan <int>
*/

int main(
Expand All @@ -46,7 +48,9 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t loops = 0;
uint32_t interval = 250;
uint32_t rescan_interval_seconds = 0;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -93,6 +97,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--loops") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--loops expects a parameter" << std::endl;
return -1;
}

loops = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -123,6 +137,16 @@ int main(

xml_file = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -146,7 +170,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples, 0, interval);
publisher.run(samples, rescan_interval_seconds, loops, interval);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,30 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t rescan_interval,
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
void* sample = nullptr;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (run_ && (loops == 0 || loops > current_loop))
{
if (zero_copy_)
Expand Down Expand Up @@ -187,6 +204,9 @@ void PublisherModule::run(

std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}

run_ = false;
net_rescan_thread.join();
}

void PublisherModule::on_publication_matched(
Expand Down
Loading

0 comments on commit 91bd7c8

Please sign in to comment.