diff --git a/.github/workflows/config/ci.meta b/.github/workflows/config/ci.meta index 13abcf5975d..922251be0a8 100644 --- a/.github/workflows/config/ci.meta +++ b/.github/workflows/config/ci.meta @@ -8,7 +8,6 @@ names: - "-DFASTDDS_ENFORCE_LOG_INFO=ON" - "-DFASTDDS_PIM_API_TESTS=ON" - "-DFASTDDS_STATISTICS=ON" - - "-DFASTRTPS_API_TESTS=OFF" - "-DINSTALL_EXAMPLES=ON" - "-DINSTALL_TOOLS=ON" - "-DINTERNAL_DEBUG=ON" diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index 3791de07f81..82dda392f5a 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -13,7 +13,6 @@ # limitations under the License. option(RTPS_API_TESTS "Enable tests using RTPS API" ON) -option(FASTRTPS_API_TESTS "Enable tests using FastRTPS API" OFF) option(FASTDDS_PIM_API_TESTS "Enable tests using FastDDS API" ON) if(WIN32) @@ -197,47 +196,6 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/auth_handshake_props_profile.xml ${CMAKE_CURRENT_BINARY_DIR}/auth_handshake_props_profile.xml COPYONLY) file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/datagrams" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}") -if(FASTRTPS_API_TESTS) - set(BLACKBOXTESTS_FASTRTPS_SOURCE - ${BLACKBOXTESTS_SOURCE} - api/fastrtps_deprecated/ReqRepHelloWorldRequester.cpp - api/fastrtps_deprecated/ReqRepHelloWorldReplier.cpp - ) - - add_executable(BlackboxTests_FastRTPS ${BLACKBOXTESTS_FASTRTPS_SOURCE}) - target_compile_definitions(BlackboxTests_FastRTPS PRIVATE - BOOST_ASIO_STANDALONE - ASIO_STANDALONE - $<$>:FASTDDS_SHM_TRANSPORT_DISABLED> # Do not compile SHM Transport - $<$>,$>:__DEBUG> - $<$:__INTERNALDEBUG> # Internal debug activated. - $<$:_ENABLE_ATOMIC_ALIGNMENT_FIX> - ) - target_include_directories(BlackboxTests_FastRTPS PRIVATE - ${Asio_INCLUDE_DIR} - api/fastrtps_deprecated) - target_link_libraries(BlackboxTests_FastRTPS - fastrtps - fastcdr - foonathan_memory - GTest::gtest - $<$:eProsima_p11> # $ - ) - - gtest_discover_tests(BlackboxTests_FastRTPS - PROPERTIES - ENVIRONMENT "CERTS_PATH=${PROJECT_SOURCE_DIR}/test/certs" - ENVIRONMENT "TOPIC_RANDOM_NUMBER=${TOPIC_RANDOM_NUMBER}" - ENVIRONMENT "W_UNICAST_PORT_RANDOM_NUMBER=${W_UNICAST_PORT_RANDOM_NUMBER}" - ENVIRONMENT "R_UNICAST_PORT_RANDOM_NUMBER=${R_UNICAST_PORT_RANDOM_NUMBER}" - ENVIRONMENT "MULTICAST_PORT_RANDOM_NUMBER=${MULTICAST_PORT_RANDOM_NUMBER}" - ENVIRONMENT $,OPENSSL_CONF=${OPENSSL_CONF},> - TEST_PREFIX "BlackboxTests_FastRTPS." - TEST_FILTER ${BLACKBOX_HIGH_LEVEL_IGNORED_TESTS} - NO_PRETTY_VALUES - ) -endif(FASTRTPS_API_TESTS) - if(FASTDDS_PIM_API_TESTS) set(BLACKBOXTESTS_FASTDDS_PIM_SOURCE ${DDS_BLACKBOXTESTS_SOURCE} diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp deleted file mode 100644 index 3131dc4efd5..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp +++ /dev/null @@ -1,678 +0,0 @@ -// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file PubSubParticipant.hpp - * - */ - -#ifndef _TEST_BLACKBOX_PUBSUBPARTICIPANT_HPP_ -#define _TEST_BLACKBOX_PUBSUBPARTICIPANT_HPP_ - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace eprosima { -namespace fastrtps { - -/** - * @brief A class with one participant that can have multiple publishers and subscribers - */ -template -class PubSubParticipant -{ - class PubListener : public PublisherListener - { - friend class PubSubParticipant; - - public: - - PubListener( - PubSubParticipant* participant) - : participant_(participant) - { - } - - ~PubListener() - { - } - - void onPublicationMatched( - Publisher* pub, - rtps::MatchingInfo& info) override - { - (void)pub; - (info.status == rtps::MATCHED_MATCHING) ? participant_->pub_matched() : participant_->pub_unmatched(); - } - - void on_liveliness_lost( - Publisher* pub, - const LivelinessLostStatus& status) override - { - (void)pub; - (void)status; - participant_->pub_liveliness_lost(); - } - - private: - - PubListener& operator =( - const PubListener&) = delete; - //! A pointer to the participant - PubSubParticipant* participant_; - }; - - class SubListener : public SubscriberListener - { - friend class PubSubParticipant; - - public: - - SubListener( - PubSubParticipant* participant) - : participant_(participant) - { - } - - ~SubListener() - { - } - - void onSubscriptionMatched( - Subscriber* sub, - rtps::MatchingInfo& info) override - { - (void)sub; - (info.status == rtps::MATCHED_MATCHING) ? participant_->sub_matched() : participant_->sub_unmatched(); - } - - void on_liveliness_changed( - Subscriber* sub, - const LivelinessChangedStatus& status) override - { - (void)sub; - (status.alive_count_change == - 1) ? participant_->sub_liveliness_recovered() : participant_->sub_liveliness_lost(); - - } - - private: - - SubListener& operator =( - const SubListener&) = delete; - //! A pointer to the participant - PubSubParticipant* participant_; - }; - -public: - - typedef TypeSupport type_support; - typedef typename type_support::type type; - - PubSubParticipant( - unsigned int num_publishers, - unsigned int num_subscribers, - unsigned int num_expected_publishers, - unsigned int num_expected_subscribers) - : participant_(nullptr) - , participant_attr_() - , num_publishers_(num_publishers) - , num_subscribers_(num_subscribers) - , num_expected_subscribers_(num_expected_subscribers) - , num_expected_publishers_(num_expected_publishers) - , publishers_(num_publishers) - , subscribers_(num_subscribers) - , publisher_attr_() - , pub_listener_(this) - , sub_listener_(this) - , pub_matched_(0) - , sub_matched_(0) - , pub_times_liveliness_lost_(0) - , sub_times_liveliness_lost_(0) - , sub_times_liveliness_recovered_(0) - { - -#if defined(PREALLOCATED_MEMORY_MODE_TEST) - publisher_attr_.historyMemoryPolicy = rtps::PREALLOCATED_MEMORY_MODE; -#elif defined(DYNAMIC_RESERVE_MEMORY_MODE_TEST) - publisher_attr_.historyMemoryPolicy = rtps::DYNAMIC_RESERVE_MEMORY_MODE; -#else - publisher_attr_.historyMemoryPolicy = rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; -#endif // if defined(PREALLOCATED_WITH_REALLOC_MEMORY_MODE_TEST) - - // By default, heartbeat period and nack response delay are 100 milliseconds. - publisher_attr_.times.heartbeatPeriod.seconds = 0; - publisher_attr_.times.heartbeatPeriod.nanosec = 100000000; - publisher_attr_.times.nackResponseDelay.seconds = 0; - publisher_attr_.times.nackResponseDelay.nanosec = 100000000; - - // Increase default max_blocking_time to 1 second, as our CI infrastructure shows some - // big CPU overhead sometimes - publisher_attr_.qos.m_reliability.max_blocking_time.seconds = 1; - publisher_attr_.qos.m_reliability.max_blocking_time.nanosec = 0; - - // By default, heartbeat period delay is 100 milliseconds. - subscriber_attr_.times.heartbeatResponseDelay = 0.1; - } - - ~PubSubParticipant() - { - if (participant_ != nullptr) - { - Domain::removeParticipant(participant_); - participant_ = nullptr; - } - } - - bool init_participant() - { - participant_attr_.domainId = (uint32_t)GET_PID() % 230; - participant_ = Domain::createParticipant(participant_attr_); - if (participant_ != nullptr) - { - Domain::registerType(participant_, &type_); - return true; - } - return false; - } - - bool init_publisher( - unsigned int index) - { - if (participant_ == nullptr) - { - return false; - } - if (index >= num_publishers_) - { - return false; - } - - auto pub = Domain::createPublisher(participant_, publisher_attr_, &pub_listener_); - if (pub != nullptr) - { - publishers_[index] = pub; - return true; - } - return false; - } - - bool init_subscriber( - unsigned int index) - { - if (index >= num_subscribers_) - { - return false; - } - auto subscriber = Domain::createSubscriber(participant_, subscriber_attr_, &sub_listener_); - if (subscriber != nullptr) - { - subscribers_[index] = subscriber; - return true; - } - return false; - } - - eprosima::fastrtps::Publisher& get_native_writer( - unsigned int index) - { - return *(publishers_[index]); - } - - eprosima::fastrtps::Subscriber& get_native_reader( - unsigned int index) - { - return *(subscribers_[index]); - } - - bool send_sample( - type& msg, - unsigned int index = 0) - { - return publishers_[index]->write((void*)&msg); - } - - void assert_liveliness_participant() - { - participant_->assert_liveliness(); - } - - void assert_liveliness( - unsigned int index = 0) - { - publishers_[index]->assert_liveliness(); - } - - void pub_wait_discovery( - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(pub_mutex_); - - std::cout << "Publisher is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - pub_cv_.wait(lock, [&]() - { - return pub_matched_ == num_expected_publishers_; - }); - } - else - { - pub_cv_.wait_for(lock, timeout, [&]() - { - return pub_matched_ == num_expected_publishers_; - }); - } - - std::cout << "Publisher discovery finished " << std::endl; - } - - void pub_wait_discovery( - unsigned int expected_match, - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(pub_mutex_); - - std::cout << "Publisher is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - pub_cv_.wait(lock, [&]() - { - return pub_matched_ == expected_match; - }); - } - else - { - pub_cv_.wait_for(lock, timeout, [&]() - { - return pub_matched_ == expected_match; - }); - } - - std::cout << "Publisher discovery finished " << std::endl; - } - - void sub_wait_discovery( - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(sub_mutex_); - - std::cout << "Subscriber is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - sub_cv_.wait(lock, [&]() - { - return sub_matched_ == num_expected_subscribers_; - }); - } - else - { - sub_cv_.wait_for(lock, timeout, [&]() - { - return sub_matched_ == num_expected_subscribers_; - }); - } - - std::cout << "Subscriber discovery finished " << std::endl; - } - - void sub_wait_discovery( - unsigned int expected_match, - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(sub_mutex_); - - std::cout << "Subscriber is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - sub_cv_.wait(lock, [&]() - { - return sub_matched_ == expected_match; - }); - } - else - { - sub_cv_.wait_for(lock, timeout, [&]() - { - return sub_matched_ == expected_match; - }); - } - - std::cout << "Subscriber discovery finished " << std::endl; - } - - void pub_wait_liveliness_lost( - unsigned int times = 1) - { - std::unique_lock lock(pub_liveliness_mutex_); - pub_liveliness_cv_.wait(lock, [&]() - { - return pub_times_liveliness_lost_ >= times; - }); - } - - void sub_wait_liveliness_recovered( - unsigned int num_recovered) - { - std::unique_lock lock(sub_liveliness_mutex_); - sub_liveliness_cv_.wait(lock, [&]() - { - return sub_times_liveliness_recovered_ >= num_recovered; - }); - } - - void sub_wait_liveliness_lost( - unsigned int num_lost) - { - std::unique_lock lock(sub_liveliness_mutex_); - sub_liveliness_cv_.wait(lock, [&]() - { - return sub_times_liveliness_lost_ >= num_lost; - }); - } - - template - size_t sub_wait_liveliness_lost_for( - unsigned int expected_num_lost, - const std::chrono::duration<_Rep, _Period>& max_wait) - { - std::unique_lock lock(sub_liveliness_mutex_); - sub_liveliness_cv_.wait_for(lock, max_wait, [this, &expected_num_lost]() -> bool - { - return sub_times_liveliness_lost_ >= expected_num_lost; - }); - - return sub_times_liveliness_lost_; - } - - PubSubParticipant& property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - participant_attr_.rtps.properties = property_policy; - return *this; - } - - PubSubParticipant& disable_builtin_transport() - { - participant_attr_.rtps.useBuiltinTransports = false; - return *this; - } - - PubSubParticipant& add_user_transport_to_pparams( - std::shared_ptr userTransportDescriptor) - { - participant_attr_.rtps.userTransports.push_back(userTransportDescriptor); - return *this; - } - - PubSubParticipant& pub_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - publisher_attr_.properties = property_policy; - return *this; - } - - PubSubParticipant& sub_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - subscriber_attr_.properties = property_policy; - return *this; - } - - PubSubParticipant& pub_topic_name( - std::string topicName) - { - // Generate topic name - std::ostringstream t; - t << topicName << "_" << asio::ip::host_name() << "_" << GET_PID(); - publisher_attr_.topic.topicName = t.str(); - publisher_attr_.topic.topicDataType = type_.getName(); - return *this; - } - - PubSubParticipant& sub_topic_name( - std::string topicName) - { - // Generate topic name - std::ostringstream t; - t << topicName << "_" << asio::ip::host_name() << "_" << GET_PID(); - subscriber_attr_.topic.topicName = t.str(); - subscriber_attr_.topic.topicDataType = type_.getName(); - return *this; - } - - PubSubParticipant& reliability( - const ReliabilityQosPolicyKind kind) - { - publisher_attr_.qos.m_reliability.kind = kind; - subscriber_attr_.qos.m_reliability.kind = kind; - return *this; - } - - PubSubParticipant& pub_liveliness_kind( - const LivelinessQosPolicyKind kind) - { - publisher_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubParticipant& pub_liveliness_lease_duration( - const Duration_t lease_duration) - { - publisher_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - PubSubParticipant& pub_liveliness_announcement_period( - const Duration_t announcement_period) - { - publisher_attr_.qos.m_liveliness.announcement_period = announcement_period; - return *this; - } - - PubSubParticipant& sub_liveliness_kind( - const LivelinessQosPolicyKind& kind) - { - subscriber_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubParticipant& sub_liveliness_lease_duration( - const Duration_t lease_duration) - { - subscriber_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - PubSubParticipant& pub_deadline_period( - const Duration_t& deadline_period) - { - publisher_attr_.qos.m_deadline.period = deadline_period; - return *this; - } - - PubSubParticipant& sub_deadline_period( - const Duration_t& deadline_period) - { - subscriber_attr_.qos.m_deadline.period = deadline_period; - return *this; - } - - bool sub_update_deadline_period( - const Duration_t& deadline_period, - unsigned int index) - { - if (index >= num_subscribers_) - { - return false; - } - if (subscribers_[index] == nullptr) - { - return false; - } - - SubscriberAttributes attr; - attr = subscriber_attr_; - attr.qos.m_deadline.period = deadline_period; - - return subscribers_[index]->updateAttributes(attr); - } - - void pub_liveliness_lost() - { - std::unique_lock lock(pub_liveliness_mutex_); - pub_times_liveliness_lost_++; - pub_liveliness_cv_.notify_one(); - } - - void sub_liveliness_lost() - { - std::unique_lock lock(sub_liveliness_mutex_); - sub_times_liveliness_lost_++; - sub_liveliness_cv_.notify_one(); - } - - void sub_liveliness_recovered() - { - std::unique_lock lock(sub_liveliness_mutex_); - sub_times_liveliness_recovered_++; - sub_liveliness_cv_.notify_one(); - } - - unsigned int pub_times_liveliness_lost() - { - std::unique_lock lock(pub_liveliness_mutex_); - return pub_times_liveliness_lost_; - } - - unsigned int sub_times_liveliness_lost() - { - std::unique_lock lock(sub_liveliness_mutex_); - return sub_times_liveliness_lost_; - } - - unsigned int sub_times_liveliness_recovered() - { - std::unique_lock lock(sub_liveliness_mutex_); - return sub_times_liveliness_recovered_; - } - -private: - - PubSubParticipant& operator =( - const PubSubParticipant&) = delete; - - void pub_matched() - { - std::unique_lock lock(pub_mutex_); - ++pub_matched_; - pub_cv_.notify_one(); - } - - void pub_unmatched() - { - std::unique_lock lock(pub_mutex_); - --pub_matched_; - pub_cv_.notify_one(); - } - - void sub_matched() - { - std::unique_lock lock(sub_mutex_); - ++sub_matched_; - sub_cv_.notify_one(); - } - - void sub_unmatched() - { - std::unique_lock lock(sub_mutex_); - --sub_matched_; - sub_cv_.notify_one(); - } - - //! The participant - Participant* participant_; - //! Participant attributes - ParticipantAttributes participant_attr_; - //! Number of publishers in this participant - unsigned int num_publishers_; - //! Number of subscribers in this participant - unsigned int num_subscribers_; - //! Number of expected subscribers to match - unsigned int num_expected_subscribers_; - //! Number of expected subscribers to match - unsigned int num_expected_publishers_; - //! A vector of publishers - std::vector publishers_; - //! A vector of subscribers - std::vector subscribers_; - //! Publisher attributes - PublisherAttributes publisher_attr_; - //! Subscriber attributes - SubscriberAttributes subscriber_attr_; - //! A listener for publishers - PubListener pub_listener_; - //! A listener for subscribers - SubListener sub_listener_; - - std::mutex pub_mutex_; - std::mutex sub_mutex_; - std::condition_variable pub_cv_; - std::condition_variable sub_cv_; - std::atomic pub_matched_; - std::atomic sub_matched_; - - //! Number of times liveliness was lost on the publishing side - unsigned int pub_times_liveliness_lost_; - //! The number of times liveliness was lost on the subscribing side - unsigned int sub_times_liveliness_lost_; - //! The number of times liveliness was recovered on the subscribing side - unsigned int sub_times_liveliness_recovered_; - //! A mutex protecting liveliness data - std::mutex sub_liveliness_mutex_; - //! A condition variable for liveliness data - std::condition_variable sub_liveliness_cv_; - //! A mutex protecting liveliness of publisher - std::mutex pub_liveliness_mutex_; - //! A condition variable for liveliness of publisher - std::condition_variable pub_liveliness_cv_; - - type_support type_; -}; - -} // namespace fastrtps -} // namespace eprosima - -#endif // _TEST_BLACKBOX_PUBSUBPARTICIPANT_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp deleted file mode 100644 index 2297ab359a0..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ /dev/null @@ -1,1584 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file PubSubReader.hpp - * - */ - -#ifndef _TEST_BLACKBOX_PUBSUBREADER_HPP_ -#define _TEST_BLACKBOX_PUBSUBREADER_HPP_ - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using eprosima::fastrtps::rtps::IPLocator; -using eprosima::fastrtps::rtps::UDPTransportDescriptor; -using eprosima::fastrtps::rtps::UDPv4TransportDescriptor; -using eprosima::fastrtps::rtps::UDPv6TransportDescriptor; - -template -class PubSubReader -{ -public: - - typedef TypeSupport type_support; - typedef typename type_support::type type; - -private: - - class ParticipantListener : public eprosima::fastrtps::ParticipantListener - { - public: - - ParticipantListener( - PubSubReader& reader) - : reader_(reader) - { - } - - ~ParticipantListener() - { - } - - void onParticipantDiscovery( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override - { - if (reader_.onDiscovery_ != nullptr) - { - std::unique_lock lock(reader_.mutexDiscovery_); - reader_.discovery_result_ |= reader_.onDiscovery_(info); - reader_.cvDiscovery_.notify_one(); - } - - if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) - { - reader_.participant_matched(); - - } - else if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT || - info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) - { - reader_.participant_unmatched(); - } - } - - void onPublisherDiscovery( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::WriterDiscoveryInfo&& info) override - { - if (reader_.onEndpointDiscovery_ != nullptr) - { - std::unique_lock lock(reader_.mutexDiscovery_); - reader_.discovery_result_ |= reader_.onEndpointDiscovery_(info); - reader_.cvDiscovery_.notify_one(); - } - } - -#if HAVE_SECURITY - void onParticipantAuthentication( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ParticipantAuthenticationInfo&& info) override - { - if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::AUTHORIZED_PARTICIPANT) - { - reader_.authorized(); - } - else if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::UNAUTHORIZED_PARTICIPANT) - { - reader_.unauthorized(); - } - } - -#endif // if HAVE_SECURITY - - private: - - ParticipantListener& operator =( - const ParticipantListener&) = delete; - PubSubReader& reader_; - - } - participant_listener_; - - class Listener : public eprosima::fastrtps::SubscriberListener - { - public: - - Listener( - PubSubReader& reader) - : reader_(reader) - , times_deadline_missed_(0) - { - } - - ~Listener() - { - } - - void onNewDataMessage( - eprosima::fastrtps::Subscriber* sub) override - { - ASSERT_NE(sub, nullptr); - reader_.message_receive_count_.fetch_add(1); - reader_.message_receive_cv_.notify_one(); - - if (reader_.receiving_.load()) - { - bool ret = false; - do - { - reader_.receive_one(sub, ret); - } while (ret); - } - } - - void onSubscriptionMatched( - eprosima::fastrtps::Subscriber* /*sub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) override - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - std::cout << "Subscriber matched publisher " << info.remoteEndpointGuid << std::endl; - reader_.matched(); - } - else - { - std::cout << "Subscriber unmatched publisher " << info.remoteEndpointGuid << std::endl; - reader_.unmatched(); - } - } - - void on_requested_deadline_missed( - eprosima::fastrtps::Subscriber* sub, - const eprosima::fastrtps::RequestedDeadlineMissedStatus& status) override - { - (void)sub; - - times_deadline_missed_ = status.total_count; - } - - void on_liveliness_changed( - eprosima::fastrtps::Subscriber* sub, - const eprosima::fastrtps::LivelinessChangedStatus& status) override - { - (void)sub; - - reader_.set_liveliness_changed_status(status); - - if (status.alive_count_change == 1) - { - reader_.liveliness_recovered(); - - } - else if (status.not_alive_count_change == 1) - { - reader_.liveliness_lost(); - - } - } - - unsigned int missed_deadlines() const - { - return times_deadline_missed_; - } - - private: - - Listener& operator =( - const Listener&) = delete; - - PubSubReader& reader_; - - //! Number of times deadline was missed - unsigned int times_deadline_missed_; - - } - listener_; - - friend class Listener; - -public: - - PubSubReader( - const std::string& topic_name, - bool take = true) - : participant_listener_(*this) - , listener_(*this) - , participant_(nullptr) - , subscriber_(nullptr) - , topic_name_(topic_name) - , initialized_(false) - , matched_(0) - , participant_matched_(0) - , receiving_(false) - , current_processed_count_(0) - , number_samples_expected_(0) - , discovery_result_(false) - , onDiscovery_(nullptr) - , onEndpointDiscovery_(nullptr) - , take_(take) -#if HAVE_SECURITY - , authorized_(0) - , unauthorized_(0) -#endif // if HAVE_SECURITY - , liveliness_mutex_() - , liveliness_cv_() - , times_liveliness_lost_(0) - , times_liveliness_recovered_(0) - , message_receive_count_(0) - { - subscriber_attr_.topic.topicDataType = type_.getName(); - // Generate topic name - std::ostringstream t; - t << topic_name_ << "_" << asio::ip::host_name() << "_" << GET_PID(); - subscriber_attr_.topic.topicName = t.str(); - subscriber_attr_.topic.topicKind = - type_.m_isGetKeyDefined ? ::eprosima::fastrtps::rtps::WITH_KEY : ::eprosima::fastrtps::rtps::NO_KEY; - - // By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE - subscriber_attr_.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - - // By default, heartbeat period delay is 100 milliseconds. - subscriber_attr_.times.heartbeatResponseDelay.seconds = 0; - subscriber_attr_.times.heartbeatResponseDelay.nanosec = 100000000; - } - - ~PubSubReader() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - } - } - - eprosima::fastrtps::Subscriber& get_native_reader() const - { - return *subscriber_; - } - - void init() - { - //Create participant - // Use local copies of attributes to catch #6507 issues with valgrind - eprosima::fastrtps::ParticipantAttributes participant_attr; - eprosima::fastrtps::SubscriberAttributes subscriber_attr; - - if (!xml_file_.empty()) - { - eprosima::fastrtps::Domain::loadXMLProfilesFile(xml_file_); - if (!participant_profile_.empty()) - { - // Need to specify ID in XML - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_profile_, - &participant_listener_); - ASSERT_NE(participant_, nullptr); - participant_attr = participant_->getAttributes(); - subscriber_attr = subscriber_attr_; - } - } - if (participant_ == nullptr) - { - participant_attr_.domainId = (uint32_t)GET_PID() % 230; - - participant_attr = participant_attr_; - subscriber_attr = subscriber_attr_; - - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_attr, &participant_listener_); - } - - if (participant_ != nullptr) - { - participant_guid_ = participant_->getGuid(); - - // Register type - ASSERT_EQ(eprosima::fastrtps::Domain::registerType(participant_, &type_), true); - - //Create subscribe r - subscriber_ = eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr, &listener_); - - if (subscriber_ != nullptr) - { - subscriber_guid_ = subscriber_->getGuid(); - std::cout << "Created subscriber " << subscriber_guid_ << " for topic " << - subscriber_attr_.topic.topicName << std::endl; - - initialized_ = true; - } - } - } - - bool isInitialized() const - { - return initialized_; - } - - void destroy() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - participant_ = nullptr; - } - - initialized_ = false; - } - - std::list data_not_received() - { - std::unique_lock lock(mutex_); - return total_msgs_; - } - - void startReception( - const std::list& msgs) - { - mutex_.lock(); - total_msgs_ = msgs; - number_samples_expected_ = total_msgs_.size(); - current_processed_count_ = 0; - last_seq.clear(); - mutex_.unlock(); - - bool ret = false; - do - { - receive_one(subscriber_, ret); - } - while (ret); - - receiving_.store(true); - } - - void stopReception() - { - receiving_.store(false); - } - - template - bool wait_for_all_received( - const std::chrono::duration<_Rep, _Period>& max_wait, - size_t num_messages = 0) - { - if (num_messages == 0) - { - num_messages = number_samples_expected_; - } - std::unique_lock lock(message_receive_mutex_); - return message_receive_cv_.wait_for(lock, max_wait, [this, num_messages]() -> bool - { - return num_messages == message_receive_count_; - }); - } - - void block_for_all() - { - block([this]() -> bool - { - return number_samples_expected_ == current_processed_count_; - }); - } - - void block_for_seq( - eprosima::fastrtps::rtps::SequenceNumber_t seq) - { - block([this, seq]() -> bool - { - return get_last_sequence_received() == seq; - }); - } - - size_t block_for_at_least( - size_t at_least) - { - size_t read_count_locked; // solves TSan data race - block([this, &read_count_locked, at_least]() -> bool - { - read_count_locked = current_processed_count_; - return current_processed_count_ >= at_least; - }); - return read_count_locked; - } - - void block( - std::function checker) - { - std::unique_lock lock(mutex_); - cv_.wait(lock, checker); - } - - template - size_t block_for_all( - const std::chrono::duration<_Rep, _Period>& max_wait) - { - std::unique_lock lock(mutex_); - cv_.wait_for(lock, max_wait, [this]() -> bool - { - return number_samples_expected_ == current_processed_count_; - }); - - return current_processed_count_; - } - - void wait_discovery( - std::chrono::seconds timeout = std::chrono::seconds::zero(), - unsigned int min_writers = 1) - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Reader is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cvDiscovery_.wait(lock, [&]() - { - return matched_ >= min_writers; - }); - } - else - { - cvDiscovery_.wait_for(lock, timeout, [&]() - { - return matched_ >= min_writers; - }); - } - - std::cout << "Reader discovery finished..." << std::endl; - } - - bool wait_participant_discovery( - unsigned int min_participants = 1, - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - bool ret_value = true; - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Reader is waiting discovery of at least " << min_participants << " participants..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cvDiscovery_.wait(lock, [&]() - { - return participant_matched_ >= min_participants; - }); - } - else - { - if (!cvDiscovery_.wait_for(lock, timeout, [&]() - { - return participant_matched_ >= min_participants; - })) - { - ret_value = false; - } - } - - if (ret_value) - { - std::cout << "Reader participant discovery finished successfully..." << std::endl; - } - else - { - std::cout << "Reader participant discovery finished unsuccessfully..." << std::endl; - } - - return ret_value; - } - - bool wait_participant_undiscovery( - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - bool ret_value = true; - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Reader is waiting undiscovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cvDiscovery_.wait(lock, [&]() - { - return participant_matched_ == 0; - }); - } - else - { - if (!cvDiscovery_.wait_for(lock, timeout, [&]() - { - return participant_matched_ == 0; - })) - { - ret_value = false; - } - } - - if (ret_value) - { - std::cout << "Reader undiscovery finished successfully..." << std::endl; - } - else - { - std::cout << "Reader undiscovery finished unsuccessfully..." << std::endl; - } - - return ret_value; - } - - void wait_writer_undiscovery( - unsigned int matched = 0) - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Reader is waiting removal..." << std::endl; - - cvDiscovery_.wait(lock, [&]() - { - return matched_ <= matched; - }); - - std::cout << "Reader removal finished..." << std::endl; - } - - void wait_liveliness_recovered( - unsigned int times = 1) - { - std::unique_lock lock(liveliness_mutex_); - - liveliness_cv_.wait(lock, [&]() - { - return times_liveliness_recovered_ >= times; - }); - } - - void wait_liveliness_lost( - unsigned int times = 1) - { - std::unique_lock lock(liveliness_mutex_); - - liveliness_cv_.wait(lock, [&]() - { - return times_liveliness_lost_ >= times; - }); - } - -#if HAVE_SECURITY - void waitAuthorized() - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "Reader is waiting authorization..." << std::endl; - - cvAuthentication_.wait(lock, [&]() -> bool - { - return authorized_ > 0; - }); - - std::cout << "Reader authorization finished..." << std::endl; - } - - void waitUnauthorized() - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "Reader is waiting unauthorization..." << std::endl; - - cvAuthentication_.wait(lock, [&]() -> bool - { - return unauthorized_ > 0; - }); - - std::cout << "Reader unauthorization finished..." << std::endl; - } - -#endif // if HAVE_SECURITY - - size_t getReceivedCount() const - { - return current_processed_count_; - } - - eprosima::fastrtps::rtps::SequenceNumber_t get_last_sequence_received() - { - if (last_seq.empty()) - { - return eprosima::fastrtps::rtps::SequenceNumber_t(); - } - - using pair_type = typename decltype(last_seq)::value_type; - auto seq_comp = [](const pair_type& v1, const pair_type& v2) -> bool - { - return v1.second < v2.second; - }; - return std::max_element(last_seq.cbegin(), last_seq.cend(), seq_comp)->second; - } - - /*** Function to change QoS ***/ - PubSubReader& reliability( - const eprosima::fastrtps::ReliabilityQosPolicyKind kind) - { - subscriber_attr_.qos.m_reliability.kind = kind; - return *this; - } - - PubSubReader& mem_policy( - const eprosima::fastrtps::rtps::MemoryManagementPolicy mem_policy) - { - subscriber_attr_.historyMemoryPolicy = mem_policy; - return *this; - } - - PubSubReader& deadline_period( - const eprosima::fastrtps::Duration_t deadline_period) - { - subscriber_attr_.qos.m_deadline.period = deadline_period; - return *this; - } - - bool update_deadline_period( - const eprosima::fastrtps::Duration_t& deadline_period) - { - eprosima::fastrtps::SubscriberAttributes attr; - attr = subscriber_attr_; - attr.qos.m_deadline.period = deadline_period; - - return subscriber_->updateAttributes(attr); - } - - PubSubReader& liveliness_kind( - const eprosima::fastrtps::LivelinessQosPolicyKind& kind) - { - subscriber_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubReader& liveliness_lease_duration( - const eprosima::fastrtps::Duration_t lease_duration) - { - subscriber_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - PubSubReader& latency_budget_duration( - const eprosima::fastrtps::Duration_t& latency_duration) - { - subscriber_attr_.qos.m_latencyBudget.duration = latency_duration; - return *this; - } - - eprosima::fastrtps::Duration_t get_latency_budget_duration() - { - return subscriber_attr_.qos.m_latencyBudget.duration; - } - - PubSubReader& lifespan_period( - const eprosima::fastrtps::Duration_t lifespan_period) - { - subscriber_attr_.qos.m_lifespan.duration = lifespan_period; - return *this; - } - - PubSubReader& keep_duration( - const eprosima::fastrtps::Duration_t duration) - { - subscriber_attr_.qos.m_disablePositiveACKs.enabled = true; - subscriber_attr_.qos.m_disablePositiveACKs.duration = duration; - return *this; - } - - PubSubReader& history_kind( - const eprosima::fastrtps::HistoryQosPolicyKind kind) - { - subscriber_attr_.topic.historyQos.kind = kind; - return *this; - } - - PubSubReader& history_depth( - const int32_t depth) - { - subscriber_attr_.topic.historyQos.depth = depth; - return *this; - } - - PubSubReader& setup_transports( - eprosima::fastdds::rtps::BuiltinTransports transports) - { - participant_attr_.rtps.setup_transports(transports); - return *this; - } - - PubSubReader& setup_transports( - eprosima::fastdds::rtps::BuiltinTransports transports, - const eprosima::fastdds::rtps::BuiltinTransportsOptions& options) - { - participant_attr_.rtps.setup_transports(transports, options); - return *this; - } - - PubSubReader& setup_large_data_tcp( - bool v6 = false, - const uint16_t& port = 0, - const uint32_t& tcp_negotiation_timeout = 0) - { - participant_attr_.rtps.useBuiltinTransports = false; - - /* Transports configuration */ - // UDP transport for PDP over multicast - // TCP transport for EDP and application data (The listening port must to be unique for - // each participant in the same host) - uint16_t tcp_listening_port = port; - if (v6) - { - auto pdp_transport = std::make_shared(); - participant_attr_.rtps.userTransports.push_back(pdp_transport); - - auto data_transport = std::make_shared(); - data_transport->add_listener_port(tcp_listening_port); - data_transport->calculate_crc = false; - data_transport->check_crc = false; - data_transport->apply_security = false; - data_transport->enable_tcp_nodelay = true; - data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; - participant_attr_.rtps.userTransports.push_back(data_transport); - } - else - { - auto pdp_transport = std::make_shared(); - participant_attr_.rtps.userTransports.push_back(pdp_transport); - - auto data_transport = std::make_shared(); - data_transport->add_listener_port(tcp_listening_port); - data_transport->calculate_crc = false; - data_transport->check_crc = false; - data_transport->apply_security = false; - data_transport->enable_tcp_nodelay = true; - data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; - participant_attr_.rtps.userTransports.push_back(data_transport); - } - - /* Locators */ - eprosima::fastrtps::rtps::Locator_t pdp_locator; - eprosima::fastrtps::rtps::Locator_t tcp_locator; - if (v6) - { - // Define locator for PDP over multicast - pdp_locator.kind = LOCATOR_KIND_UDPv6; - eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); - // Define locator for EDP and user data - tcp_locator.kind = LOCATOR_KIND_TCPv6; - eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); - eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); - eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); - } - else - { - // Define locator for PDP over multicast - pdp_locator.kind = LOCATOR_KIND_UDPv4; - eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); - // Define locator for EDP and user data - tcp_locator.kind = LOCATOR_KIND_TCPv4; - eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); - eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); - eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); - } - - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); - participant_attr_.rtps.defaultUnicastLocatorList.push_back(tcp_locator); - - return *this; - } - - PubSubReader& disable_builtin_transport() - { - participant_attr_.rtps.useBuiltinTransports = false; - return *this; - } - - PubSubReader& add_user_transport_to_pparams( - std::shared_ptr userTransportDescriptor) - { - participant_attr_.rtps.userTransports.push_back(userTransportDescriptor); - return *this; - } - - PubSubReader& resource_limits_allocated_samples( - const int32_t initial) - { - subscriber_attr_.topic.resourceLimitsQos.allocated_samples = initial; - return *this; - } - - PubSubReader& resource_limits_max_samples( - const int32_t max) - { - subscriber_attr_.topic.resourceLimitsQos.max_samples = max; - return *this; - } - - PubSubReader& resource_limits_max_instances( - const int32_t max) - { - subscriber_attr_.topic.resourceLimitsQos.max_instances = max; - return *this; - } - - PubSubReader& resource_limits_max_samples_per_instance( - const int32_t max) - { - subscriber_attr_.topic.resourceLimitsQos.max_samples_per_instance = max; - return *this; - } - - PubSubReader& matched_writers_allocation( - size_t initial, - size_t maximum) - { - subscriber_attr_.matched_publisher_allocation.initial = initial; - subscriber_attr_.matched_publisher_allocation.maximum = maximum; - return *this; - } - - PubSubReader& expect_no_allocs() - { - // TODO(Mcc): Add no allocations check code when feature is completely ready - return *this; - } - - PubSubReader& expect_inline_qos( - bool expect) - { - subscriber_attr_.expectsInlineQos = expect; - return *this; - } - - PubSubReader& heartbeatResponseDelay( - const int32_t secs, - const int32_t frac) - { - subscriber_attr_.times.heartbeatResponseDelay.seconds = secs; - subscriber_attr_.times.heartbeatResponseDelay.fraction(frac); - return *this; - } - - PubSubReader& unicastLocatorList( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - subscriber_attr_.unicastLocatorList = unicastLocators; - return *this; - } - - PubSubReader& add_to_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - subscriber_attr_.unicastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& multicastLocatorList( - eprosima::fastrtps::rtps::LocatorList_t multicastLocators) - { - subscriber_attr_.multicastLocatorList = multicastLocators; - return *this; - } - - PubSubReader& add_to_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - subscriber_attr_.multicastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& metatraffic_unicast_locator_list( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList = unicastLocators; - return *this; - } - - PubSubReader& add_to_metatraffic_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& metatraffic_multicast_locator_list( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList = unicastLocators; - return *this; - } - - PubSubReader& add_to_metatraffic_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& set_default_unicast_locators( - const eprosima::fastrtps::rtps::LocatorList_t& locators) - { - participant_attr_.rtps.defaultUnicastLocatorList = locators; - return *this; - } - - PubSubReader& add_to_default_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.defaultUnicastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& set_default_multicast_locators( - const eprosima::fastrtps::rtps::LocatorList_t& locators) - { - participant_attr_.rtps.defaultMulticastLocatorList = locators; - return *this; - } - - PubSubReader& add_to_default_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.defaultMulticastLocatorList.push_back(loc); - - return *this; - } - - PubSubReader& initial_peers( - eprosima::fastrtps::rtps::LocatorList_t initial_peers) - { - participant_attr_.rtps.builtin.initialPeersList = initial_peers; - return *this; - } - - PubSubReader& ignore_participant_flags( - eprosima::fastrtps::rtps::ParticipantFilteringFlags_t flags) - { - participant_attr_.rtps.builtin.discovery_config.ignoreParticipantFlags = flags; - return *this; - } - - PubSubReader& socket_buffer_size( - uint32_t sockerBufferSize) - { - participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; - return *this; - } - - PubSubReader& durability_kind( - const eprosima::fastrtps::DurabilityQosPolicyKind kind) - { - subscriber_attr_.qos.m_durability.kind = kind; - return *this; - } - - PubSubReader& static_discovery( - const char* filename) - { - participant_attr_.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = false; - participant_attr_.rtps.builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol = true; - participant_attr_.rtps.builtin.discovery_config.static_edp_xml_config(filename); - return *this; - } - - PubSubReader& use_writer_liveliness_protocol( - bool use_wlp) - { - participant_attr_.rtps.builtin.use_WriterLivelinessProtocol = use_wlp; - return *this; - } - - PubSubReader& setSubscriberIDs( - uint8_t UserID, - uint8_t EntityID) - { - subscriber_attr_.setUserDefinedID(UserID); - subscriber_attr_.setEntityID(EntityID); - return *this; - - } - - PubSubReader& setManualTopicName( - std::string topicName) - { - subscriber_attr_.topic.topicName = topicName; - return *this; - } - - PubSubReader& disable_multicast( - int32_t participantId) - { - participant_attr_.rtps.participantID = participantId; - - eprosima::fastrtps::rtps::LocatorList_t default_unicast_locators; - eprosima::fastrtps::rtps::Locator_t default_unicast_locator; - eprosima::fastrtps::rtps::Locator_t loopback_locator; - if (!use_udpv4) - { - default_unicast_locator.kind = LOCATOR_KIND_UDPv6; - loopback_locator.kind = LOCATOR_KIND_UDPv6; - } - - default_unicast_locators.push_back(default_unicast_locator); - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList = default_unicast_locators; - - if (!IPLocator::setIPv4(loopback_locator, 127, 0, 0, 1)) - { - IPLocator::setIPv6(loopback_locator, "::1"); - } - participant_attr_.rtps.builtin.initialPeersList.push_back(loopback_locator); - return *this; - } - - PubSubReader& avoid_builtin_multicast( - bool value) - { - participant_attr_.rtps.builtin.avoid_builtin_multicast = value; - return *this; - } - - PubSubReader& property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - participant_attr_.rtps.properties = property_policy; - return *this; - } - - PubSubReader& entity_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - subscriber_attr_.properties = property_policy; - return *this; - } - - PubSubReader& partition( - const std::string& partition) - { - subscriber_attr_.qos.m_partition.push_back(partition.c_str()); - return *this; - } - - PubSubReader& userData( - std::vector user_data) - { - participant_attr_.rtps.userData = user_data; - return *this; - } - - PubSubReader& lease_duration( - eprosima::fastrtps::Duration_t lease_duration, - eprosima::fastrtps::Duration_t announce_period) - { - participant_attr_.rtps.builtin.discovery_config.leaseDuration = lease_duration; - participant_attr_.rtps.builtin.discovery_config.leaseDuration_announcementperiod = announce_period; - return *this; - } - - PubSubReader& initial_announcements( - uint32_t count, - const eprosima::fastrtps::Duration_t& period) - { - participant_attr_.rtps.builtin.discovery_config.initial_announcements.count = count; - participant_attr_.rtps.builtin.discovery_config.initial_announcements.period = period; - return *this; - } - - PubSubReader& ownership_exclusive() - { - subscriber_attr_.qos.m_ownership.kind = eprosima::fastdds::dds::EXCLUSIVE_OWNERSHIP_QOS; - return *this; - } - - PubSubReader& load_participant_attr( - const std::string& xml) - { - std::unique_ptr root; - if (eprosima::fastrtps::xmlparser::XMLParser::loadXML(xml.data(), xml.size(), - root) == eprosima::fastrtps::xmlparser::XMLP_ret::XML_OK) - { - for (const auto& profile : root->getChildren()) - { - if (profile->getType() == eprosima::fastrtps::xmlparser::NodeType::PARTICIPANT) - { - participant_attr_ = - *(dynamic_cast - *>( - profile.get())->get()); - } - } - } - return *this; - } - - PubSubReader& load_subscriber_attr( - const std::string& xml) - { - std::unique_ptr root; - if (eprosima::fastrtps::xmlparser::XMLParser::loadXML(xml.data(), xml.size(), - root) == eprosima::fastrtps::xmlparser::XMLP_ret::XML_OK) - { - for (const auto& profile : root->getChildren()) - { - if (profile->getType() == eprosima::fastrtps::xmlparser::NodeType::SUBSCRIBER) - { - subscriber_attr_ = - *(dynamic_cast - *>( - profile.get())->get()); - } - } - } - return *this; - } - - PubSubReader& max_initial_peers_range( - uint32_t maxInitialPeerRange) - { - participant_attr_.rtps.useBuiltinTransports = false; - std::shared_ptr descriptor; - if (use_udpv4) - { - descriptor = std::make_shared(); - } - else - { - descriptor = std::make_shared(); - } - descriptor->maxInitialPeersRange = maxInitialPeerRange; - participant_attr_.rtps.userTransports.push_back(descriptor); - return *this; - } - - PubSubReader& guid_prefix( - const eprosima::fastrtps::rtps::GuidPrefix_t& prefix) - { - participant_attr_.rtps.prefix = prefix; - return *this; - } - - PubSubReader& participant_id( - int32_t participantId) - { - participant_attr_.rtps.participantID = participantId; - return *this; - } - - PubSubReader& user_data_max_size( - size_t size) - { - participant_attr_.rtps.allocation.data_limits.max_user_data = size; - return *this; - } - - PubSubReader& properties_max_size( - size_t max_properties) - { - participant_attr_.rtps.allocation.data_limits.max_properties = max_properties; - return *this; - } - - PubSubReader& partitions_max_size( - size_t max_partitions) - { - participant_attr_.rtps.allocation.data_limits.max_partitions = max_partitions; - return *this; - } - - bool update_partition( - const std::string& partition) - { - subscriber_attr_.qos.m_partition.clear(); - subscriber_attr_.qos.m_partition.push_back(partition.c_str()); - return subscriber_->updateAttributes(subscriber_attr_); - } - - bool clear_partitions() - { - subscriber_attr_.qos.m_partition.clear(); - return subscriber_->updateAttributes(subscriber_attr_); - } - - /*** Function for discovery callback ***/ - - void wait_discovery_result() - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Reader is waiting discovery result..." << std::endl; - - cvDiscovery_.wait(lock, [&]() - { - return discovery_result_; - }); - - std::cout << "Reader gets discovery result..." << std::endl; - } - - void setOnDiscoveryFunction( - std::function f) - { - onDiscovery_ = f; - } - - void setOnEndpointDiscoveryFunction( - std::function f) - { - onEndpointDiscovery_ = f; - } - - bool take_first_data( - void* data) - { - return takeNextData(data); - } - - bool takeNextData( - void* data) - { - eprosima::fastrtps::SampleInfo_t info; - if (subscriber_->takeNextData(data, &info)) - { - current_processed_count_++; - return true; - } - return false; - } - - unsigned int missed_deadlines() const - { - return listener_.missed_deadlines(); - } - - void liveliness_lost() - { - std::unique_lock lock(liveliness_mutex_); - times_liveliness_lost_++; - liveliness_cv_.notify_one(); - } - - void liveliness_recovered() - { - std::unique_lock lock(liveliness_mutex_); - times_liveliness_recovered_++; - liveliness_cv_.notify_one(); - } - - void set_liveliness_changed_status( - const eprosima::fastrtps::LivelinessChangedStatus& status) - { - std::unique_lock lock(liveliness_mutex_); - - liveliness_changed_status_ = status; - } - - unsigned int times_liveliness_lost() - { - std::unique_lock lock(liveliness_mutex_); - - return times_liveliness_lost_; - } - - unsigned int times_liveliness_recovered() - { - std::unique_lock lock(liveliness_mutex_); - - return times_liveliness_recovered_; - } - - const eprosima::fastrtps::LivelinessChangedStatus& liveliness_changed_status() - { - std::unique_lock lock(liveliness_mutex_); - - return liveliness_changed_status_; - } - - bool is_matched() const - { - return matched_ > 0; - } - - unsigned int get_matched() const - { - return matched_; - } - - void set_xml_filename( - const std::string& name) - { - xml_file_ = name; - } - - void set_participant_profile( - const std::string& profile) - { - participant_profile_ = profile; - } - - const eprosima::fastrtps::rtps::GUID_t& participant_guid() const - { - return participant_guid_; - } - - const eprosima::fastrtps::rtps::GUID_t& datareader_guid() const - { - return subscriber_guid_; - } - -private: - - void receive_one( - eprosima::fastrtps::Subscriber* subscriber, - bool& returnedValue) - { - returnedValue = false; - type data; - eprosima::fastrtps::SampleInfo_t info; - - bool success = take_ ? - subscriber->takeNextData((void*)&data, &info) : - subscriber->readNextData((void*)&data, &info); - if (success) - { - returnedValue = true; - - std::unique_lock lock(mutex_); - - // Check order of changes. - ASSERT_LT(last_seq[info.iHandle], info.sample_identity.sequence_number()); - last_seq[info.iHandle] = info.sample_identity.sequence_number(); - - if (info.sampleKind == eprosima::fastrtps::rtps::ALIVE) - { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); - ++current_processed_count_; - default_receive_print(data); - cv_.notify_one(); - } - } - } - - void participant_matched() - { - std::unique_lock lock(mutexDiscovery_); - ++participant_matched_; - cvDiscovery_.notify_one(); - } - - void participant_unmatched() - { - std::unique_lock lock(mutexDiscovery_); - --participant_matched_; - cvDiscovery_.notify_one(); - } - - void matched() - { - std::unique_lock lock(mutexDiscovery_); - ++matched_; - cvDiscovery_.notify_one(); - } - - void unmatched() - { - std::unique_lock lock(mutexDiscovery_); - --matched_; - cvDiscovery_.notify_one(); - } - -#if HAVE_SECURITY - void authorized() - { - mutexAuthentication_.lock(); - ++authorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - - void unauthorized() - { - mutexAuthentication_.lock(); - ++unauthorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - -#endif // if HAVE_SECURITY - - PubSubReader& operator =( - const PubSubReader&) = delete; - - eprosima::fastrtps::Participant* participant_; - eprosima::fastrtps::ParticipantAttributes participant_attr_; - eprosima::fastrtps::Subscriber* subscriber_; - eprosima::fastrtps::SubscriberAttributes subscriber_attr_; - std::string topic_name_; - eprosima::fastrtps::rtps::GUID_t participant_guid_; - eprosima::fastrtps::rtps::GUID_t subscriber_guid_; - bool initialized_; - std::list total_msgs_; - std::mutex mutex_; - std::condition_variable cv_; - std::mutex mutexDiscovery_; - std::condition_variable cvDiscovery_; - std::atomic matched_; - unsigned int participant_matched_; - std::atomic receiving_; - type_support type_; - std::map last_seq; - std::atomic current_processed_count_; - std::atomic number_samples_expected_; - bool discovery_result_; - - std::string xml_file_ = ""; - std::string participant_profile_ = ""; - - std::function onDiscovery_; - std::function onEndpointDiscovery_; - - //! True to take data from history. False to read - bool take_; - -#if HAVE_SECURITY - std::mutex mutexAuthentication_; - std::condition_variable cvAuthentication_; - unsigned int authorized_; - unsigned int unauthorized_; -#endif // if HAVE_SECURITY - - //! A mutex for liveliness status - std::mutex liveliness_mutex_; - //! A condition variable to notify when liveliness was recovered - std::condition_variable liveliness_cv_; - //! Number of times liveliness was lost - unsigned int times_liveliness_lost_; - //! Number of times liveliness was recovered - unsigned int times_liveliness_recovered_; - //! The liveliness changed status - eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_; - - //! A mutex for messages received but not yet processed by the application - std::mutex message_receive_mutex_; - //! A condition variable for messages received but not yet processed by the application - std::condition_variable message_receive_cv_; - //! Number of messages received but not yet processed by the application - std::atomic message_receive_count_; -}; - -#endif // _TEST_BLACKBOX_PUBSUBREADER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp deleted file mode 100644 index 25cfb9a1be1..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ /dev/null @@ -1,1633 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file PubSubWriter.hpp - * - */ - -#ifndef _TEST_BLACKBOX_PUBSUBWRITER_HPP_ -#define _TEST_BLACKBOX_PUBSUBWRITER_HPP_ - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using eprosima::fastrtps::rtps::IPLocator; -using eprosima::fastrtps::rtps::UDPTransportDescriptor; -using eprosima::fastrtps::rtps::UDPv4TransportDescriptor; -using eprosima::fastrtps::rtps::UDPv6TransportDescriptor; - -template -class PubSubWriter -{ - class ParticipantListener : public eprosima::fastrtps::ParticipantListener - { - public: - - ParticipantListener( - PubSubWriter& writer) - : writer_(writer) - { - } - - ~ParticipantListener() - { - } - - void onParticipantDiscovery( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override - { - if (writer_.onDiscovery_ != nullptr) - { - writer_.discovery_result_ = writer_.onDiscovery_(info); - } - - if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) - { - writer_.participant_matched(); - } - else if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT || - info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) - { - writer_.participant_unmatched(); - } - } - -#if HAVE_SECURITY - void onParticipantAuthentication( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ParticipantAuthenticationInfo&& info) override - { - if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::AUTHORIZED_PARTICIPANT) - { - writer_.authorized(); - } - else if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::UNAUTHORIZED_PARTICIPANT) - { - writer_.unauthorized(); - } - } - -#endif // if HAVE_SECURITY - - void onSubscriberDiscovery( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ReaderDiscoveryInfo&& info) override - { - if (info.status == eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER) - { - writer_.add_reader_info(info.info); - - } - else if (info.status == eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER) - { - writer_.change_reader_info(info.info); - } - else if (info.status == eprosima::fastrtps::rtps::ReaderDiscoveryInfo::REMOVED_READER) - { - writer_.remove_reader_info(info.info); - } - } - - void onPublisherDiscovery( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::WriterDiscoveryInfo&& info) override - { - if (info.status == eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER) - { - writer_.add_writer_info(info.info); - } - else if (info.status == eprosima::fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER) - { - writer_.change_writer_info(info.info); - } - else if (info.status == eprosima::fastrtps::rtps::WriterDiscoveryInfo::REMOVED_WRITER) - { - writer_.remove_writer_info(info.info); - } - } - - private: - - ParticipantListener& operator =( - const ParticipantListener&) = delete; - - PubSubWriter& writer_; - - } - participant_listener_; - - class Listener : public eprosima::fastrtps::PublisherListener - { - public: - - Listener( - PubSubWriter& writer) - : writer_(writer) - , times_deadline_missed_(0) - , times_liveliness_lost_(0) - { - } - - ~Listener() - { - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher* /*pub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) override - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - std::cout << "Publisher matched subscriber " << info.remoteEndpointGuid << std::endl; - writer_.matched(); - } - else - { - std::cout << "Publisher unmatched subscriber " << info.remoteEndpointGuid << std::endl; - writer_.unmatched(); - } - } - - void on_offered_deadline_missed( - eprosima::fastrtps::Publisher* pub, - const eprosima::fastrtps::OfferedDeadlineMissedStatus& status) override - { - (void)pub; - times_deadline_missed_ = status.total_count; - } - - void on_liveliness_lost( - eprosima::fastrtps::Publisher* pub, - const eprosima::fastrtps::LivelinessLostStatus& status) override - { - (void)pub; - times_liveliness_lost_ = status.total_count; - writer_.liveliness_lost(); - } - - unsigned int missed_deadlines() const - { - return times_deadline_missed_; - } - - unsigned int times_liveliness_lost() const - { - return times_liveliness_lost_; - } - - private: - - Listener& operator =( - const Listener&) = delete; - - PubSubWriter& writer_; - - //! The number of times deadline was missed - unsigned int times_deadline_missed_; - //! The number of times liveliness was lost - unsigned int times_liveliness_lost_; - - } - listener_; - -public: - - typedef TypeSupport type_support; - typedef typename type_support::type type; - - PubSubWriter( - const std::string& topic_name) - : participant_listener_(*this) - , listener_(*this) - , participant_(nullptr) - , publisher_(nullptr) - , initialized_(false) - , matched_(0) - , participant_matched_(0) - , discovery_result_(false) - , onDiscovery_(nullptr) - , times_liveliness_lost_(0) -#if HAVE_SECURITY - , authorized_(0) - , unauthorized_(0) -#endif // if HAVE_SECURITY - { - publisher_attr_.topic.topicDataType = type_.getName(); - // Generate topic name - std::ostringstream t; - t << topic_name << "_" << asio::ip::host_name() << "_" << GET_PID(); - publisher_attr_.topic.topicName = t.str(); - topic_name_ = t.str(); - publisher_attr_.topic.topicKind = - type_.m_isGetKeyDefined ? ::eprosima::fastrtps::rtps::WITH_KEY : ::eprosima::fastrtps::rtps::NO_KEY; - - // By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE - publisher_attr_.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - - // By default, heartbeat period and nack response delay are 100 milliseconds. - publisher_attr_.times.heartbeatPeriod.seconds = 0; - publisher_attr_.times.heartbeatPeriod.nanosec = 100000000; - publisher_attr_.times.nackResponseDelay.seconds = 0; - publisher_attr_.times.nackResponseDelay.nanosec = 100000000; - - // Increase default max_blocking_time to 1 second, as our CI infrastructure shows some - // big CPU overhead sometimes - publisher_attr_.qos.m_reliability.max_blocking_time.seconds = 1; - publisher_attr_.qos.m_reliability.max_blocking_time.nanosec = 0; - } - - ~PubSubWriter() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - } - } - - eprosima::fastrtps::Publisher& get_native_writer() const - { - return *publisher_; - } - - void init() - { - //Create participant - // Use local copies of attributes to catch #6507 issues with valgrind - eprosima::fastrtps::ParticipantAttributes participant_attr; - eprosima::fastrtps::PublisherAttributes publisher_attr; - - if (!xml_file_.empty()) - { - eprosima::fastrtps::Domain::loadXMLProfilesFile(xml_file_); - if (!participant_profile_.empty()) - { - // Need to specify ID in XML - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_profile_, - &participant_listener_); - ASSERT_NE(participant_, nullptr); - participant_attr = participant_->getAttributes(); - publisher_attr = publisher_attr_; - } - } - if (participant_ == nullptr) - { - participant_attr_.domainId = (uint32_t)GET_PID() % 230; - - participant_attr = participant_attr_; - publisher_attr = publisher_attr_; - - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_attr, &participant_listener_); - } - - if (participant_ != nullptr) - { - participant_guid_ = participant_->getGuid(); - - // Register type - eprosima::fastrtps::Domain::registerType(participant_, &type_); - - //Create publisher - publisher_ = eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr, &listener_); - - if (publisher_ != nullptr) - { - publisher_guid_ = publisher_->getGuid(); - std::cout << "Created publisher " << publisher_guid_ << " for topic " << - publisher_attr_.topic.topicName << std::endl; - initialized_ = true; - } - } - } - - void createPublisher() - { - if (participant_ != nullptr) - { - //Create publisher - publisher_ = eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, &listener_); - - if (publisher_ != nullptr) - { - publisher_guid_ = publisher_->getGuid(); - std::cout << "Created publisher " << publisher_guid_ << " for topic " << - publisher_attr_.topic.topicName << std::endl; - initialized_ = true; - return; - } - } - return; - } - - void removePublisher() - { - initialized_ = false; - eprosima::fastrtps::Domain::removePublisher(publisher_); - return; - } - - bool isInitialized() const - { - return initialized_; - } - - eprosima::fastrtps::Participant* getParticipant() - { - return participant_; - } - - void destroy() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - participant_ = nullptr; - } - - initialized_ = false; - } - - void send( - std::list& msgs, - uint32_t milliseconds = 0) - { - auto it = msgs.begin(); - - while (it != msgs.end()) - { - if (publisher_->write((void*)&(*it))) - { - default_send_print(*it); - it = msgs.erase(it); - if (milliseconds > 0) - { - std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); - } - } - else - { - break; - } - } - } - - eprosima::fastrtps::rtps::InstanceHandle_t register_instance( - type& msg) - { - return publisher_->register_instance((void*)&msg); - } - - bool unregister_instance( - type& msg, - const eprosima::fastrtps::rtps::InstanceHandle_t& instance_handle) - { - return publisher_->unregister_instance((void*)&msg, instance_handle); - } - - bool dispose( - type& msg, - const eprosima::fastrtps::rtps::InstanceHandle_t& instance_handle) - { - return publisher_->dispose((void*)&msg, instance_handle); - } - - bool send_sample( - type& msg) - { - return publisher_->write((void*)&msg); - } - - void assert_liveliness() - { - publisher_->assert_liveliness(); - } - - void wait_discovery( - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Writer is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cv_.wait(lock, [&]() - { - return matched_ != 0; - }); - } - else - { - cv_.wait_for(lock, timeout, [&]() - { - return matched_ != 0; - }); - } - - std::cout << "Writer discovery finished..." << std::endl; - } - - void wait_discovery( - unsigned int expected_match, - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Writer is waiting discovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cv_.wait(lock, [&]() - { - return matched_ == expected_match; - }); - } - else - { - cv_.wait_for(lock, timeout, [&]() - { - return matched_ == expected_match; - }); - } - - std::cout << "Writer discovery finished..." << std::endl; - } - - bool wait_participant_undiscovery( - std::chrono::seconds timeout = std::chrono::seconds::zero()) - { - bool ret_value = true; - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Writer is waiting undiscovery..." << std::endl; - - if (timeout == std::chrono::seconds::zero()) - { - cv_.wait(lock, [&]() - { - return participant_matched_ == 0; - }); - } - else - { - if (!cv_.wait_for(lock, timeout, [&]() - { - return participant_matched_ == 0; - })) - { - ret_value = false; - } - } - - if (ret_value) - { - std::cout << "Writer undiscovery finished successfully..." << std::endl; - } - else - { - std::cout << "Writer undiscovery finished unsuccessfully..." << std::endl; - } - - return ret_value; - } - - void wait_reader_undiscovery() - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Writer is waiting removal..." << std::endl; - - cv_.wait(lock, [&]() - { - return matched_ == 0; - }); - - std::cout << "Writer removal finished..." << std::endl; - } - - void wait_liveliness_lost( - unsigned int times = 1) - { - std::unique_lock lock(liveliness_mutex_); - liveliness_cv_.wait(lock, [&]() - { - return times_liveliness_lost_ >= times; - }); - } - - void liveliness_lost() - { - std::unique_lock lock(liveliness_mutex_); - times_liveliness_lost_++; - liveliness_cv_.notify_one(); - } - -#if HAVE_SECURITY - void waitAuthorized() - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "Writer is waiting authorization..." << std::endl; - - cvAuthentication_.wait(lock, [&]() -> bool - { - return authorized_ > 0; - }); - - std::cout << "Writer authorization finished..." << std::endl; - } - - void waitUnauthorized() - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "Writer is waiting unauthorization..." << std::endl; - - cvAuthentication_.wait(lock, [&]() -> bool - { - return unauthorized_ > 0; - }); - - std::cout << "Writer unauthorization finished..." << std::endl; - } - -#endif // if HAVE_SECURITY - - template - bool waitForAllAcked( - const std::chrono::duration<_Rep, _Period>& max_wait) - { - auto nsecs = std::chrono::duration_cast(max_wait); - auto secs = std::chrono::duration_cast(nsecs); - nsecs -= secs; - eprosima::fastrtps::Duration_t timeout {static_cast(secs.count()), - static_cast(nsecs.count())}; - return publisher_->wait_for_all_acked(timeout); - } - - void block_until_discover_topic( - const std::string& topicName, - int repeatedTimes) - { - std::unique_lock lock(mutexEntitiesInfoList_); - - cvEntitiesInfoList_.wait(lock, [&]() - { - int times = mapTopicCountList_.count(topicName) == 0 ? 0 : mapTopicCountList_[topicName]; - return times == repeatedTimes; - }); - } - - void block_until_discover_partition( - const std::string& partition, - int repeatedTimes) - { - std::unique_lock lock(mutexEntitiesInfoList_); - - cvEntitiesInfoList_.wait(lock, [&]() - { - int times = mapPartitionCountList_.count(partition) == 0 ? 0 : mapPartitionCountList_[partition]; - return times == repeatedTimes; - }); - } - - /*** Function to change QoS ***/ - PubSubWriter& reliability( - const eprosima::fastrtps::ReliabilityQosPolicyKind kind) - { - publisher_attr_.qos.m_reliability.kind = kind; - return *this; - } - - PubSubWriter& mem_policy( - const eprosima::fastrtps::rtps::MemoryManagementPolicy mem_policy) - { - publisher_attr_.historyMemoryPolicy = mem_policy; - return *this; - } - - PubSubWriter& deadline_period( - const eprosima::fastrtps::Duration_t deadline_period) - { - publisher_attr_.qos.m_deadline.period = deadline_period; - return *this; - } - - PubSubWriter& liveliness_kind( - const eprosima::fastrtps::LivelinessQosPolicyKind kind) - { - publisher_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubWriter& liveliness_lease_duration( - const eprosima::fastrtps::Duration_t lease_duration) - { - publisher_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - PubSubWriter& latency_budget_duration( - const eprosima::fastrtps::Duration_t& latency_duration) - { - publisher_attr_.qos.m_latencyBudget.duration = latency_duration; - return *this; - } - - eprosima::fastrtps::Duration_t get_latency_budget_duration() - { - return publisher_attr_.qos.m_latencyBudget.duration; - } - - PubSubWriter& liveliness_announcement_period( - const eprosima::fastrtps::Duration_t announcement_period) - { - publisher_attr_.qos.m_liveliness.announcement_period = announcement_period; - return *this; - } - - PubSubWriter& lifespan_period( - const eprosima::fastrtps::Duration_t lifespan_period) - { - publisher_attr_.qos.m_lifespan.duration = lifespan_period; - return *this; - } - - PubSubWriter& keep_duration( - const eprosima::fastrtps::Duration_t duration) - { - publisher_attr_.qos.m_disablePositiveACKs.enabled = true; - publisher_attr_.qos.m_disablePositiveACKs.duration = duration; - return *this; - } - - PubSubWriter& disable_heartbeat_piggyback( - bool value) - { - publisher_attr_.qos.disable_heartbeat_piggyback = value; - return *this; - } - - PubSubWriter& max_blocking_time( - const eprosima::fastrtps::Duration_t time) - { - publisher_attr_.qos.m_reliability.max_blocking_time = time; - return *this; - } - - PubSubWriter& add_throughput_controller_descriptor_to_pparams( - eprosima::fastdds::rtps::FlowControllerSchedulerPolicy, - uint32_t bytesPerPeriod, - uint32_t periodInMs) - { - eprosima::fastrtps::rtps::ThroughputControllerDescriptor descriptor {bytesPerPeriod, periodInMs}; - publisher_attr_.throughputController = descriptor; - - return *this; - } - - PubSubWriter& asynchronously( - const eprosima::fastrtps::PublishModeQosPolicyKind kind) - { - publisher_attr_.qos.m_publishMode.kind = kind; - return *this; - } - - PubSubWriter& history_kind( - const eprosima::fastrtps::HistoryQosPolicyKind kind) - { - publisher_attr_.topic.historyQos.kind = kind; - return *this; - } - - PubSubWriter& history_depth( - const int32_t depth) - { - publisher_attr_.topic.historyQos.depth = depth; - return *this; - } - - PubSubWriter& setup_transports( - eprosima::fastdds::rtps::BuiltinTransports transports) - { - participant_attr_.rtps.setup_transports(transports); - return *this; - } - - PubSubWriter& setup_transports( - eprosima::fastdds::rtps::BuiltinTransports transports, - const eprosima::fastdds::rtps::BuiltinTransportsOptions& options) - { - participant_attr_.rtps.setup_transports(transports, options); - return *this; - } - - PubSubWriter& setup_large_data_tcp( - bool v6 = false, - const uint16_t& port = 0, - const uint32_t& tcp_negotiation_timeout = 0) - { - participant_attr_.rtps.useBuiltinTransports = false; - - /* Transports configuration */ - // UDP transport for PDP over multicast - // TCP transport for EDP and application data (The listening port must to be unique for - // each participant in the same host) - uint16_t tcp_listening_port = port; - if (v6) - { - auto pdp_transport = std::make_shared(); - participant_attr_.rtps.userTransports.push_back(pdp_transport); - - auto data_transport = std::make_shared(); - data_transport->add_listener_port(tcp_listening_port); - data_transport->calculate_crc = false; - data_transport->check_crc = false; - data_transport->apply_security = false; - data_transport->enable_tcp_nodelay = true; - data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; - participant_attr_.rtps.userTransports.push_back(data_transport); - } - else - { - auto pdp_transport = std::make_shared(); - participant_attr_.rtps.userTransports.push_back(pdp_transport); - - auto data_transport = std::make_shared(); - data_transport->add_listener_port(tcp_listening_port); - data_transport->calculate_crc = false; - data_transport->check_crc = false; - data_transport->apply_security = false; - data_transport->enable_tcp_nodelay = true; - data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout; - participant_attr_.rtps.userTransports.push_back(data_transport); - } - - /* Locators */ - eprosima::fastrtps::rtps::Locator_t pdp_locator; - eprosima::fastrtps::rtps::Locator_t tcp_locator; - if (v6) - { - // Define locator for PDP over multicast - pdp_locator.kind = LOCATOR_KIND_UDPv6; - eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); - // Define locator for EDP and user data - tcp_locator.kind = LOCATOR_KIND_TCPv6; - eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); - eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); - eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); - } - else - { - // Define locator for PDP over multicast - pdp_locator.kind = LOCATOR_KIND_UDPv4; - eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); - // Define locator for EDP and user data - tcp_locator.kind = LOCATOR_KIND_TCPv4; - eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); - eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); - eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); - } - - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); - participant_attr_.rtps.defaultUnicastLocatorList.push_back(tcp_locator); - - return *this; - } - - PubSubWriter& disable_builtin_transport() - { - participant_attr_.rtps.useBuiltinTransports = false; - return *this; - } - - PubSubWriter& add_user_transport_to_pparams( - std::shared_ptr userTransportDescriptor) - { - participant_attr_.rtps.userTransports.push_back(userTransportDescriptor); - return *this; - } - - PubSubWriter& durability_kind( - const eprosima::fastrtps::DurabilityQosPolicyKind kind) - { - publisher_attr_.qos.m_durability.kind = kind; - return *this; - } - - PubSubWriter& resource_limits_allocated_samples( - const int32_t initial) - { - publisher_attr_.topic.resourceLimitsQos.allocated_samples = initial; - return *this; - } - - PubSubWriter& resource_limits_max_samples( - const int32_t max) - { - publisher_attr_.topic.resourceLimitsQos.max_samples = max; - return *this; - } - - PubSubWriter& resource_limits_max_instances( - const int32_t max) - { - publisher_attr_.topic.resourceLimitsQos.max_instances = max; - return *this; - } - - PubSubWriter& resource_limits_max_samples_per_instance( - const int32_t max) - { - publisher_attr_.topic.resourceLimitsQos.max_samples_per_instance = max; - return *this; - } - - PubSubWriter& resource_limits_extra_samples( - const int32_t extra) - { - publisher_attr_.topic.resourceLimitsQos.extra_samples = extra; - return *this; - } - - PubSubWriter& matched_readers_allocation( - size_t initial, - size_t maximum) - { - publisher_attr_.matched_subscriber_allocation.initial = initial; - publisher_attr_.matched_subscriber_allocation.maximum = maximum; - return *this; - } - - PubSubWriter& expect_no_allocs() - { - // TODO(Mcc): Add no allocations check code when feature is completely ready - return *this; - } - - PubSubWriter& heartbeat_period_seconds( - int32_t sec) - { - publisher_attr_.times.heartbeatPeriod.seconds = sec; - return *this; - } - - PubSubWriter& heartbeat_period_nanosec( - uint32_t nanosec) - { - publisher_attr_.times.heartbeatPeriod.nanosec = nanosec; - return *this; - } - - PubSubWriter& unicastLocatorList( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - publisher_attr_.unicastLocatorList = unicastLocators; - return *this; - } - - PubSubWriter& add_to_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - publisher_attr_.unicastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& multicastLocatorList( - eprosima::fastrtps::rtps::LocatorList_t multicastLocators) - { - publisher_attr_.multicastLocatorList = multicastLocators; - return *this; - } - - PubSubWriter& add_to_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - publisher_attr_.multicastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& metatraffic_unicast_locator_list( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList = unicastLocators; - return *this; - } - - PubSubWriter& add_to_metatraffic_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& metatraffic_multicast_locator_list( - eprosima::fastrtps::rtps::LocatorList_t unicastLocators) - { - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList = unicastLocators; - return *this; - } - - PubSubWriter& add_to_metatraffic_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& set_default_unicast_locators( - const eprosima::fastrtps::rtps::LocatorList_t& locators) - { - participant_attr_.rtps.defaultUnicastLocatorList = locators; - return *this; - } - - PubSubWriter& add_to_default_unicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.defaultUnicastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& set_default_multicast_locators( - const eprosima::fastrtps::rtps::LocatorList_t& locators) - { - participant_attr_.rtps.defaultMulticastLocatorList = locators; - return *this; - } - - PubSubWriter& add_to_default_multicast_locator_list( - const std::string& ip, - uint32_t port) - { - eprosima::fastrtps::rtps::Locator_t loc; - if (!IPLocator::setIPv4(loc, ip)) - { - loc.kind = LOCATOR_KIND_UDPv6; - if (!IPLocator::setIPv6(loc, ip)) - { - return *this; - } - } - - loc.port = port; - participant_attr_.rtps.defaultMulticastLocatorList.push_back(loc); - - return *this; - } - - PubSubWriter& initial_peers( - eprosima::fastrtps::rtps::LocatorList_t initial_peers) - { - participant_attr_.rtps.builtin.initialPeersList = initial_peers; - return *this; - } - - PubSubWriter& static_discovery( - const char* filename) - { - participant_attr_.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = false; - participant_attr_.rtps.builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol = true; - participant_attr_.rtps.builtin.discovery_config.static_edp_xml_config(filename); - return *this; - } - - PubSubWriter& use_writer_liveliness_protocol( - bool use_wlp) - { - participant_attr_.rtps.builtin.use_WriterLivelinessProtocol = use_wlp; - return *this; - } - - PubSubWriter& avoid_builtin_multicast( - bool value) - { - participant_attr_.rtps.builtin.avoid_builtin_multicast = value; - return *this; - } - - PubSubWriter& property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy& property_policy) - { - participant_attr_.rtps.properties = property_policy; - return *this; - } - - PubSubWriter& entity_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy& property_policy) - { - publisher_attr_.properties = property_policy; - return *this; - } - - PubSubWriter& setPublisherIDs( - uint8_t UserID, - uint8_t EntityID) - { - publisher_attr_.setUserDefinedID(UserID); - publisher_attr_.setEntityID(EntityID); - return *this; - } - - PubSubWriter& setManualTopicName( - std::string topicName) - { - publisher_attr_.topic.topicName = topicName; - return *this; - } - - PubSubWriter& disable_multicast( - int32_t participantId) - { - participant_attr_.rtps.participantID = participantId; - - eprosima::fastrtps::rtps::LocatorList_t default_unicast_locators; - eprosima::fastrtps::rtps::Locator_t default_unicast_locator; - eprosima::fastrtps::rtps::Locator_t loopback_locator; - if (!use_udpv4) - { - default_unicast_locator.kind = LOCATOR_KIND_UDPv6; - loopback_locator.kind = LOCATOR_KIND_UDPv6; - } - - default_unicast_locators.push_back(default_unicast_locator); - participant_attr_.rtps.builtin.metatrafficUnicastLocatorList = default_unicast_locators; - - if (!IPLocator::setIPv4(loopback_locator, 127, 0, 0, 1)) - { - IPLocator::setIPv6(loopback_locator, "::1"); - } - participant_attr_.rtps.builtin.initialPeersList.push_back(loopback_locator); - return *this; - } - - PubSubWriter& partition( - const std::string& partition) - { - publisher_attr_.qos.m_partition.push_back(partition.c_str()); - return *this; - } - - PubSubWriter& userData( - std::vector user_data) - { - participant_attr_.rtps.userData = user_data; - return *this; - } - - PubSubWriter& endpoint_userData( - std::vector user_data) - { - publisher_attr_.qos.m_userData = user_data; - return *this; - } - - PubSubWriter& lease_duration( - eprosima::fastrtps::Duration_t lease_duration, - eprosima::fastrtps::Duration_t announce_period) - { - participant_attr_.rtps.builtin.discovery_config.leaseDuration = lease_duration; - participant_attr_.rtps.builtin.discovery_config.leaseDuration_announcementperiod = announce_period; - return *this; - } - - PubSubWriter& initial_announcements( - uint32_t count, - const eprosima::fastrtps::Duration_t& period) - { - participant_attr_.rtps.builtin.discovery_config.initial_announcements.count = count; - participant_attr_.rtps.builtin.discovery_config.initial_announcements.period = period; - return *this; - } - - PubSubWriter& ownership_strength( - uint32_t strength) - { - publisher_attr_.qos.m_ownership.kind = eprosima::fastdds::dds::EXCLUSIVE_OWNERSHIP_QOS; - publisher_attr_.qos.m_ownershipStrength.value = strength; - return *this; - } - - PubSubWriter& load_publisher_attr( - const std::string& xml) - { - std::unique_ptr root; - if (eprosima::fastrtps::xmlparser::XMLParser::loadXML(xml.data(), xml.size(), - root) == eprosima::fastrtps::xmlparser::XMLP_ret::XML_OK) - { - for (const auto& profile : root->getChildren()) - { - if (profile->getType() == eprosima::fastrtps::xmlparser::NodeType::PUBLISHER) - { - publisher_attr_ = - *(dynamic_cast - *>( - profile.get())->get()); - } - } - } - return *this; - } - - PubSubWriter& max_initial_peers_range( - uint32_t maxInitialPeerRange) - { - participant_attr_.rtps.useBuiltinTransports = false; - std::shared_ptr descriptor; - if (use_udpv4) - { - descriptor = std::make_shared(); - } - else - { - descriptor = std::make_shared(); - } - descriptor->maxInitialPeersRange = maxInitialPeerRange; - participant_attr_.rtps.userTransports.push_back(descriptor); - return *this; - } - - PubSubWriter& socket_buffer_size( - uint32_t sockerBufferSize) - { - participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; - return *this; - } - - PubSubWriter& guid_prefix( - const eprosima::fastrtps::rtps::GuidPrefix_t& prefix) - { - participant_attr_.rtps.prefix = prefix; - return *this; - } - - PubSubWriter& participant_id( - int32_t participantId) - { - participant_attr_.rtps.participantID = participantId; - return *this; - } - - const std::string& topic_name() const - { - return topic_name_; - } - - eprosima::fastrtps::rtps::GUID_t participant_guid() - { - return participant_guid_; - } - - eprosima::fastrtps::rtps::GUID_t datawriter_guid() - { - return publisher_guid_; - } - - bool update_partition( - const std::string& partition) - { - publisher_attr_.qos.m_partition.clear(); - publisher_attr_.qos.m_partition.push_back(partition.c_str()); - return publisher_->updateAttributes(publisher_attr_); - } - - bool set_qos() - { - return publisher_->updateAttributes(publisher_attr_); - } - - bool remove_all_changes( - size_t* number_of_changes_removed) - { - return publisher_->removeAllChange(number_of_changes_removed); - } - - bool is_matched() const - { - return matched_ > 0; - } - - unsigned int get_matched() const - { - return matched_; - } - - void set_xml_filename( - const std::string& name) - { - xml_file_ = name; - } - - void set_participant_profile( - const std::string& profile) - { - participant_profile_ = profile; - } - - unsigned int missed_deadlines() const - { - return listener_.missed_deadlines(); - } - - unsigned int times_liveliness_lost() const - { - return listener_.times_liveliness_lost(); - } - -private: - - void participant_matched() - { - std::unique_lock lock(mutexDiscovery_); - ++participant_matched_; - cv_.notify_one(); - } - - void participant_unmatched() - { - std::unique_lock lock(mutexDiscovery_); - --participant_matched_; - cv_.notify_one(); - } - - void matched() - { - std::unique_lock lock(mutexDiscovery_); - ++matched_; - cv_.notify_one(); - } - - void unmatched() - { - std::unique_lock lock(mutexDiscovery_); - --matched_; - cv_.notify_one(); - } - -#if HAVE_SECURITY - void authorized() - { - mutexAuthentication_.lock(); - ++authorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - - void unauthorized() - { - mutexAuthentication_.lock(); - ++unauthorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - -#endif // if HAVE_SECURITY - - void add_writer_info( - const eprosima::fastrtps::rtps::WriterProxyData& writer_data) - { - mutexEntitiesInfoList_.lock(); - auto ret = mapWriterInfoList_.insert(std::make_pair(writer_data.guid(), writer_data)); - - if (!ret.second) - { - ret.first->second = writer_data; - } - - auto ret_topic = mapTopicCountList_.insert(std::make_pair(writer_data.topicName().to_string(), 1)); - - if (!ret_topic.second) - { - ++ret_topic.first->second; - } - - for (auto partition : writer_data.m_qos.m_partition.names()) - { - auto ret_partition = mapPartitionCountList_.insert(std::make_pair(partition, 1)); - - if (!ret_partition.second) - { - ++ret_partition.first->second; - } - } - - mutexEntitiesInfoList_.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - void change_writer_info( - const eprosima::fastrtps::rtps::WriterProxyData& writer_data) - { - mutexEntitiesInfoList_.lock(); - auto ret = mapWriterInfoList_.insert(std::make_pair(writer_data.guid(), writer_data)); - - ASSERT_FALSE(ret.second); - eprosima::fastrtps::rtps::WriterProxyData old_writer_data = ret.first->second; - ret.first->second = writer_data; - - ASSERT_GT(mapTopicCountList_.count(writer_data.topicName().to_string()), 0ul); - - // Remove previous partitions - for (auto partition : old_writer_data.m_qos.m_partition.names()) - { - auto partition_it = mapPartitionCountList_.find(partition); - ASSERT_TRUE(partition_it != mapPartitionCountList_.end()); - --(*partition_it).second; - if ((*partition_it).second == 0) - { - mapPartitionCountList_.erase(partition); - } - } - - // Add new partitions - for (auto partition : writer_data.m_qos.m_partition.names()) - { - auto ret_partition = mapPartitionCountList_.insert(std::make_pair(partition, 1)); - - if (!ret_partition.second) - { - ++ret_partition.first->second; - } - } - - mutexEntitiesInfoList_.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - void add_reader_info( - const eprosima::fastrtps::rtps::ReaderProxyData& reader_data) - { - mutexEntitiesInfoList_.lock(); - auto ret = mapReaderInfoList_.insert(std::make_pair(reader_data.guid(), reader_data)); - - if (!ret.second) - { - ret.first->second = reader_data; - } - - auto ret_topic = mapTopicCountList_.insert(std::make_pair(reader_data.topicName().to_string(), 1)); - - if (!ret_topic.second) - { - ++ret_topic.first->second; - } - - for (auto partition : reader_data.m_qos.m_partition.names()) - { - auto ret_partition = mapPartitionCountList_.insert(std::make_pair(partition, 1)); - - if (!ret_partition.second) - { - ++ret_partition.first->second; - } - } - - mutexEntitiesInfoList_.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - void change_reader_info( - const eprosima::fastrtps::rtps::ReaderProxyData& reader_data) - { - mutexEntitiesInfoList_.lock(); - auto ret = mapReaderInfoList_.insert(std::make_pair(reader_data.guid(), reader_data)); - - ASSERT_FALSE(ret.second); - eprosima::fastrtps::rtps::ReaderProxyData old_reader_data = ret.first->second; - ret.first->second = reader_data; - - ASSERT_GT(mapTopicCountList_.count(reader_data.topicName().to_string()), 0ul); - - // Remove previous partitions - for (auto partition : old_reader_data.m_qos.m_partition.names()) - { - auto partition_it = mapPartitionCountList_.find(partition); - ASSERT_TRUE(partition_it != mapPartitionCountList_.end()); - --(*partition_it).second; - if ((*partition_it).second == 0) - { - mapPartitionCountList_.erase(partition); - } - } - - for (auto partition : reader_data.m_qos.m_partition.names()) - { - auto ret_partition = mapPartitionCountList_.insert(std::make_pair(partition, 1)); - - if (!ret_partition.second) - { - ++ret_partition.first->second; - } - } - - mutexEntitiesInfoList_.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - void remove_writer_info( - const eprosima::fastrtps::rtps::WriterProxyData& writer_data) - { - std::unique_lock lock(mutexEntitiesInfoList_); - - ASSERT_GT(mapWriterInfoList_.count(writer_data.guid()), 0ul); - - mapWriterInfoList_.erase(writer_data.guid()); - - ASSERT_GT(mapTopicCountList_.count(writer_data.topicName().to_string()), 0ul); - - --mapTopicCountList_[writer_data.topicName().to_string()]; - - for (auto partition : writer_data.m_qos.m_partition.names()) - { - auto partition_it = mapPartitionCountList_.find(partition); - ASSERT_TRUE(partition_it != mapPartitionCountList_.end()); - --(*partition_it).second; - if ((*partition_it).second == 0) - { - mapPartitionCountList_.erase(partition); - } - } - - lock.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - void remove_reader_info( - const eprosima::fastrtps::rtps::ReaderProxyData& reader_data) - { - std::unique_lock lock(mutexEntitiesInfoList_); - - ASSERT_GT(mapReaderInfoList_.count(reader_data.guid()), 0ul); - - mapReaderInfoList_.erase(reader_data.guid()); - - ASSERT_GT(mapTopicCountList_.count(reader_data.topicName().to_string()), 0ul); - - --mapTopicCountList_[reader_data.topicName().to_string()]; - - for (auto partition : reader_data.m_qos.m_partition.names()) - { - auto partition_it = mapPartitionCountList_.find(partition); - ASSERT_TRUE(partition_it != mapPartitionCountList_.end()); - --(*partition_it).second; - if ((*partition_it).second == 0) - { - mapPartitionCountList_.erase(partition); - } - } - - lock.unlock(); - cvEntitiesInfoList_.notify_all(); - } - - PubSubWriter& operator =( - const PubSubWriter&) = delete; - - eprosima::fastrtps::Participant* participant_; - eprosima::fastrtps::ParticipantAttributes participant_attr_; - eprosima::fastrtps::Publisher* publisher_; - eprosima::fastrtps::PublisherAttributes publisher_attr_; - std::string topic_name_; - eprosima::fastrtps::rtps::GUID_t participant_guid_; - eprosima::fastrtps::rtps::GUID_t publisher_guid_; - bool initialized_; - std::mutex mutexDiscovery_; - std::condition_variable cv_; - std::atomic matched_; - unsigned int participant_matched_; - type_support type_; - std::mutex mutexEntitiesInfoList_; - std::condition_variable cvEntitiesInfoList_; - std::map mapWriterInfoList_; - std::map mapReaderInfoList_; - std::map mapTopicCountList_; - std::map mapPartitionCountList_; - bool discovery_result_; - - std::string xml_file_ = ""; - std::string participant_profile_ = ""; - - std::function onDiscovery_; - - //! A mutex for liveliness - std::mutex liveliness_mutex_; - //! A condition variable for liveliness - std::condition_variable liveliness_cv_; - //! The number of times liveliness was lost - unsigned int times_liveliness_lost_; - -#if HAVE_SECURITY - std::mutex mutexAuthentication_; - std::condition_variable cvAuthentication_; - unsigned int authorized_; - unsigned int unauthorized_; -#endif // if HAVE_SECURITY -}; - -#endif // _TEST_BLACKBOX_PUBSUBWRITER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp deleted file mode 100644 index 11621ee7e76..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriterReader.hpp +++ /dev/null @@ -1,869 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file PubSubWriterReader.hpp - * - */ - -#ifndef _TEST_BLACKBOX_PUBSUBWRITERREADER_HPP_ -#define _TEST_BLACKBOX_PUBSUBWRITERREADER_HPP_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -template -class PubSubWriterReader -{ - class ParticipantListener : public eprosima::fastrtps::ParticipantListener - { - public: - - ParticipantListener( - PubSubWriterReader& wreader) - : wreader_(wreader) - { - } - - ~ParticipantListener() - { - } - -#if HAVE_SECURITY - void onParticipantAuthentication( - eprosima::fastrtps::Participant*, - eprosima::fastrtps::rtps::ParticipantAuthenticationInfo&& info) override - { - if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::AUTHORIZED_PARTICIPANT) - { - wreader_.authorized(); - } - else if (info.status == eprosima::fastrtps::rtps::ParticipantAuthenticationInfo::UNAUTHORIZED_PARTICIPANT) - { - wreader_.unauthorized(); - } - } - -#endif // if HAVE_SECURITY - void onParticipantDiscovery( - eprosima::fastrtps::Participant* participant, - eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override - { - (void)participant; - - switch (info.status) - { - case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT: - info_add(discovered_participants_, info.info.m_guid); - break; - - case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT: - info_remove(discovered_participants_, info.info.m_guid); - break; - - case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT: - std::cout << "Participant " << info.info.m_guid << " has been dropped"; - info_remove(discovered_participants_, info.info.m_guid); - break; - - default: - break; - } - } - - void onSubscriberDiscovery( - eprosima::fastrtps::Participant* participant, - eprosima::fastrtps::rtps::ReaderDiscoveryInfo&& info) override - { - (void)participant; - - switch (info.status) - { - case eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER: - info_add(discovered_subscribers_, info.info.guid()); - break; - - case eprosima::fastrtps::rtps::ReaderDiscoveryInfo::REMOVED_READER: - info_remove(discovered_subscribers_, info.info.guid()); - break; - - default: - break; - } - } - - void onPublisherDiscovery( - eprosima::fastrtps::Participant* participant, - eprosima::fastrtps::rtps::WriterDiscoveryInfo&& info) override - { - (void)participant; - - switch (info.status) - { - case eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER: - info_add(discovered_publishers_, info.info.guid()); - break; - - case eprosima::fastrtps::rtps::WriterDiscoveryInfo::REMOVED_WRITER: - info_remove(discovered_publishers_, info.info.guid()); - break; - - default: - break; - } - } - - size_t get_num_discovered_participants() const - { - std::lock_guard guard(info_mutex_); - return discovered_participants_.size(); - } - - size_t get_num_discovered_publishers() const - { - std::lock_guard guard(info_mutex_); - return discovered_publishers_.size(); - } - - size_t get_num_discovered_subscribers() const - { - std::lock_guard guard(info_mutex_); - return discovered_subscribers_.size(); - } - - private: - - //! Mutex guarding all info collections - mutable std::mutex info_mutex_; - //! The discovered participants excluding the participant this listener is listening to - std::set discovered_participants_; - //! Number of subscribers discovered - std::set discovered_subscribers_; - //! Number of publishers discovered - std::set discovered_publishers_; - - void info_add( - std::set& collection, - const eprosima::fastrtps::rtps::GUID_t& item) - { - std::lock_guard guard(info_mutex_); - collection.insert(item); - } - - void info_remove( - std::set& collection, - const eprosima::fastrtps::rtps::GUID_t& item) - { - std::lock_guard guard(info_mutex_); - collection.erase(item); - } - - //! Deleted assignment operator - ParticipantListener& operator =( - const ParticipantListener&) = delete; - //! Pointer to the pub sub writer reader - PubSubWriterReader& wreader_; - - } - participant_listener_; - - class PubListener : public eprosima::fastrtps::PublisherListener - { - public: - - PubListener( - PubSubWriterReader& wreader) - : wreader_(wreader) - { - } - - ~PubListener() - { - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher* /*pub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - wreader_.publication_matched(info); - } - else - { - wreader_.publication_unmatched(info); - } - } - - private: - - PubListener& operator =( - const PubListener&) = delete; - - PubSubWriterReader& wreader_; - - } - pub_listener_; - - class SubListener : public eprosima::fastrtps::SubscriberListener - { - public: - - SubListener( - PubSubWriterReader& wreader) - : wreader_(wreader) - { - } - - ~SubListener() - { - } - - void onNewDataMessage( - eprosima::fastrtps::Subscriber* sub) - { - ASSERT_NE(sub, nullptr); - - if (wreader_.receiving_.load()) - { - bool ret = false; - do - { - wreader_.receive_one(sub, ret); - } while (ret); - } - } - - void onSubscriptionMatched( - eprosima::fastrtps::Subscriber* /*sub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - wreader_.subscription_matched(info); - } - else - { - wreader_.subscription_unmatched(info); - } - } - - private: - - SubListener& operator =( - const SubListener&) = delete; - - PubSubWriterReader& wreader_; - } - sub_listener_; - - friend class PubListener; - friend class SubListener; - -public: - - typedef TypeSupport type_support; - typedef typename type_support::type type; - - PubSubWriterReader( - const std::string& topic_name) - : participant_listener_(*this) - , pub_listener_(*this) - , sub_listener_(*this) - , participant_(nullptr) - , publisher_(nullptr) - , subscriber_(nullptr) - , initialized_(false) - , receiving_(false) - , current_received_count_(0) - , number_samples_expected_(0) -#if HAVE_SECURITY - , authorized_(0) - , unauthorized_(0) -#endif // if HAVE_SECURITY - { - publisher_attr_.topic.topicDataType = type_.getName(); - subscriber_attr_.topic.topicDataType = type_.getName(); - // Generate topic name - std::ostringstream t; - t << topic_name << "_" << asio::ip::host_name() << "_" << GET_PID(); - - publisher_attr_.topic.topicName = t.str(); - subscriber_attr_.topic.topicName = t.str(); - topic_name_ = t.str(); - publisher_attr_.topic.topicKind = - type_.m_isGetKeyDefined ? ::eprosima::fastrtps::rtps::WITH_KEY : ::eprosima::fastrtps::rtps::NO_KEY; - subscriber_attr_.topic.topicKind = - type_.m_isGetKeyDefined ? ::eprosima::fastrtps::rtps::WITH_KEY : ::eprosima::fastrtps::rtps::NO_KEY; - - // By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE - publisher_attr_.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - subscriber_attr_.historyMemoryPolicy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - - // By default, heartbeat period and nack response delay are 100 milliseconds. - publisher_attr_.times.heartbeatPeriod.seconds = 0; - publisher_attr_.times.heartbeatPeriod.nanosec = 100000000; - publisher_attr_.times.nackResponseDelay.seconds = 0; - publisher_attr_.times.nackResponseDelay.nanosec = 100000000; - - // Increase default max_blocking_time to 1 second, as our CI infrastructure shows some - // big CPU overhead sometimes - publisher_attr_.qos.m_reliability.max_blocking_time.seconds = 1; - publisher_attr_.qos.m_reliability.max_blocking_time.nanosec = 0; - - // By default, heartbeat period delay is 100 milliseconds. - subscriber_attr_.times.heartbeatResponseDelay.seconds = 0; - subscriber_attr_.times.heartbeatResponseDelay.nanosec = 100000000; - } - - ~PubSubWriterReader() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - } - } - - void init( - bool avoid_multicast = true, - uint32_t initial_pdp_count = 5) - { - //Create participant - participant_attr_.rtps.builtin.avoid_builtin_multicast = avoid_multicast; - participant_attr_.rtps.builtin.discovery_config.initial_announcements.count = initial_pdp_count; - participant_attr_.domainId = (uint32_t)GET_PID() % 230; - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_attr_, &participant_listener_); - - if (participant_ != nullptr) - { - // Register type - eprosima::fastrtps::Domain::registerType(participant_, &type_); - - //Create publisher - publisher_ = eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, &pub_listener_); - - if (publisher_ != nullptr) - { - //Create subscribe r - subscriber_ = eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_, - &sub_listener_); - - if (subscriber_ != nullptr) - { - initialized_ = true; - return; - } - } - - eprosima::fastrtps::Domain::removeParticipant(participant_); - } - } - - bool create_additional_topics( - size_t num_topics, - const char* suffix, - const eprosima::fastrtps::rtps::PropertySeq& writer_properties = eprosima::fastrtps::rtps::PropertySeq()) - { - (void)writer_properties; - - bool ret_val = initialized_; - if (ret_val) - { - std::string topic_name = publisher_attr_.topic.topicName.c_str(); - - for (size_t i = 0; ret_val && (i < num_topics); i++) - { - topic_name += suffix; - publisher_attr_.topic.topicName = topic_name; - ret_val &= - nullptr != eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, - &pub_listener_); - } - - topic_name = subscriber_attr_.topic.topicName.c_str(); - - for (size_t i = 0; ret_val && (i < num_topics); i++) - { - topic_name += suffix; - subscriber_attr_.topic.topicName = topic_name; - ret_val &= - nullptr != eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_, - &sub_listener_); - } - } - - return ret_val; - } - - bool isInitialized() const - { - return initialized_; - } - - void destroy() - { - if (participant_ != nullptr) - { - eprosima::fastrtps::Domain::removeParticipant(participant_); - participant_ = nullptr; - } - } - - void send( - std::list& msgs) - { - auto it = msgs.begin(); - - while (it != msgs.end()) - { - if (publisher_->write((void*)&(*it))) - { - default_send_print(*it); - it = msgs.erase(it); - } - else - { - break; - } - } - } - - std::list data_not_received() - { - std::unique_lock lock(mutex_); - return total_msgs_; - } - - void startReception( - std::list& msgs) - { - mutex_.lock(); - total_msgs_ = msgs; - number_samples_expected_ = total_msgs_.size(); - current_received_count_ = 0; - mutex_.unlock(); - - bool ret = false; - do - { - receive_one(subscriber_, ret); - } - while (ret); - - receiving_.store(true); - } - - void stopReception() - { - receiving_.store(false); - } - - void block_for_all() - { - block([this]() -> bool - { - return number_samples_expected_ == current_received_count_; - }); - } - - void block( - std::function checker) - { - std::unique_lock lock(mutex_); - cv_.wait(lock, checker); - } - - void wait_discovery() - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Waiting discovery..." << std::endl; - - if (matched_readers_.size() < 1 || matched_writers_.size() < 1) - { - cvDiscovery_.wait(lock); - } - - ASSERT_GE(matched_readers_.size() + matched_writers_.size(), 2u); - std::cout << "Discovery finished..." << std::endl; - } - - void waitRemoval() - { - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Waiting removal..." << std::endl; - - if (matched_writers_.size() != 0 || matched_readers_.size() != 0) - { - cvDiscovery_.wait(lock); - } - - ASSERT_EQ(matched_readers_.size() + matched_writers_.size(), 0u); - std::cout << "Removal finished..." << std::endl; - } - -#if HAVE_SECURITY - void waitAuthorized( - unsigned int how_many = 1) - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "WReader is waiting authorization..." << std::endl; - - while (authorized_ != how_many) - { - cvAuthentication_.wait(lock); - } - - ASSERT_EQ(authorized_, how_many); - std::cout << "WReader authorization finished..." << std::endl; - } - - void waitUnauthorized( - unsigned int how_many = 1) - { - std::unique_lock lock(mutexAuthentication_); - - std::cout << "WReader is waiting unauthorization..." << std::endl; - - while (unauthorized_ != how_many) - { - cvAuthentication_.wait(lock); - } - - ASSERT_EQ(unauthorized_, how_many); - std::cout << "WReader unauthorization finished..." << std::endl; - } - -#endif // if HAVE_SECURITY - - PubSubWriterReader& pub_durability_kind( - const eprosima::fastrtps::DurabilityQosPolicyKind kind) - { - publisher_attr_.qos.m_durability.kind = kind; - return *this; - } - - PubSubWriterReader& sub_durability_kind( - const eprosima::fastrtps::DurabilityQosPolicyKind kind) - { - subscriber_attr_.qos.m_durability.kind = kind; - return *this; - } - - PubSubWriterReader& pub_reliability( - const eprosima::fastrtps::ReliabilityQosPolicyKind kind) - { - publisher_attr_.qos.m_reliability.kind = kind; - return *this; - } - - PubSubWriterReader& sub_reliability( - const eprosima::fastrtps::ReliabilityQosPolicyKind kind) - { - subscriber_attr_.qos.m_reliability.kind = kind; - return *this; - } - - PubSubWriterReader& pub_history_kind( - const eprosima::fastrtps::HistoryQosPolicyKind kind) - { - publisher_attr_.topic.historyQos.kind = kind; - return *this; - } - - PubSubWriterReader& sub_history_kind( - const eprosima::fastrtps::HistoryQosPolicyKind kind) - { - subscriber_attr_.topic.historyQos.kind = kind; - return *this; - } - - PubSubWriterReader& pub_history_depth( - const int32_t depth) - { - publisher_attr_.topic.historyQos.depth = depth; - return *this; - } - - PubSubWriterReader& sub_history_depth( - const int32_t depth) - { - subscriber_attr_.topic.historyQos.depth = depth; - return *this; - } - - PubSubWriterReader& disable_builtin_transport() - { - participant_attr_.rtps.useBuiltinTransports = false; - return *this; - } - - PubSubWriterReader& add_user_transport_to_pparams( - std::shared_ptr userTransportDescriptor) - { - participant_attr_.rtps.userTransports.push_back(userTransportDescriptor); - return *this; - } - - PubSubWriterReader& property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - participant_attr_.rtps.properties = property_policy; - return *this; - } - - PubSubWriterReader& pub_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - publisher_attr_.properties = property_policy; - return *this; - } - - PubSubWriterReader& sub_property_policy( - const eprosima::fastrtps::rtps::PropertyPolicy property_policy) - { - subscriber_attr_.properties = property_policy; - return *this; - } - - PubSubWriterReader& pub_liveliness_kind( - const eprosima::fastrtps::LivelinessQosPolicyKind kind) - { - publisher_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubWriterReader& sub_liveliness_kind( - const eprosima::fastrtps::LivelinessQosPolicyKind kind) - { - subscriber_attr_.qos.m_liveliness.kind = kind; - return *this; - } - - PubSubWriterReader& pub_liveliness_announcement_period( - const eprosima::fastrtps::Duration_t announcement_period) - { - publisher_attr_.qos.m_liveliness.announcement_period = announcement_period; - return *this; - } - - PubSubWriterReader& sub_liveliness_announcement_period( - const eprosima::fastrtps::Duration_t announcement_period) - { - subscriber_attr_.qos.m_liveliness.announcement_period = announcement_period; - return *this; - } - - PubSubWriterReader& pub_liveliness_lease_duration( - const eprosima::fastrtps::Duration_t lease_duration) - { - publisher_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - PubSubWriterReader& sub_liveliness_lease_duration( - const eprosima::fastrtps::Duration_t lease_duration) - { - subscriber_attr_.qos.m_liveliness.lease_duration = lease_duration; - return *this; - } - - void assert_liveliness() - { - publisher_->assert_liveliness(); - } - - size_t get_num_discovered_participants() const - { - return participant_listener_.get_num_discovered_participants(); - } - - size_t get_num_discovered_publishers() const - { - return participant_listener_.get_num_discovered_publishers(); - } - - size_t get_num_discovered_subscribers() const - { - return participant_listener_.get_num_discovered_subscribers(); - } - - size_t get_publication_matched() - { - std::lock_guard guard(mutexDiscovery_); - return matched_writers_.size(); - } - - size_t get_subscription_matched() - { - std::lock_guard guard(mutexDiscovery_); - return matched_readers_.size(); - } - - PubSubWriterReader& add_throughput_controller_descriptor_to_pparams( - eprosima::fastdds::rtps::FlowControllerSchedulerPolicy, - uint32_t bytesPerPeriod, - uint32_t periodInMs) - { - eprosima::fastrtps::rtps::ThroughputControllerDescriptor descriptor {bytesPerPeriod, periodInMs}; - publisher_attr_.throughputController = descriptor; - - return *this; - } - - PubSubWriterReader& asynchronously( - const eprosima::fastrtps::PublishModeQosPolicyKind kind) - { - publisher_attr_.qos.m_publishMode.kind = kind; - return *this; - } - -private: - - void receive_one( - eprosima::fastrtps::Subscriber* subscriber, - bool& returnedValue) - { - returnedValue = false; - type data; - eprosima::fastrtps::SampleInfo_t info; - - if (subscriber->takeNextData((void*)&data, &info)) - { - returnedValue = true; - - std::unique_lock lock(mutex_); - - // Check order of changes. - ASSERT_LT(last_seq, info.sample_identity.sequence_number()); - last_seq = info.sample_identity.sequence_number(); - - if (info.sampleKind == eprosima::fastrtps::rtps::ALIVE) - { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); - ++current_received_count_; - default_receive_print(data); - cv_.notify_one(); - } - } - } - - void publication_matched( - eprosima::fastrtps::rtps::MatchingInfo& info) - { - std::lock_guard guard(mutexDiscovery_); - matched_writers_.insert(info.remoteEndpointGuid); - cvDiscovery_.notify_one(); - } - - void publication_unmatched( - eprosima::fastrtps::rtps::MatchingInfo& info) - { - std::lock_guard guard(mutexDiscovery_); - matched_writers_.erase(info.remoteEndpointGuid); - cvDiscovery_.notify_one(); - } - - void subscription_matched( - eprosima::fastrtps::rtps::MatchingInfo& info) - { - std::lock_guard guard(mutexDiscovery_); - matched_readers_.insert(info.remoteEndpointGuid); - cvDiscovery_.notify_one(); - } - - void subscription_unmatched( - eprosima::fastrtps::rtps::MatchingInfo& info) - { - std::lock_guard guard(mutexDiscovery_); - matched_readers_.erase(info.remoteEndpointGuid); - cvDiscovery_.notify_one(); - } - -#if HAVE_SECURITY - void authorized() - { - mutexAuthentication_.lock(); - ++authorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - - void unauthorized() - { - mutexAuthentication_.lock(); - ++unauthorized_; - mutexAuthentication_.unlock(); - cvAuthentication_.notify_all(); - } - -#endif // if HAVE_SECURITY - - PubSubWriterReader& operator =( - const PubSubWriterReader&) = delete; - - eprosima::fastrtps::Participant* participant_; - eprosima::fastrtps::ParticipantAttributes participant_attr_; - - eprosima::fastrtps::Publisher* publisher_; - eprosima::fastrtps::PublisherAttributes publisher_attr_; - - eprosima::fastrtps::Subscriber* subscriber_; - eprosima::fastrtps::SubscriberAttributes subscriber_attr_; - - std::string topic_name_; - bool initialized_; - std::list total_msgs_; - std::mutex mutex_; - std::condition_variable cv_; - std::mutex mutexDiscovery_; - std::condition_variable cvDiscovery_; - std::set matched_writers_; - std::set matched_readers_; - std::atomic receiving_; - type_support type_; - eprosima::fastrtps::rtps::SequenceNumber_t last_seq; - size_t current_received_count_; - size_t number_samples_expected_; -#if HAVE_SECURITY - std::mutex mutexAuthentication_; - std::condition_variable cvAuthentication_; - unsigned int authorized_; - unsigned int unauthorized_; -#endif // if HAVE_SECURITY -}; - -#endif // _TEST_BLACKBOX_PUBSUBWRITER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldReplier.hpp b/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldReplier.hpp deleted file mode 100644 index fcd3f46a4b9..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldReplier.hpp +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepAsReliableHelloWorldReplier.hpp - * - */ - -#ifndef _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREPLIER_HPP_ -#define _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREPLIER_HPP_ - -#include "ReqRepHelloWorldReplier.hpp" -#include - -class ReqRepAsReliableHelloWorldReplier : public ReqRepHelloWorldReplier -{ - public: - void configSubscriber(const std::string& suffix) - { - sattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - - std::ostringstream t; - - t << "ReqRepAsReliableHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix; - - sattr.topic.topicName = t.str(); - }; - - void configPublisher(const std::string& suffix) - { - puattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - - // Increase default max_blocking_time to 1 second, as our CI infrastructure shows some - // big CPU overhead sometimes - puattr.qos.m_reliability.max_blocking_time.seconds = 1; - puattr.qos.m_reliability.max_blocking_time.nanosec = 0; - - std::ostringstream t; - - t << "ReqRepAsReliableHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix; - - puattr.topic.topicName = t.str(); - } -}; - -#endif // _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREPLIER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldRequester.hpp b/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldRequester.hpp deleted file mode 100644 index d31bd8d207b..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepAsReliableHelloWorldRequester.hpp +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepAsReliableHelloWorldRequester.hpp - * - */ - -#ifndef _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREQUESTER_HPP_ -#define _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREQUESTER_HPP_ - -#include "ReqRepHelloWorldRequester.hpp" -#include - - -#if defined(_WIN32) -#define GET_PID _getpid -#else -#define GET_PID getpid -#endif - -class ReqRepAsReliableHelloWorldRequester : public ReqRepHelloWorldRequester -{ - public: - void configSubscriber(const std::string& suffix) - { - sattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - - std::ostringstream t; - - t << "ReqRepAsReliableHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix; - - sattr.topic.topicName = t.str(); - }; - - void configPublisher(const std::string& suffix) - { - puattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - - // Increase default max_blocking_time to 1 second, as our CI infrastructure shows some - // big CPU overhead sometimes - puattr.qos.m_reliability.max_blocking_time.seconds = 1; - puattr.qos.m_reliability.max_blocking_time.nanosec = 0; - - std::ostringstream t; - - t << "ReqRepAsReliableHelloworld_" << asio::ip::host_name() << "_" << GET_PID() << "_" << suffix; - - puattr.topic.topicName = t.str(); - } - - ReqRepAsReliableHelloWorldRequester& durability_kind(const eprosima::fastrtps::DurabilityQosPolicyKind kind) - { - puattr.qos.m_durability.kind = kind; - sattr.qos.m_durability.kind = kind; - return *this; - } -}; - -#endif // _TEST_BLACKBOX_REQREPASRELIABLEHELLOWORLDREQUESTER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.cpp b/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.cpp deleted file mode 100644 index 80fbfbde013..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.cpp +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepHelloWorldReplier.cpp - * - */ - -#include "ReqRepHelloWorldReplier.hpp" - -#include -#include -#include - -#include -#include - -#include - -#include - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -ReqRepHelloWorldReplier::ReqRepHelloWorldReplier() - : request_listener_(*this) - , reply_listener_(*this) - , participant_(nullptr) - , request_subscriber_(nullptr) - , reply_publisher_(nullptr) - , initialized_(false) - , matched_(0) -{ - // By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE - sattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - puattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; -} - -ReqRepHelloWorldReplier::~ReqRepHelloWorldReplier() -{ - if (participant_ != nullptr) - { - Domain::removeParticipant(participant_); - } -} - -void ReqRepHelloWorldReplier::init() -{ - ParticipantAttributes pattr; - pattr.domainId = (uint32_t)GET_PID() % 230; - participant_ = Domain::createParticipant(pattr); - ASSERT_NE(participant_, nullptr); - - // Register type - ASSERT_EQ(Domain::registerType(participant_, &type_), true); - - //Create subscriber - sattr.topic.topicKind = NO_KEY; - sattr.topic.topicDataType = type_.getName(); - configSubscriber("Request"); - request_subscriber_ = Domain::createSubscriber(participant_, sattr, &request_listener_); - ASSERT_NE(request_subscriber_, nullptr); - - //Create publisher - puattr.topic.topicKind = NO_KEY; - puattr.topic.topicDataType = type_.getName(); - puattr.topic.topicName = "HelloWorldTopicReply"; - configPublisher("Reply"); - reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_); - ASSERT_NE(reply_publisher_, nullptr); - - initialized_ = true; -} - -void ReqRepHelloWorldReplier::newNumber( - SampleIdentity sample_identity, - uint16_t number) -{ - WriteParams wparams; - HelloWorld hello; - hello.index(number); - hello.message("GoodBye"); - wparams.related_sample_identity(sample_identity); - ASSERT_EQ(reply_publisher_->write((void*)&hello, wparams), true); -} - -void ReqRepHelloWorldReplier::wait_discovery() -{ - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Replier is waiting discovery..." << std::endl; - - cvDiscovery_.wait(lock, [&]() - { - return matched_ > 1; - }); - - std::cout << "Replier discovery finished..." << std::endl; -} - -void ReqRepHelloWorldReplier::matched() -{ - std::unique_lock lock(mutexDiscovery_); - ++matched_; - if (matched_ > 1) - { - cvDiscovery_.notify_one(); - } -} - -void ReqRepHelloWorldReplier::ReplyListener::onNewDataMessage( - Subscriber* sub) -{ - ASSERT_NE(sub, nullptr); - - HelloWorld hello; - SampleInfo_t info; - - if (sub->takeNextData((void*)&hello, &info)) - { - if (info.sampleKind == ALIVE) - { - ASSERT_EQ(hello.message().compare("HelloWorld"), 0); - replier_.newNumber(info.sample_identity, hello.index()); - } - } -} diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.hpp b/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.hpp deleted file mode 100644 index ca4cd43998b..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldReplier.hpp +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepHelloWorldReplier.hpp - * - */ - -#ifndef _TEST_BLACKBOX_REQREPHELLOWORLDREPLIER_HPP_ -#define _TEST_BLACKBOX_REQREPHELLOWORLDREPLIER_HPP_ - -#include "../../types/HelloWorldPubSubTypes.h" - -#include -#include -#include -#include -#include - -#include -#include - -#if defined(_WIN32) -#define GET_PID _getpid -#include -#else -#define GET_PID getpid -#endif // if defined(_WIN32) - - - -class ReqRepHelloWorldReplier -{ -public: - - class ReplyListener : public eprosima::fastrtps::SubscriberListener - { - public: - - ReplyListener( - ReqRepHelloWorldReplier& replier) - : replier_(replier) - { - } - - ~ReplyListener() - { - } - - void onNewDataMessage( - eprosima::fastrtps::Subscriber* sub); - void onSubscriptionMatched( - eprosima::fastrtps::Subscriber* /*sub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - replier_.matched(); - } - } - - private: - - ReplyListener& operator =( - const ReplyListener&) = delete; - - ReqRepHelloWorldReplier& replier_; - } - request_listener_; - - class RequestListener : public eprosima::fastrtps::PublisherListener - { - public: - - RequestListener( - ReqRepHelloWorldReplier& replier) - : replier_(replier) - { - } - - ~RequestListener() - { - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher* /*pub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - replier_.matched(); - } - } - - private: - - RequestListener& operator =( - const RequestListener&) = delete; - - ReqRepHelloWorldReplier& replier_; - - } - reply_listener_; - - ReqRepHelloWorldReplier(); - virtual ~ReqRepHelloWorldReplier(); - void init(); - bool isInitialized() const - { - return initialized_; - } - - void newNumber( - eprosima::fastrtps::rtps::SampleIdentity sample_identity, - uint16_t number); - void wait_discovery(); - void matched(); - virtual void configSubscriber( - const std::string& suffix) = 0; - virtual void configPublisher( - const std::string& suffix) = 0; - -protected: - - eprosima::fastrtps::SubscriberAttributes sattr; - eprosima::fastrtps::PublisherAttributes puattr; - -private: - - ReqRepHelloWorldReplier& operator =( - const ReqRepHelloWorldReplier&) = delete; - - eprosima::fastrtps::Participant* participant_; - eprosima::fastrtps::Subscriber* request_subscriber_; - eprosima::fastrtps::Publisher* reply_publisher_; - bool initialized_; - std::mutex mutexDiscovery_; - std::condition_variable cvDiscovery_; - unsigned int matched_; - HelloWorldPubSubType type_; -}; - -#endif // _TEST_BLACKBOX_REQREPHELLOWORLDREPLIER_HPP_ diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.cpp b/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.cpp deleted file mode 100644 index 1186a444197..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.cpp +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepHelloWorldRequester.cpp - * - */ - -#include "ReqRepHelloWorldRequester.hpp" - -#include -#include -#include - -#include -#include - -#include - -#include - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -ReqRepHelloWorldRequester::ReqRepHelloWorldRequester() - : reply_listener_(*this) - , request_listener_(*this) - , current_number_(std::numeric_limits::max()) - , number_received_(std::numeric_limits::max()) - , participant_(nullptr) - , reply_subscriber_(nullptr) - , request_publisher_(nullptr) - , initialized_(false) - , matched_(0) -{ - // By default, memory mode is PREALLOCATED_WITH_REALLOC_MEMORY_MODE - sattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - puattr.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; -} - -ReqRepHelloWorldRequester::~ReqRepHelloWorldRequester() -{ - if (participant_ != nullptr) - { - Domain::removeParticipant(participant_); - } -} - -void ReqRepHelloWorldRequester::init() -{ - ParticipantAttributes pattr; - pattr.domainId = (uint32_t)GET_PID() % 230; - participant_ = Domain::createParticipant(pattr); - ASSERT_NE(participant_, nullptr); - - // Register type - ASSERT_EQ(Domain::registerType(participant_, &type_), true); - - //Create subscriber - sattr.topic.topicKind = NO_KEY; - sattr.topic.topicDataType = type_.getName(); - configSubscriber("Reply"); - reply_subscriber_ = Domain::createSubscriber(participant_, sattr, &reply_listener_); - ASSERT_NE(reply_subscriber_, nullptr); - - //Create publisher - puattr.topic.topicKind = NO_KEY; - puattr.topic.topicDataType = type_.getName(); - configPublisher("Request"); - request_publisher_ = Domain::createPublisher(participant_, puattr, &request_listener_); - ASSERT_NE(request_publisher_, nullptr); - - initialized_ = true; -} - -void ReqRepHelloWorldRequester::init_with_latency( - const eprosima::fastrtps::Duration_t& latency_budget_duration_pub, - const eprosima::fastrtps::Duration_t& latency_budget_duration_sub) -{ - ParticipantAttributes pattr; - participant_ = Domain::createParticipant(pattr); - ASSERT_NE(participant_, nullptr); - - // Register type - ASSERT_EQ(Domain::registerType(participant_, &type_), true); - - //Create subscriber - sattr.topic.topicKind = NO_KEY; - sattr.topic.topicDataType = type_.getName(); - sattr.qos.m_latencyBudget.duration = latency_budget_duration_sub; - reply_subscriber_ = Domain::createSubscriber(participant_, sattr, &reply_listener_); - ASSERT_NE(reply_subscriber_, nullptr); - - //Create publisher - puattr.topic.topicKind = NO_KEY; - puattr.topic.topicDataType = type_.getName(); - puattr.qos.m_latencyBudget.duration = latency_budget_duration_pub; - request_publisher_ = Domain::createPublisher(participant_, puattr, &request_listener_); - ASSERT_NE(request_publisher_, nullptr); - - initialized_ = true; -} - -void ReqRepHelloWorldRequester::newNumber( - SampleIdentity related_sample_identity, - uint16_t number) -{ - std::unique_lock lock(mutex_); - received_sample_identity_ = related_sample_identity; - number_received_ = number; - ASSERT_EQ(current_number_, number_received_); - if (current_number_ == number_received_) - { - cv_.notify_one(); - } -} - -void ReqRepHelloWorldRequester::block( - const std::chrono::seconds& seconds) -{ - std::unique_lock lock(mutex_); - - bool timeout = cv_.wait_for(lock, seconds, [&]() -> bool - { - return current_number_ == number_received_; - }); - - ASSERT_TRUE(timeout); - ASSERT_EQ(current_number_, number_received_); - ASSERT_EQ(related_sample_identity_, received_sample_identity_); -} - -void ReqRepHelloWorldRequester::wait_discovery() -{ - std::unique_lock lock(mutexDiscovery_); - - std::cout << "Requester is waiting discovery..." << std::endl; - - cvDiscovery_.wait(lock, [&]() - { - return matched_ > 1; - }); - - std::cout << "Requester discovery finished..." << std::endl; -} - -void ReqRepHelloWorldRequester::matched() -{ - std::unique_lock lock(mutexDiscovery_); - ++matched_; - if (matched_ > 1) - { - cvDiscovery_.notify_one(); - } -} - -void ReqRepHelloWorldRequester::ReplyListener::onNewDataMessage( - Subscriber* sub) -{ - ASSERT_NE(sub, nullptr); - - HelloWorld hello; - SampleInfo_t info; - - if (sub->takeNextData((void*)&hello, &info)) - { - if (info.sampleKind == ALIVE) - { - ASSERT_EQ(hello.message().compare("GoodBye"), 0); - requester_.newNumber(info.related_sample_identity, hello.index()); - } - } -} - -void ReqRepHelloWorldRequester::send( - const uint16_t number) -{ - WriteParams wparams; - HelloWorld hello; - hello.index(number); - hello.message("HelloWorld"); - - { - std::unique_lock lock(mutex_); - current_number_ = number; - } - - ASSERT_EQ(request_publisher_->write((void*)&hello, wparams), true); - related_sample_identity_ = wparams.sample_identity(); - ASSERT_NE(related_sample_identity_.sequence_number(), SequenceNumber_t()); -} diff --git a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.hpp b/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.hpp deleted file mode 100644 index 54969d83e75..00000000000 --- a/test/blackbox/api/fastrtps_deprecated/ReqRepHelloWorldRequester.hpp +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file ReqRepHelloWorldRequester.hpp - * - */ - -#ifndef _TEST_BLACKBOX_REQREPHELLOWORLDREQUESTER_HPP_ -#define _TEST_BLACKBOX_REQREPHELLOWORLDREQUESTER_HPP_ - -#include "../../types/HelloWorldPubSubTypes.h" - -#include -#include -#include -#include -#include -#include -#include - -#include -#include - - -#if defined(_WIN32) -#include -#define GET_PID _getpid -#else -#define GET_PID getpid -#endif // if defined(_WIN32) - - - -class ReqRepHelloWorldRequester -{ -public: - - class ReplyListener : public eprosima::fastrtps::SubscriberListener - { - public: - - ReplyListener( - ReqRepHelloWorldRequester& requester) - : requester_(requester) - { - } - - ~ReplyListener() - { - } - - void onNewDataMessage( - eprosima::fastrtps::Subscriber* sub); - void onSubscriptionMatched( - eprosima::fastrtps::Subscriber* /*sub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - requester_.matched(); - } - } - - private: - - ReplyListener& operator =( - const ReplyListener&) = delete; - - ReqRepHelloWorldRequester& requester_; - } - reply_listener_; - - class RequestListener : public eprosima::fastrtps::PublisherListener - { - public: - - RequestListener( - ReqRepHelloWorldRequester& requester) - : requester_(requester) - { - } - - ~RequestListener() - { - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher* /*pub*/, - eprosima::fastrtps::rtps::MatchingInfo& info) - { - if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) - { - requester_.matched(); - } - } - - private: - - RequestListener& operator =( - const RequestListener&) = delete; - - ReqRepHelloWorldRequester& requester_; - - } - request_listener_; - - ReqRepHelloWorldRequester(); - virtual ~ReqRepHelloWorldRequester(); - void init(); - void init_with_latency( - const eprosima::fastrtps::Duration_t& latency_budget_duration_pub, - const eprosima::fastrtps::Duration_t& latency_budget_duration_sub); - bool isInitialized() const - { - return initialized_; - } - - void newNumber( - eprosima::fastrtps::rtps::SampleIdentity related_sample_identity, - uint16_t number); - void block( - const std::chrono::seconds& seconds); - void wait_discovery(); - void matched(); - void send( - const uint16_t number); - - const eprosima::fastrtps::Duration_t datawriter_latency_budget_duration() const - { - return request_publisher_->getAttributes().qos.m_latencyBudget.duration; - } - - const eprosima::fastrtps::Duration_t datareader_latency_budget_duration() const - { - return reply_subscriber_->getAttributes().qos.m_latencyBudget.duration; - } - - virtual void configSubscriber( - const std::string& suffix) - { - (void) suffix; - } - - virtual void configPublisher( - const std::string& suffix) - { - (void) suffix; - } - -protected: - - eprosima::fastrtps::PublisherAttributes puattr; - eprosima::fastrtps::SubscriberAttributes sattr; - -private: - - ReqRepHelloWorldRequester& operator =( - const ReqRepHelloWorldRequester&) = delete; - - uint16_t current_number_; - uint16_t number_received_; - eprosima::fastrtps::Participant* participant_; - eprosima::fastrtps::Subscriber* reply_subscriber_; - eprosima::fastrtps::Publisher* request_publisher_; - bool initialized_; - std::mutex mutex_; - std::condition_variable cv_; - std::mutex mutexDiscovery_; - std::condition_variable cvDiscovery_; - unsigned int matched_; - HelloWorldPubSubType type_; - eprosima::fastrtps::rtps::SampleIdentity related_sample_identity_; - eprosima::fastrtps::rtps::SampleIdentity received_sample_identity_; -}; - -#endif // _TEST_BLACKBOX_REQREPHELLOWORLDREQUESTER_HPP_ diff --git a/versions.md b/versions.md index eea4008c163..6d9f2fc6223 100644 --- a/versions.md +++ b/versions.md @@ -4,6 +4,8 @@ Forthcoming * Rename project to `fastdds`. * Rename environment variable to `FASTDDS_DEFAULT_PROFILES_FILE` and rename default XML profiles file to `FASTDDS_DEFAULT_PROFILES`. * Remove API marked as deprecated. +* Removed deprecated FastRTPS API tests. +* Removed no longer supported `FASTRTPS_API_TESTS` CMake options. Version 2.14.0 --------------