Skip to content

Commit

Permalink
Fix issues in Dynamic Network Interfaces (#5282) (#5304) (#5390)
Browse files Browse the repository at this point in the history
* Fix issues in Dynamic Network Interfaces (#5282)

* Refs #21690. Parse `--rescan` argument on communication applications.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Implement rescan mechanism.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add docker infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add CMake infrastructure.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Ensure same domain and topic name are used.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add `--loops` argument to publisher.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Publisher exits after publishing all samples.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve subscriber script.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Make publisher wait subscriber.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Possible fix.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Clear locators before recalculating them.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Move local participant proxy update to PDP.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Improve new method's logic.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Include what you use.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add empty method to update endpoint locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Add implementation for `update_endpoint_locators_if_default_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Compare against old default locators.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21690. Update locators in attributes.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Avoid early return on `PDP::local_participant_attributes_update_nts`.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #17283. Apply suggestions.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 91bd7c8)

* Refs #17283: Fix conflicts

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #17283: Keep original PR functionality

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #17283: Please uncrustify

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Restore discovery server list check code.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Make backported methods static methods instead of inline code.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Methods get_default_xxx_locators with output argument.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Only update attributes when necessary.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: eProsima <jesuspoderoso@eprosima.com>
(cherry picked from commit 66ec998)
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

# Conflicts:
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
#	test/communication/PublisherMain.cpp
#	test/communication/SubscriberMain.cpp
#	test/dds/communication/SubscriberMain.cpp
#	test/dds/communication/SubscriberModule.hpp
#	test/dds/communication/security/PublisherModule.cpp
#	test/dds/communication/security/PublisherModule.hpp
#	test/dds/communication/security/SubscriberModule.cpp
#	test/dds/communication/security/SubscriberModule.hpp

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mergify[bot] authored Nov 8, 2024
1 parent 67ce284 commit ad9b4eb
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 93 deletions.
5 changes: 5 additions & 0 deletions include/fastdds/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
Expand Down Expand Up @@ -61,6 +64,7 @@ class RTPSWriter;
class RTPSReader;
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -70,6 +74,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down
265 changes: 202 additions & 63 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,17 @@ class RTPSParticipantImpl
*/
void get_default_metatraffic_locators();

void get_default_metatraffic_locators(
RTPSParticipantAttributes& att);

/**
* Get default unicast locators when not provided by the user.
*/
void get_default_unicast_locators();

void get_default_unicast_locators(
RTPSParticipantAttributes& att);

bool match_local_endpoints_ = true;

bool should_match_local_endpoints(
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,7 @@ if(Python3_Interpreter_FOUND)
endif()
endforeach()
endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -197,7 +197,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -208,7 +208,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
28 changes: 26 additions & 2 deletions test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds;
* --seed <int>
* --wait <int>
* --samples <int>
* --loops <int>
* --interval <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
* --rescan <int>
*/

int main(
Expand All @@ -46,7 +48,9 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t loops = 0;
uint32_t interval = 250;
uint32_t rescan_interval_seconds = 0;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -93,6 +97,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--loops") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--loops expects a parameter" << std::endl;
return -1;
}

loops = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -123,6 +137,16 @@ int main(

xml_file = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -146,7 +170,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples, 0, interval);
publisher.run(samples, rescan_interval_seconds, loops, interval);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,30 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t rescan_interval,
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
void* sample = nullptr;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (run_ && (loops == 0 || loops > current_loop))
{
if (zero_copy_)
Expand Down Expand Up @@ -184,6 +201,9 @@ void PublisherModule::run(

std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}

run_ = false;
net_rescan_thread.join();
}

void PublisherModule::on_publication_matched(
Expand Down
19 changes: 11 additions & 8 deletions test/dds/communication/PublisherModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
#ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP
#define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/publisher/PublisherListener.hpp>
Expand All @@ -27,9 +32,6 @@
#include "types/FixedSizedPubSubTypes.h"
#include "types/HelloWorldPubSubTypes.h"

#include <mutex>
#include <condition_variable>

namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -41,8 +43,8 @@ class PublisherModule

PublisherModule(
bool exit_on_lost_liveliness,
bool fixed_type = false,
bool zero_copy = false)
bool fixed_type,
bool zero_copy)
: exit_on_lost_liveliness_(exit_on_lost_liveliness)
, fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required
, zero_copy_(zero_copy)
Expand Down Expand Up @@ -80,8 +82,9 @@ class PublisherModule

void run(
uint32_t samples,
uint32_t loops = 0,
uint32_t interval = 250);
const uint32_t rescan_interval,
uint32_t loops,
uint32_t interval);

private:

Expand All @@ -93,7 +96,7 @@ class PublisherModule
bool exit_on_lost_liveliness_ = false;
bool fixed_type_ = false;
bool zero_copy_ = false;
bool run_ = true;
std::atomic_bool run_{true};
DomainParticipant* participant_ = nullptr;
TypeSupport type_;
Publisher* publisher_ = nullptr;
Expand Down
18 changes: 15 additions & 3 deletions test/dds/communication/SubscriberMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ using namespace eprosima::fastdds::dds;
* --notexit
* --fixed_type
* --zero_copy
* --succeed_on_timeout
* --seed <int>
* --samples <int>
* --magic <str>
* --timeout <int>
* --xmlfile <path>
* --publishers <int>
* --succeed_on_timeout
* --timeout <int>
* --rescan <int>
*/

int main(
Expand All @@ -48,6 +49,7 @@ int main(
uint32_t samples = 4;
uint32_t publishers = 1;
uint32_t timeout = 86400000; // 24 h in ms
uint32_t rescan_interval_seconds = 0;
char* xml_file = nullptr;
std::string magic;

Expand Down Expand Up @@ -129,6 +131,16 @@ int main(

publishers = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -147,7 +159,7 @@ int main(

if (subscriber.init(seed, magic))
{
return subscriber.run(notexit, timeout) ? 0 : -1;
return subscriber.run(notexit, rescan_interval_seconds, timeout) ? 0 : -1;
}

return -1;
Expand Down
25 changes: 23 additions & 2 deletions test/dds/communication/SubscriberModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,35 @@ bool SubscriberModule::init(

bool SubscriberModule::run(
bool notexit,
const uint32_t rescan_interval,
uint32_t timeout)
{
return run_for(notexit, std::chrono::milliseconds(timeout));
return run_for(notexit, rescan_interval, std::chrono::milliseconds(timeout));
}

bool SubscriberModule::run_for(
bool notexit,
const uint32_t rescan_interval,
const std::chrono::milliseconds& timeout)
{
bool returned_value = false;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (notexit && run_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(250));
Expand All @@ -152,7 +170,7 @@ bool SubscriberModule::run_for(
std::unique_lock<std::mutex> lock(mutex_);
returned_value = cv_.wait_for(lock, timeout, [&]
{
if (succeeed_on_timeout_ && (std::chrono::steady_clock::now() - t0) > timeout)
if (succeed_on_timeout_ && (std::chrono::steady_clock::now() - t0) > timeout)
{
return true;
}
Expand Down Expand Up @@ -190,6 +208,9 @@ bool SubscriberModule::run_for(
returned_value = false;
}

run_ = false;
net_rescan_thread.join();

return returned_value;
}

Expand Down
Loading

0 comments on commit ad9b4eb

Please sign in to comment.