diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 3bf1b376783..60c6573b4b6 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -62,6 +62,8 @@ #endif // if HAVE_SECURITY #include #include +#include +#include namespace eprosima { namespace fastdds { @@ -1698,6 +1700,121 @@ void PDP::add_builtin_security_attributes( #endif // HAVE_SECURITY +void PDP::local_participant_attributes_update_nts( + const RTPSParticipantAttributes& new_atts) +{ + // Update user data + auto participant_data = getLocalParticipantProxyData(); + participant_data->m_userData.data_vec(new_atts.userData); + + // If we are intraprocess only, we do not need to update locators + bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only(); + if (announce_locators) + { + // Clear all locators + participant_data->metatraffic_locators.unicast.clear(); + participant_data->metatraffic_locators.multicast.clear(); + participant_data->default_locators.unicast.clear(); + participant_data->default_locators.multicast.clear(); + + // Update default locators + for (const Locator_t& loc : new_atts.defaultUnicastLocatorList) + { + participant_data->default_locators.add_unicast_locator(loc); + } + for (const Locator_t& loc : new_atts.defaultMulticastLocatorList) + { + participant_data->default_locators.add_multicast_locator(loc); + } + + // Update metatraffic locators + for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList) + { + participant_data->metatraffic_locators.add_unicast_locator(locator); + } + if (!new_atts.builtin.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) + { + for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList) + { + participant_data->metatraffic_locators.add_multicast_locator(locator); + } + } + + fastdds::rtps::network::external_locators::add_external_locators(*participant_data, + new_atts.builtin.metatraffic_external_unicast_locators, + new_atts.default_external_unicast_locators); + } +} + +void PDP::update_endpoint_locators_if_default_nts( + const std::vector& writers, + const std::vector& readers, + const RTPSParticipantAttributes& old_atts, + const RTPSParticipantAttributes& new_atts) +{ + // Check if default locators have changed + const auto& old_default_unicast = old_atts.defaultUnicastLocatorList; + const auto& old_default_multicast = old_atts.defaultMulticastLocatorList; + const auto& new_default_unicast = new_atts.defaultUnicastLocatorList; + const auto& new_default_multicast = new_atts.defaultMulticastLocatorList; + + // Early return if there is no change in default unicast locators + if ((old_default_unicast == new_default_unicast) && + (old_default_multicast == new_default_multicast)) + { + return; + } + + // Update proxies of endpoints with default configured locators + EDP* edp = get_edp(); + for (BaseWriter* writer : writers) + { + if ((old_default_multicast == writer->getAttributes().multicastLocatorList) && + (old_default_unicast == writer->getAttributes().unicastLocatorList)) + { + writer->getAttributes().multicastLocatorList = new_default_multicast; + writer->getAttributes().unicastLocatorList = new_default_unicast; + + WriterProxyData* wdata = nullptr; + GUID_t participant_guid; + wdata = addWriterProxyData(writer->getGuid(), participant_guid, + [](WriterProxyData* proxy, bool is_update, const ParticipantProxyData& participant) + { + static_cast(is_update); + assert(is_update); + + proxy->set_locators(participant.default_locators); + return true; + }); + assert(wdata != nullptr); + edp->process_writer_proxy_data(writer, wdata); + } + } + for (BaseReader* reader : readers) + { + if ((old_default_multicast == reader->getAttributes().multicastLocatorList) && + (old_default_unicast == reader->getAttributes().unicastLocatorList)) + { + reader->getAttributes().multicastLocatorList = new_default_multicast; + reader->getAttributes().unicastLocatorList = new_default_unicast; + + ReaderProxyData* rdata = nullptr; + GUID_t participant_guid; + rdata = addReaderProxyData(reader->getGuid(), participant_guid, + [](ReaderProxyData* proxy, bool is_update, const ParticipantProxyData& participant) + { + static_cast(is_update); + assert(is_update); + + proxy->set_locators(participant.default_locators); + return true; + }); + assert(rdata != nullptr); + edp->process_reader_proxy_data(reader, rdata); + } + } +} + } /* namespace rtps */ } /* namespace fastdds */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.h b/src/cpp/rtps/builtin/discovery/participant/PDP.h index 884da0daef9..9d94a874a7a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.h +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.h @@ -22,13 +22,27 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include +#include #include #include #include +#include +#include +#include + +#include +#include #include +#include #include +#include +#include #include +#include +#include +#include +#include #include #include #include @@ -65,19 +79,11 @@ class TypeIdentifier; namespace rtps { -class PDPServerListener; -class PDPEndpoints; - -} // namespace rtps -} // namespace fastdds - -namespace fastdds { -namespace rtps { - -class RTPSWriter; -class RTPSReader; +class BaseWriter; +class BaseReader; class WriterHistory; class ReaderHistory; +struct RTPSParticipantAllocationAttributes; class RTPSParticipantImpl; class RTPSParticipantListener; class BuiltinProtocols; @@ -87,6 +93,7 @@ class ReaderProxyData; class WriterProxyData; class ParticipantProxyData; class ReaderListener; +class PDPEndpoints; class PDPListener; class PDPServerListener; class ITopicPayloadPool; @@ -484,6 +491,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable #endif // FASTDDS_STATISTICS + virtual void local_participant_attributes_update_nts( + const RTPSParticipantAttributes& new_atts); + + virtual void update_endpoint_locators_if_default_nts( + const std::vector& writers, + const std::vector& readers, + const RTPSParticipantAttributes& old_atts, + const RTPSParticipantAttributes& new_atts); + protected: //!Pointer to the builtin protocols object. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 47e6ad5ffe1..09cd2105c67 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1445,6 +1445,7 @@ void RTPSParticipantImpl::update_attributes( if (internal_metatraffic_locators_) { LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList; + temp_atts.builtin.metatrafficUnicastLocatorList.clear(); get_default_metatraffic_locators(temp_atts); if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList)) { @@ -1455,6 +1456,7 @@ void RTPSParticipantImpl::update_attributes( if (internal_default_locators_) { LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList; + temp_atts.defaultUnicastLocatorList.clear(); get_default_unicast_locators(temp_atts); if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList)) { @@ -1529,25 +1531,12 @@ void RTPSParticipantImpl::update_attributes( { std::lock_guard lock(*pdp->getMutex()); + pdp->local_participant_attributes_update_nts(temp_atts); - // Update user data - auto local_participant_proxy_data = pdp->getLocalParticipantProxyData(); - local_participant_proxy_data->m_userData.data_vec(temp_atts.userData); - - // Update metatraffic locators - for (auto locator : temp_atts.builtin.metatrafficMulticastLocatorList) - { - local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator); - } - for (auto locator : temp_atts.builtin.metatrafficUnicastLocatorList) - { - local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator); - } - - // Update default locators - for (auto locator : temp_atts.defaultUnicastLocatorList) + if (local_interfaces_changed && internal_default_locators_) { - local_participant_proxy_data->default_locators.add_unicast_locator(locator); + std::lock_guard _(endpoints_list_mutex); + pdp->update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts); } if (local_interfaces_changed) diff --git a/test/dds/communication/CMakeLists.txt b/test/dds/communication/CMakeLists.txt index ee6ec3d8312..2b9df6996a3 100644 --- a/test/dds/communication/CMakeLists.txt +++ b/test/dds/communication/CMakeLists.txt @@ -315,3 +315,7 @@ if(Python3_Interpreter_FOUND) endif() endif() + +if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID)) + add_subdirectory(dyn_network) +endif() diff --git a/test/dds/communication/PubSubMain.cpp b/test/dds/communication/PubSubMain.cpp index e81c2a379c2..7bc40177269 100644 --- a/test/dds/communication/PubSubMain.cpp +++ b/test/dds/communication/PubSubMain.cpp @@ -51,7 +51,7 @@ void publisher_run( publisher->wait_discovery(wait); } - publisher->run(samples, loops, interval); + publisher->run(samples, 0, loops, interval); } int main( @@ -196,7 +196,7 @@ int main( DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file); } - SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy); + SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false); PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy); uint32_t result = 1; @@ -207,7 +207,7 @@ int main( if (subscriber.init(seed, magic)) { - result = subscriber.run(notexit, timeout) ? 0 : -1; + result = subscriber.run(notexit, 0, timeout) ? 0 : -1; } publisher_thread.join(); diff --git a/test/dds/communication/PublisherMain.cpp b/test/dds/communication/PublisherMain.cpp index bc9a2600d6e..25f3707824f 100644 --- a/test/dds/communication/PublisherMain.cpp +++ b/test/dds/communication/PublisherMain.cpp @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds; * --seed * --wait * --samples + * --loops + * --interval * --magic * --xmlfile - * --interval + * --rescan */ int main( @@ -46,7 +48,9 @@ int main( uint32_t wait = 0; char* xml_file = nullptr; uint32_t samples = 4; + uint32_t loops = 0; uint32_t interval = 250; + uint32_t rescan_interval_seconds = 0; std::string magic; while (arg_count < argc) @@ -93,6 +97,16 @@ int main( samples = strtol(argv[arg_count], nullptr, 10); } + else if (strcmp(argv[arg_count], "--loops") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--loops expects a parameter" << std::endl; + return -1; + } + + loops = strtol(argv[arg_count], nullptr, 10); + } else if (strcmp(argv[arg_count], "--interval") == 0) { if (++arg_count >= argc) @@ -123,6 +137,16 @@ int main( xml_file = argv[arg_count]; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -146,7 +170,7 @@ int main( publisher.wait_discovery(wait); } - publisher.run(samples, 0, interval); + publisher.run(samples, rescan_interval_seconds, loops, interval); return 0; } diff --git a/test/dds/communication/PublisherModule.cpp b/test/dds/communication/PublisherModule.cpp index c09a0fcdf23..bcde6a7157c 100644 --- a/test/dds/communication/PublisherModule.cpp +++ b/test/dds/communication/PublisherModule.cpp @@ -134,6 +134,7 @@ void PublisherModule::wait_discovery( void PublisherModule::run( uint32_t samples, + const uint32_t rescan_interval, const uint32_t loops, uint32_t interval) { @@ -141,6 +142,22 @@ void PublisherModule::run( uint16_t index = 1; void* sample = nullptr; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (run_ && (loops == 0 || loops > current_loop)) { if (zero_copy_) @@ -187,6 +204,9 @@ void PublisherModule::run( std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } + + run_ = false; + net_rescan_thread.join(); } void PublisherModule::on_publication_matched( diff --git a/test/dds/communication/PublisherModule.hpp b/test/dds/communication/PublisherModule.hpp index 21b68006e64..fc07e3c5111 100644 --- a/test/dds/communication/PublisherModule.hpp +++ b/test/dds/communication/PublisherModule.hpp @@ -19,8 +19,10 @@ #ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP #define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP -#include +#include +#include #include +#include #include #include @@ -42,8 +44,8 @@ class PublisherModule PublisherModule( bool exit_on_lost_liveliness, - bool fixed_type = false, - bool zero_copy = false) + bool fixed_type, + bool zero_copy) : exit_on_lost_liveliness_(exit_on_lost_liveliness) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required , zero_copy_(zero_copy) @@ -83,8 +85,9 @@ class PublisherModule void run( uint32_t samples, - uint32_t loops = 0, - uint32_t interval = 250); + const uint32_t rescan_interval, + uint32_t loops, + uint32_t interval); private: @@ -96,7 +99,7 @@ class PublisherModule bool exit_on_lost_liveliness_ = false; bool fixed_type_ = false; bool zero_copy_ = false; - bool run_ = true; + std::atomic_bool run_{true}; DomainParticipant* participant_ = nullptr; TypeSupport type_; Publisher* publisher_ = nullptr; diff --git a/test/dds/communication/SubscriberMain.cpp b/test/dds/communication/SubscriberMain.cpp index 7e07441f7fd..8c93424492d 100644 --- a/test/dds/communication/SubscriberMain.cpp +++ b/test/dds/communication/SubscriberMain.cpp @@ -26,13 +26,15 @@ using namespace eprosima::fastdds::dds; * --notexit * --fixed_type * --zero_copy + * --succeed_on_timeout * --seed * --samples * --magic + * --timeout * --xmlfile * --publishers - * --succeed_on_timeout - * --timeout + * --die_on_data_received + * --rescan */ int main( @@ -49,6 +51,7 @@ int main( uint32_t samples = 4; uint32_t publishers = 1; uint32_t timeout = 86400000; // 24 h in ms + uint32_t rescan_interval_seconds = 0; char* xml_file = nullptr; std::string magic; @@ -134,6 +137,16 @@ int main( { die_on_data_received = true; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -152,7 +165,7 @@ int main( if (subscriber.init(seed, magic)) { - return subscriber.run(notexit, timeout) ? 0 : -1; + return subscriber.run(notexit, rescan_interval_seconds, timeout) ? 0 : -1; } return -1; diff --git a/test/dds/communication/SubscriberModule.cpp b/test/dds/communication/SubscriberModule.cpp index 062b1982dbb..10db7f577d4 100644 --- a/test/dds/communication/SubscriberModule.cpp +++ b/test/dds/communication/SubscriberModule.cpp @@ -132,17 +132,35 @@ bool SubscriberModule::init( bool SubscriberModule::run( bool notexit, + const uint32_t rescan_interval, uint32_t timeout) { - return run_for(notexit, std::chrono::milliseconds(timeout)); + return run_for(notexit, rescan_interval, std::chrono::milliseconds(timeout)); } bool SubscriberModule::run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout) { bool returned_value = false; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (notexit && run_) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); @@ -192,6 +210,9 @@ bool SubscriberModule::run_for( returned_value = false; } + run_ = false; + net_rescan_thread.join(); + return returned_value; } diff --git a/test/dds/communication/SubscriberModule.hpp b/test/dds/communication/SubscriberModule.hpp index 9d9062b4b13..c0bce0f4c45 100644 --- a/test/dds/communication/SubscriberModule.hpp +++ b/test/dds/communication/SubscriberModule.hpp @@ -19,10 +19,11 @@ #ifndef TEST_COMMUNICATION_SUBSCRIBER_HPP #define TEST_COMMUNICATION_SUBSCRIBER_HPP -#include +#include +#include #include #include -#include +#include #include #include @@ -45,10 +46,10 @@ class SubscriberModule SubscriberModule( const uint32_t publishers, const uint32_t max_number_samples, - bool fixed_type = false, - bool zero_copy = false, - bool succeed_on_timeout = false, - bool die_on_data_received = false) + bool fixed_type, + bool zero_copy, + bool succeed_on_timeout, + bool die_on_data_received) : publishers_(publishers) , max_number_samples_(max_number_samples) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required @@ -89,10 +90,12 @@ class SubscriberModule bool run( bool notexit, - uint32_t timeout = 86400000); + const uint32_t rescan_interval, + uint32_t timeout); bool run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout); private: @@ -106,7 +109,7 @@ class SubscriberModule std::map number_samples_; bool fixed_type_ = false; bool zero_copy_ = false; - bool run_ = true; + std::atomic_bool run_{true}; bool succeed_on_timeout_ = false; DomainParticipant* participant_ = nullptr; TypeSupport type_; diff --git a/test/dds/communication/dyn_network/CMakeLists.txt b/test/dds/communication/dyn_network/CMakeLists.txt new file mode 100644 index 00000000000..0d95ac665e0 --- /dev/null +++ b/test/dds/communication/dyn_network/CMakeLists.txt @@ -0,0 +1,61 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +message(STATUS "Configuring dynamic network interfaces tests") + +# Find docker +find_program(DOCKER_EXECUTABLE docker) +if(NOT DOCKER_EXECUTABLE) + message(FATAL_ERROR "Docker not found") +endif() + +set(SHELL_EXECUTABLE "") +set(TINYXML2_LIB_DIR_COMPOSE_VOLUME "") +set(TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH "") + +# Linux configurations +if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID)) + # Find bash + find_program(BASH_EXECUTABLE bash) + if(NOT BASH_EXECUTABLE) + message(FATAL_ERROR "bash not found") + endif() + + set(SHELL_EXECUTABLE ${BASH_EXECUTABLE}) + +# Windows configurations +elseif(WIN32) + # We don't know which docker image to use for Windows yet + message(FATAL_ERROR "Windows not supported yet") + +# Unsupported platform +else() + message(FATAL_ERROR "Unsupported platform") +endif() + +# Configure TinyXML2 library path if installed in user library path +if(NOT (TINYXML2_FROM_SOURCE OR TINYXML2_FROM_THIRDPARTY)) + get_filename_component(TINYXML2_LIB_DIR ${TINYXML2_LIBRARY} DIRECTORY) + set(TINYXML2_LIB_DIR_COMPOSE_VOLUME "- ${TINYXML2_LIB_DIR}:${CMAKE_INSTALL_PREFIX}/${DATA_INSTALL_DIR}/fastdds:ro") + set(TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH ":${CMAKE_INSTALL_PREFIX}/${DATA_INSTALL_DIR}/fastdds") +endif() + +configure_file(Dockerfile + ${CMAKE_CURRENT_BINARY_DIR}/Dockerfile @ONLY) +configure_file(dynamic_interfaces.compose.yml + ${CMAKE_CURRENT_BINARY_DIR}/dynamic_interfaces.compose.yml @ONLY) +configure_file(launch_subscriber.bash + ${CMAKE_CURRENT_BINARY_DIR}/launch_subscriber.bash @ONLY) +add_test(NAME dds.communication.dynamic_interfaces + COMMAND ${DOCKER_EXECUTABLE} compose -f ${CMAKE_CURRENT_BINARY_DIR}/dynamic_interfaces.compose.yml up + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/test/dds/communication/dyn_network/Dockerfile b/test/dds/communication/dyn_network/Dockerfile new file mode 100644 index 00000000000..ebdcae10bba --- /dev/null +++ b/test/dds/communication/dyn_network/Dockerfile @@ -0,0 +1,27 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tag, branch, or commit in github.com/eProsima/DDS-Suite +ARG ubuntu_version=22.04 +FROM ubuntu:$ubuntu_version AS ubuntu-net-tools + +# Needed for a dependency that forces to set timezone +ENV TZ=Europe/Madrid +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +# Avoids using interactions during building +ENV DEBIAN_FRONTEND=noninteractive + +# Install apt dependencies +RUN apt-get update && apt-get install --yes net-tools && rm -rf /var/lib/apt/lists/* diff --git a/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml b/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml new file mode 100644 index 00000000000..9d45abbbc48 --- /dev/null +++ b/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml @@ -0,0 +1,42 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +version: "3" + +services: + publisher: + image: ubuntu:22.04 + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/test/dds/communication + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/DDSCommunicationPublisher --xmlfile $${EXAMPLE_DIR}/simple_reliable_profile.xml --wait 1 --samples 10 --loops 1 --seed 0 --magic T" + + subscriber: + build: . + image: ubuntu-net-tools:22.04 + privileged: true + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/test/dds/communication + working_dir: @PROJECT_BINARY_DIR@/test/dds/communication + command: @SHELL_EXECUTABLE@ "dyn_network/launch_subscriber.bash" + depends_on: + - publisher diff --git a/test/dds/communication/dyn_network/launch_subscriber.bash b/test/dds/communication/dyn_network/launch_subscriber.bash new file mode 100755 index 00000000000..9ee62e1338b --- /dev/null +++ b/test/dds/communication/dyn_network/launch_subscriber.bash @@ -0,0 +1,33 @@ +# Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +# Note: This script is intended to be used in a privileged container, since it requires to bring down and up the eth0 interface. + +echo "Putting down eth0 interface..." +ifconfig eth0 down + +echo "Launching subscriber..." +${EXAMPLE_DIR}/DDSCommunicationSubscriber --xmlfile ${EXAMPLE_DIR}/simple_reliable_profile.xml --samples 10 --seed 0 --magic T --rescan 2 & +subs_pid=$! +echo "Subscriber launched." + +echo "Waiting 2 seconds and bring up eth0 interface..." +sleep 2s +ifconfig eth0 up +echo "eth0 interface is up." + +echo "Waiting 3s for the subscriber (process id $subs_pid) to finish..." +wait $subs_pid diff --git a/test/dds/communication/security/PublisherMain.cpp b/test/dds/communication/security/PublisherMain.cpp index 8db6dee6f19..21e3eae210a 100644 --- a/test/dds/communication/security/PublisherMain.cpp +++ b/test/dds/communication/security/PublisherMain.cpp @@ -28,9 +28,10 @@ using namespace eprosima::fastdds::dds; * --seed * --wait * --samples + * --interval * --magic * --xmlfile - * --interval + * --rescan */ int main( @@ -46,6 +47,7 @@ int main( char* xml_file = nullptr; uint32_t samples = 4; uint32_t interval = 250; + uint32_t rescan_interval_seconds = 0; std::string magic; while (arg_count < argc) @@ -122,6 +124,16 @@ int main( xml_file = argv[arg_count]; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -145,7 +157,7 @@ int main( publisher.wait_discovery(wait); } - publisher.run(samples, 0, interval); + publisher.run(samples, rescan_interval_seconds, 0, interval); return 0; } diff --git a/test/dds/communication/security/PublisherModule.cpp b/test/dds/communication/security/PublisherModule.cpp index 8505d09cfe2..5dcb8f30cbd 100644 --- a/test/dds/communication/security/PublisherModule.cpp +++ b/test/dds/communication/security/PublisherModule.cpp @@ -134,6 +134,7 @@ void PublisherModule::wait_discovery( void PublisherModule::run( uint32_t samples, + const uint32_t rescan_interval, const uint32_t loops, uint32_t interval) { @@ -141,6 +142,22 @@ void PublisherModule::run( uint16_t index = 1; void* sample = nullptr; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (run_ && (loops == 0 || loops > current_loop)) { if (zero_copy_) @@ -187,6 +204,9 @@ void PublisherModule::run( std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } + + run_ = false; + net_rescan_thread.join(); } void PublisherModule::on_publication_matched( diff --git a/test/dds/communication/security/PublisherModule.hpp b/test/dds/communication/security/PublisherModule.hpp index 7f316e5d81c..b34f546d812 100644 --- a/test/dds/communication/security/PublisherModule.hpp +++ b/test/dds/communication/security/PublisherModule.hpp @@ -19,8 +19,9 @@ #ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP #define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP -#include +#include #include +#include #include #include @@ -42,8 +43,8 @@ class PublisherModule PublisherModule( bool exit_on_lost_liveliness, - bool fixed_type = false, - bool zero_copy = false) + bool fixed_type, + bool zero_copy) : exit_on_lost_liveliness_(exit_on_lost_liveliness) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required , zero_copy_(zero_copy) @@ -84,8 +85,9 @@ class PublisherModule void run( uint32_t samples, - uint32_t loops = 0, - uint32_t interval = 250); + const uint32_t rescan_interval, + uint32_t loops, + uint32_t interval); private: @@ -97,7 +99,7 @@ class PublisherModule bool exit_on_lost_liveliness_ = false; bool fixed_type_ = false; bool zero_copy_ = false; - bool run_ = true; + std::atomic_bool run_ {true}; DomainParticipant* participant_ = nullptr; TypeSupport type_; Publisher* publisher_ = nullptr; diff --git a/test/dds/communication/security/SubscriberMain.cpp b/test/dds/communication/security/SubscriberMain.cpp index f917d594c92..5bbd6ba9838 100644 --- a/test/dds/communication/security/SubscriberMain.cpp +++ b/test/dds/communication/security/SubscriberMain.cpp @@ -30,6 +30,8 @@ using namespace eprosima::fastdds::dds; * --magic * --xmlfile * --publishers + * --die_on_data_received + * --rescan */ int main( @@ -44,6 +46,7 @@ int main( uint32_t seed = 7800; uint32_t samples = 4; uint32_t publishers = 1; + uint32_t rescan_interval_seconds = 0; char* xml_file = nullptr; std::string magic; @@ -115,6 +118,16 @@ int main( { die_on_data_received = true; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -133,7 +146,7 @@ int main( if (subscriber.init(seed, magic)) { - return subscriber.run(notexit) ? 0 : -1; + return subscriber.run(notexit, rescan_interval_seconds) ? 0 : -1; } return -1; diff --git a/test/dds/communication/security/SubscriberModule.cpp b/test/dds/communication/security/SubscriberModule.cpp index 349454864d0..1e6f13c92f3 100644 --- a/test/dds/communication/security/SubscriberModule.cpp +++ b/test/dds/communication/security/SubscriberModule.cpp @@ -130,17 +130,35 @@ bool SubscriberModule::init( } bool SubscriberModule::run( - bool notexit) + bool notexit, + const uint32_t rescan_interval) { - return run_for(notexit, std::chrono::hours(24)); + return run_for(notexit, rescan_interval, std::chrono::hours(24)); } bool SubscriberModule::run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout) { bool returned_value = false; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (notexit && run_) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); @@ -184,6 +202,9 @@ bool SubscriberModule::run_for( returned_value = false; } + run_ = false; + net_rescan_thread.join(); + return returned_value; } diff --git a/test/dds/communication/security/SubscriberModule.hpp b/test/dds/communication/security/SubscriberModule.hpp index 988da3e5cd1..0798953e215 100644 --- a/test/dds/communication/security/SubscriberModule.hpp +++ b/test/dds/communication/security/SubscriberModule.hpp @@ -19,10 +19,11 @@ #ifndef TEST_COMMUNICATION_SUBSCRIBER_HPP #define TEST_COMMUNICATION_SUBSCRIBER_HPP -#include +#include +#include #include #include -#include +#include #include #include @@ -44,9 +45,9 @@ class SubscriberModule SubscriberModule( const uint32_t publishers, const uint32_t max_number_samples, - bool fixed_type = false, - bool zero_copy = false, - bool die_on_data_received = false) + bool fixed_type, + bool zero_copy, + bool die_on_data_received) : publishers_(publishers) , max_number_samples_(max_number_samples) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required @@ -85,10 +86,12 @@ class SubscriberModule const std::string& magic); bool run( - bool notexit); + bool notexit, + const uint32_t rescan_interval); bool run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout); private: @@ -102,7 +105,7 @@ class SubscriberModule std::map number_samples_; bool fixed_type_ = false; bool zero_copy_ = false; - bool run_ = true; + std::atomic_bool run_{true}; DomainParticipant* participant_ = nullptr; TypeSupport type_; Subscriber* subscriber_ = nullptr;