From fdc3f8926684b1f7edf74ab56d219de6d395e1ad Mon Sep 17 00:00:00 2001 From: elianalf <62831776+elianalf@users.noreply.github.com> Date: Thu, 18 Jul 2024 07:32:45 +0200 Subject: [PATCH] Examples refactor: Flow Controller (#4999) * Refs #21185: Move dds/FlowControlExample in flow_control Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Refactor main, cliparser and application Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Change extension, name and guard of publisher app and subscriber app Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add Application Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Publisher refactor Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Subscriber refactor Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Rename FlowControlExample FlowControl Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add CLiParser Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Main refactor Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add flags to configure FlowControllerDescriptor Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Disable data sharing and add publish function Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Publisher sends infinite number of samples Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add samples flag to send or receive a finite number of samples Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add bandwidth_reservation and priority property Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Update version.md Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add test Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Uncrustify Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Apply Eduardo's suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Suggestions: Add mutex to control access to matched_ Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Suggestions: Add is_stopped condition on_data_available in xtype example Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add XML file to set some QoS for the test Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Send User Data to share the information about the kind of writer Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Change type and regenerate type: remove unused field and add index Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Fix compilation after rebase Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Add explanation about User Data in Readme Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Fix windows warning Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Uncrustify Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Throw error if a publication fail Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Manage receiving messages from multiple publishers Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Uncrustify Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Remove initialization Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Copy xml file in the build folder Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: Change on_data_writer_discovery after refactor Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #21185: adjust because write returns ret_code Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> --------- Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> --- examples/CMakeLists.txt | 1 + examples/cpp/dds/CMakeLists.txt | 1 - .../cpp/dds/FlowControlExample/.gitignore | 2 - .../FlowControlExample/FlowControlExample.idl | 5 - .../FlowControlExamplePubSubMain.cxx | 75 --- .../FlowControlExamplePublisher.cxx | 214 --------- .../FlowControlExamplePublisher.h | 87 ---- .../FlowControlExampleSubscriber.cxx | 154 ------ .../FlowControlExampleSubscriber.h | 87 ---- .../cpp/dds/FlowControlExample/README.txt | 32 -- examples/cpp/flow_control/Application.cpp | 57 +++ examples/cpp/flow_control/Application.hpp | 55 +++ examples/cpp/flow_control/CLIParser.hpp | 453 ++++++++++++++++++ .../CMakeLists.txt | 27 +- .../FlowControl.hpp} | 140 +++--- examples/cpp/flow_control/FlowControl.idl | 5 + .../FlowControlCdrAux.hpp} | 17 +- .../FlowControlCdrAux.ipp} | 31 +- .../FlowControlPubSubTypes.cxx} | 56 +-- .../FlowControlPubSubTypes.hpp} | 29 +- .../FlowControlTypeObjectSupport.cxx} | 108 ++--- .../FlowControlTypeObjectSupport.hpp} | 12 +- examples/cpp/flow_control/PublisherApp.cpp | 249 ++++++++++ examples/cpp/flow_control/PublisherApp.hpp | 107 +++++ examples/cpp/flow_control/README.md | 109 +++++ examples/cpp/flow_control/SubscriberApp.cpp | 242 ++++++++++ examples/cpp/flow_control/SubscriberApp.hpp | 111 +++++ .../cpp/flow_control/flow_control_profile.xml | 46 ++ examples/cpp/flow_control/main.cpp | 110 +++++ examples/cpp/xtypes/SubscriberApp.cpp | 2 +- test/examples/flow_control.compose.yml | 43 ++ test/examples/test_flow_control.py | 54 +++ versions.md | 1 + 33 files changed, 1859 insertions(+), 863 deletions(-) delete mode 100644 examples/cpp/dds/FlowControlExample/.gitignore delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExample.idl delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubMain.cxx delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.cxx delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.h delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.cxx delete mode 100644 examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.h delete mode 100644 examples/cpp/dds/FlowControlExample/README.txt create mode 100644 examples/cpp/flow_control/Application.cpp create mode 100644 examples/cpp/flow_control/Application.hpp create mode 100644 examples/cpp/flow_control/CLIParser.hpp rename examples/cpp/{dds/FlowControlExample => flow_control}/CMakeLists.txt (57%) rename examples/cpp/{dds/FlowControlExample/FlowControlExample.hpp => flow_control/FlowControl.hpp} (57%) create mode 100644 examples/cpp/flow_control/FlowControl.idl rename examples/cpp/{dds/FlowControlExample/FlowControlExampleCdrAux.hpp => flow_control/FlowControlCdrAux.hpp} (70%) rename examples/cpp/{dds/FlowControlExample/FlowControlExampleCdrAux.ipp => flow_control/FlowControlCdrAux.ipp} (84%) rename examples/cpp/{dds/FlowControlExample/FlowControlExamplePubSubTypes.cxx => flow_control/FlowControlPubSubTypes.cxx} (78%) rename examples/cpp/{dds/FlowControlExample/FlowControlExamplePubSubTypes.hpp => flow_control/FlowControlPubSubTypes.hpp} (81%) rename examples/cpp/{dds/FlowControlExample/FlowControlExampleTypeObjectSupport.cxx => flow_control/FlowControlTypeObjectSupport.cxx} (68%) rename examples/cpp/{dds/FlowControlExample/FlowControlExampleTypeObjectSupport.hpp => flow_control/FlowControlTypeObjectSupport.hpp} (81%) create mode 100644 examples/cpp/flow_control/PublisherApp.cpp create mode 100644 examples/cpp/flow_control/PublisherApp.hpp create mode 100644 examples/cpp/flow_control/README.md create mode 100644 examples/cpp/flow_control/SubscriberApp.cpp create mode 100644 examples/cpp/flow_control/SubscriberApp.hpp create mode 100644 examples/cpp/flow_control/flow_control_profile.xml create mode 100644 examples/cpp/flow_control/main.cpp create mode 100644 test/examples/flow_control.compose.yml create mode 100644 test/examples/test_flow_control.py diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 384448f6ca3..cfef3d94807 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(cpp/custom_payload_pool) add_subdirectory(cpp/dds) add_subdirectory(cpp/delivery_mechanisms) add_subdirectory(cpp/discovery_server) +add_subdirectory(cpp/flow_control) add_subdirectory(cpp/hello_world) add_subdirectory(cpp/request_reply) add_subdirectory(cpp/rtps) diff --git a/examples/cpp/dds/CMakeLists.txt b/examples/cpp/dds/CMakeLists.txt index 3549d8ba6e6..207dd6f14cb 100644 --- a/examples/cpp/dds/CMakeLists.txt +++ b/examples/cpp/dds/CMakeLists.txt @@ -13,7 +13,6 @@ # limitations under the License. add_subdirectory(DynamicHelloWorldExample) -add_subdirectory(FlowControlExample) add_subdirectory(HelloWorldExampleDataSharing) add_subdirectory(HelloWorldExampleSharedMem) add_subdirectory(HelloWorldExampleTCP) diff --git a/examples/cpp/dds/FlowControlExample/.gitignore b/examples/cpp/dds/FlowControlExample/.gitignore deleted file mode 100644 index ffc5570a9c3..00000000000 --- a/examples/cpp/dds/FlowControlExample/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -bin -output diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExample.idl b/examples/cpp/dds/FlowControlExample/FlowControlExample.idl deleted file mode 100644 index 10dba2168bc..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExample.idl +++ /dev/null @@ -1,5 +0,0 @@ -struct FlowControlExample -{ - char message[600000]; - char wasFast; -}; diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubMain.cxx b/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubMain.cxx deleted file mode 100644 index f86f9ef6b5f..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubMain.cxx +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/************************************************************************* - * @file FlowControlExamplePubSubMain.cpp - * This file acts as a main entry point to the application. - * - * This file was generated by the tool fastcdrgen. - */ - -#include "FlowControlExamplePublisher.h" -#include "FlowControlExampleSubscriber.h" - -using namespace eprosima; -using namespace eprosima::fastdds; - -int main(int argc, char** argv) -{ - std::cout << "Starting " << std::endl; - int type = 1; - if (argc > 1) - { - if (strcmp(argv[1], "publisher") == 0) - { - type = 1; - } - else if (strcmp(argv[1], "subscriber") == 0) - { - type = 2; - } - } - else - { - std::cout << "publisher OR subscriber argument needed" << std::endl; - return 0; - } - - // Register the type being used - - - switch(type) - { - case 1: - { - FlowControlExamplePublisher mypub; - if (mypub.init()) - { - mypub.run(); - } - break; - } - case 2: - { - FlowControlExampleSubscriber mysub; - if (mysub.init()) - { - mysub.run(); - } - break; - } - } - - return 0; -} diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.cxx b/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.cxx deleted file mode 100644 index 4a59468de5f..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.cxx +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/************************************************************************* - * @file FlowControlExamplePublisher.cpp - * This file contains the implementation of the publisher functions. - * - * This file was generated by the tool fastcdrgen. - */ - -#include "FlowControlExamplePublisher.h" - -#include -#include - -#include - -using namespace eprosima::fastdds::dds; -using namespace eprosima::fastdds::rtps; -using namespace eprosima::fastdds::rtps; - -FlowControlExamplePublisher::FlowControlExamplePublisher() - : participant_(nullptr) - , fast_publisher_(nullptr) - , slow_publisher_(nullptr) - , topic_(nullptr) - , fast_writer_(nullptr) - , slow_writer_(nullptr) - , myType(new FlowControlExamplePubSubType()) -{ -} - -FlowControlExamplePublisher::~FlowControlExamplePublisher() -{ - if (fast_writer_ != nullptr) - { - fast_publisher_->delete_datawriter(fast_writer_); - } - if (slow_writer_ != nullptr) - { - slow_publisher_->delete_datawriter(slow_writer_); - } - if (fast_publisher_ != nullptr) - { - participant_->delete_publisher(fast_publisher_); - } - if (slow_publisher_ != nullptr) - { - participant_->delete_publisher(slow_publisher_); - } - if (topic_ != nullptr) - { - participant_->delete_topic(topic_); - } - DomainParticipantFactory::get_instance()->delete_participant(participant_); -} - -bool FlowControlExamplePublisher::init() -{ - // Create Participant - DomainParticipantQos pqos; - pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastdds::c_TimeInfinite; - pqos.name("Participant_publisher"); //You can put here the name you want - - // This controller allows 300kb per second. - auto slow_flow_controller_descriptor = std::make_shared(); - slow_flow_controller_descriptor->name = "slow_flow_controller_descriptor"; - slow_flow_controller_descriptor->max_bytes_per_period = 300000; - slow_flow_controller_descriptor->period_ms = static_cast(1000); - pqos.flow_controllers().push_back(slow_flow_controller_descriptor); - - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos); - - if (participant_ == nullptr) - { - return false; - } - - //Register the type - myType.register_type(participant_); - - // Create fast Publisher, which has no controller of its own. - fast_publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT); - - if (fast_publisher_ == nullptr) - { - return false; - } - - // Create Topic - topic_ = participant_->create_topic("FlowControlExamplePubSubTopic", myType.get_type_name(), TOPIC_QOS_DEFAULT); - - if (topic_ == nullptr) - { - return false; - } - - // Create fast DataWriter - DataWriterQos wfqos; - wfqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE; - - fast_writer_ = fast_publisher_->create_datawriter(topic_, wfqos, &m_listener); - - if (fast_writer_ == nullptr) - { - return false; - } - std::cout << "Fast publisher created, waiting for Subscribers." << std::endl; - - // Create slow Publisher, with its own controller - slow_publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT); - - if (slow_publisher_ == nullptr) - { - return false; - } - - // Create slow DataWriter - DataWriterQos wsqos; - wsqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE; - wsqos.publish_mode().flow_controller_name = slow_flow_controller_descriptor->name; - - slow_writer_ = slow_publisher_->create_datawriter(topic_, wsqos, &m_listener); - - if (slow_writer_ == nullptr) - { - return false; - } - std::cout << "Slow publisher created, waiting for Subscribers." << std::endl; - return true; -} - -void FlowControlExamplePublisher::PubListener::on_publication_matched( - eprosima::fastdds::dds::DataWriter*, - const eprosima::fastdds::dds::PublicationMatchedStatus& info) -{ - if (info.current_count_change == 1) - { - n_matched = info.total_count; - std::cout << "Publisher matched." << std::endl; - } - else if (info.current_count_change == -1) - { - n_matched = info.total_count; - std::cout << "Publisher unmatched." << std::endl; - } - else - { - std::cout << info.current_count_change - << " is not a valid value for PublicationMatchedStatus current count change" << std::endl; - } -} - -void FlowControlExamplePublisher::run() -{ - while (m_listener.n_matched == 0) - { - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - - // Publication code - - FlowControlExample st; - - /* Initialize your structure here */ - - int msgsent_fast = 0; - int msgsent_slow = 0; - char ch; - std::cout << "Flow Control example." << std::endl; - std::cout << "Press \"f\" to send a sample through the fast writer, which has unlimited bandwidth" << std::endl; - std::cout << - "Press \"s\" to send a sample through the slow writer, which is also limited by its own Flow Controller" << - std::endl; - std::cout << "Press \"q\" quit" << std::endl; - while (std::cin >> ch) - { - if (ch == 'f') - { - st.wasFast(true); - fast_writer_->write(&st); ++msgsent_fast; - std::cout << "Sending sample, count=" << msgsent_fast << - " through the fast writer. Send another sample? (f-fast,s-slow,q-quit): "; - } - else if (ch == 's') - { - st.wasFast(false); - slow_writer_->write(&st); ++msgsent_slow; - std::cout << "Sending sample, count=" << msgsent_slow << - " through the slow writer. Send another sample? (f-fast,s-slow,q-quit): "; - } - else if (ch == 'q') - { - std::cout << "Finishing Flow Control example" << std::endl; - break; - } - else - { - std::cout << "Command " << ch << " not recognized, please enter \"f/s/q\":"; - } - - } -} diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.h b/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.h deleted file mode 100644 index a68a21c5039..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExamplePublisher.h +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/************************************************************************* - * @file FlowControlExamplePublisher.h - * This header file contains the declaration of the publisher functions. - * - * This file was generated by the tool fastcdrgen. - */ - - -#ifndef _FLOWCONTROLEXAMPLE_PUBLISHER_H_ -#define _FLOWCONTROLEXAMPLE_PUBLISHER_H_ - -#include -#include -#include -#include -#include - -#include "FlowControlExamplePubSubTypes.hpp" - - - -class FlowControlExamplePublisher -{ -public: - - FlowControlExamplePublisher(); - - virtual ~FlowControlExamplePublisher(); - - bool init(); - - void run(); - -private: - - eprosima::fastdds::dds::DomainParticipant* participant_; - - eprosima::fastdds::dds::Publisher* fast_publisher_; - - eprosima::fastdds::dds::Publisher* slow_publisher_; - - eprosima::fastdds::dds::Topic* topic_; - - eprosima::fastdds::dds::DataWriter* fast_writer_; - - eprosima::fastdds::dds::DataWriter* slow_writer_; - - class PubListener : public eprosima::fastdds::dds::DataWriterListener - { - public: - - PubListener() - : n_matched(0) - { - } - - ~PubListener() override - { - } - - void on_publication_matched( - eprosima::fastdds::dds::DataWriter* writer, - const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; - - int n_matched; - - } - m_listener; - - eprosima::fastdds::dds::TypeSupport myType; -}; - -#endif // _FLOWCONTROLEXAMPLE_PUBLISHER_H_ diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.cxx b/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.cxx deleted file mode 100644 index 51561bd045e..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.cxx +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/************************************************************************* - * @file FlowControlExampleSubscriber.cpp - * This file contains the implementation of the subscriber functions. - * - * This file was generated by the tool fastcdrgen. - */ - -#include -#include -#include - -#include "FlowControlExampleSubscriber.h" - -using namespace eprosima::fastdds::dds; -using namespace eprosima::fastdds::rtps; - -FlowControlExampleSubscriber::FlowControlExampleSubscriber() - : participant_(nullptr) - , subscriber_(nullptr) - , topic_(nullptr) - , reader_(nullptr) - , myType(new FlowControlExamplePubSubType()) -{ -} - -FlowControlExampleSubscriber::~FlowControlExampleSubscriber() -{ - if (reader_ != nullptr) - { - subscriber_->delete_datareader(reader_); - } - if (topic_ != nullptr) - { - participant_->delete_topic(topic_); - } - if (subscriber_ != nullptr) - { - participant_->delete_subscriber(subscriber_); - } - DomainParticipantFactory::get_instance()->delete_participant(participant_); -} - -bool FlowControlExampleSubscriber::init() -{ - // Create Participant - DomainParticipantQos pqos; - pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastdds::c_TimeInfinite; - pqos.name("Participant_subscriber"); //You can put the name you want - - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos); - - if (participant_ == nullptr) - { - return false; - } - - //Register the type - myType.register_type(participant_); - - // Create Subscriber - subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT); - - if (subscriber_ == nullptr) - { - return false; - } - - // Create Topic - topic_ = participant_->create_topic("FlowControlExamplePubSubTopic", myType.get_type_name(), TOPIC_QOS_DEFAULT); - - if (topic_ == nullptr) - { - return false; - } - - // Create DataReader - reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &m_listener); - - if (reader_ == nullptr) - { - return false; - } - - return true; -} - -void FlowControlExampleSubscriber::SubListener::on_subscription_matched( - DataReader*, - const SubscriptionMatchedStatus& info) -{ - if (info.current_count_change == 1) - { - n_matched = info.total_count; - std::cout << "Subscriber matched." << std::endl; - } - else if (info.current_count_change == -1) - { - n_matched = info.total_count; - std::cout << "Subscriber unmatched." << std::endl; - } - else - { - std::cout << info.current_count_change - << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl; - } -} - -void FlowControlExampleSubscriber::SubListener::on_data_available( - DataReader* reader) -{ - SampleInfo info; - FlowControlExample st; - if (reader->take_next_sample(&st, &info) == RETCODE_OK) - { - if (info.valid_data) - { - ++n_msg; - static unsigned int fastMessages = 0; - static unsigned int slowMessages = 0; - // Print your structure data here. - if (st.wasFast()) - { - fastMessages++; - std::cout << "Sample received from fast writer, count=" << fastMessages << std::endl; - } - else - { - slowMessages++; - std::cout << "Sample received from slow writer, count=" << slowMessages << std::endl; - } - } - } -} - -void FlowControlExampleSubscriber::run() -{ - std::cout << "Waiting for Data, press Enter to stop the Subscriber. " << std::endl; - std::cin.ignore(); - std::cout << "Shutting down the Subscriber." << std::endl; -} diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.h b/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.h deleted file mode 100644 index 9391848206e..00000000000 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleSubscriber.h +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/************************************************************************* - * @file FlowControlExampleSubscriber.h - * This header file contains the declaration of the subscriber functions. - * - * This file was generated by the tool fastcdrgen. - */ - - -#ifndef _FLOWCONTROLEXAMPLE_SUBSCRIBER_H_ -#define _FLOWCONTROLEXAMPLE_SUBSCRIBER_H_ - -#include -#include -#include -#include -#include - -#include "FlowControlExamplePubSubTypes.hpp" - -class FlowControlExampleSubscriber -{ -public: - - FlowControlExampleSubscriber(); - - virtual ~FlowControlExampleSubscriber(); - - bool init(); - - void run(); - -private: - - eprosima::fastdds::dds::DomainParticipant* participant_; - - eprosima::fastdds::dds::Subscriber* subscriber_; - - eprosima::fastdds::dds::Topic* topic_; - - eprosima::fastdds::dds::DataReader* reader_; - - class SubListener : public eprosima::fastdds::dds::DataReaderListener - { - public: - - SubListener() - : n_matched(0) - , n_msg(0) - { - } - - ~SubListener() override - { - } - - void on_data_available( - eprosima::fastdds::dds::DataReader* reader) override; - - void on_subscription_matched( - eprosima::fastdds::dds::DataReader* reader, - const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; - - int n_matched; - - int n_msg; - - } - m_listener; - - eprosima::fastdds::dds::TypeSupport myType; -}; - -#endif // _FlowControlExample_SUBSCRIBER_H_ diff --git a/examples/cpp/dds/FlowControlExample/README.txt b/examples/cpp/dds/FlowControlExample/README.txt deleted file mode 100644 index 10c47ecc7c0..00000000000 --- a/examples/cpp/dds/FlowControlExample/README.txt +++ /dev/null @@ -1,32 +0,0 @@ -To launch this example open two consoles: - - 1) "$ ./DDSFlowControlExample subscriber" (or "DDSFlowControlExample.exe subscriber" in Windows). - - 2..*) "$ ./DDSFlowControlExample publisher" (or "DDSFlowControlExample.exe publisher" in Windows). - -This example illustrates the flow control feature. - - ================ - = Flow Control = - ================ - -In Fast DDS, Flow Control is implemented through objects called Flow Controllers. In -particular, we will be looking at the simplest kind, the Flow Controller. - -A flow controller is univocally defined by a Flow Controller Descriptor, -which is a simple struct that includes two values: - -> A size in bytes. - -> A period in milliseconds. - -Once instantiated from this descriptor, a flow controller will make sure there is a -limit on the data it processes, so that no more than the specified size gets -through it in the specified time. In other words, it limits data throughput. - -Flow filters can be placed at different points in the system. In this example, you -can see a controller being placed on a particular Writer. Controllers allocated in this -way display a hierarchical behaviour, so in order for data to be sent, it must clear -both the Participant filter and the Writer filter, if available. - -Looking at FlowControlExamplePublisher::init(), you can see the steps involved in -adding a size filter to the publisher parameters. - diff --git a/examples/cpp/flow_control/Application.cpp b/examples/cpp/flow_control/Application.cpp new file mode 100644 index 00000000000..c3abb3edcd8 --- /dev/null +++ b/examples/cpp/flow_control/Application.cpp @@ -0,0 +1,57 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Application.cpp + * + */ + +#include "Application.hpp" + +#include "CLIParser.hpp" +#include "SubscriberApp.hpp" +#include "PublisherApp.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +//! Factory method to create a publisher or subscriber +std::shared_ptr Application::make_app( + const CLIParser::flow_control_config& config) +{ + std::shared_ptr entity; + switch (config.entity) + { + case CLIParser::EntityKind::PUBLISHER: + entity = std::make_shared(config); + break; + case CLIParser::EntityKind::SUBSCRIBER: + entity = std::make_shared(config); + break; + case CLIParser::EntityKind::UNDEFINED: + default: + throw std::runtime_error("Entity initialization failed"); + break; + } + return entity; +} + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/flow_control/Application.hpp b/examples/cpp/flow_control/Application.hpp new file mode 100644 index 00000000000..f5956a31689 --- /dev/null +++ b/examples/cpp/flow_control/Application.hpp @@ -0,0 +1,55 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Application.hpp + * + */ + +#ifndef FASTDDS_FLOW_CONTROL_APPLICATION_HPP +#define FASTDDS_FLOW_CONTROL_APPLICATION_HPP + +#include + +#include "CLIParser.hpp" + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +class Application +{ +public: + + //! Virtual destructor + virtual ~Application() = default; + + //! Run application + virtual void run() = 0; + + //! Trigger the end of execution + virtual void stop() = 0; + + //! Factory method to create applications based on configuration + static std::shared_ptr make_app( + const CLIParser::flow_control_config& config); +}; + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* FASTDDS_FLOW_CONTROL_APPLICATION_HPP */ diff --git a/examples/cpp/flow_control/CLIParser.hpp b/examples/cpp/flow_control/CLIParser.hpp new file mode 100644 index 00000000000..db39f4f4aba --- /dev/null +++ b/examples/cpp/flow_control/CLIParser.hpp @@ -0,0 +1,453 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +#ifndef FASTDDS_FLOW_CONTROL_CLI_PARSER_HPP +#define FASTDDS_FLOW_CONTROL_CLI_PARSER_HPP + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +using dds::Log; + +class CLIParser +{ +public: + + CLIParser() = delete; + + //! Entity kind enumeration + enum class EntityKind : uint8_t + { + PUBLISHER, + SUBSCRIBER, + UNDEFINED + }; + + //! Configuration structure for the application + struct flow_control_config + { + CLIParser::EntityKind entity = CLIParser::EntityKind::UNDEFINED; + uint16_t samples = 0; + uint64_t period = 500; + int32_t max_bytes_per_period = 300000; + std::string bandwidth = "0"; + std::string priority = "10"; + rtps::FlowControllerSchedulerPolicy scheduler = rtps::FlowControllerSchedulerPolicy::FIFO; + }; + + /** + * @brief Print usage help message and exit with the given return code + * + * @param return_code return code to exit with + * + * @warning This method finishes the execution of the program with the input return code + */ + static void print_help( + uint8_t return_code) + { + std::cout << "Usage: flow_control [options]" << std::endl; + std::cout << "" << std::endl; + std::cout << "Entities:" << std::endl; + std::cout << " publisher Run a publisher entity" << std::endl; + std::cout << " subscriber Run a subscriber entity" << std::endl; + std::cout << "" << std::endl; + std::cout << "Common options:" << std::endl; + std::cout << "" << std::endl; + std::cout << " -h, --help Print this help message" << std::endl; + std::cout << " -s , --samples Number of samples to send or receive" << std::endl; + std::cout << " [0 <= <= 65535]" << std::endl; + std::cout << " (Default: 0 [unlimited])" << std::endl; + std::cout << "" << std::endl; + std::cout << "Slow Publisher options:" << std::endl; + std::cout << " --max-bytes Maximum number of bytes to be sent" << std::endl; + std::cout << " per period [0 <= <= 2147483647]" << std::endl; + std::cout << " 0 = no limits." << std::endl; + std::cout << " (Default: 300kB)" << std::endl; + std::cout << " --period Period of time (ms) in which the" << std::endl; + std::cout << " flow controller is allowed" << std::endl; + std::cout << " to send the max bytes per period." << std::endl; + std::cout << " (Default: 1000ms)" << std::endl; + std::cout << " --scheduler Scheduler policy [FIFO, ROUND-ROBIN," << std::endl; + std::cout << " HIGH-PRIORITY, PRIORITY-RESERVATION]" << std::endl; + std::cout << " (Default: FIFO)" << std::endl; + std::cout << " --bandwidth Bandwidth that the DataWriter can" << std::endl; + std::cout << " request for PRIORITY_WITH_RESERVATION" << std::endl; + std::cout << " express as a percentage of the total " << std::endl; + std::cout << " flow controller limit: [0; 100]" << std::endl; + std::cout << " (Default: 0)" << std::endl; + std::cout << " --priority Priority for HIGH_PRIORITY and" << std::endl; + std::cout << " PRIORITY_WITH_RESERVATION schedulers" << std::endl; + std::cout << " [-10 (highest) ; 10 (lowest)]" << std::endl; + std::cout << " (Default: 10)" << std::endl; + std::exit(return_code); + } + + /** + * @brief Parse the command line options and return the flow_control_config object + * + * @param argc number of arguments + * @param argv array of arguments + * @return flow_control_config object with the parsed options + * + * @warning This method finishes the execution of the program if the input arguments are invalid + */ + static flow_control_config parse_cli_options( + int argc, + char* argv[]) + { + flow_control_config config; + + if (argc < 2) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing entity argument"); + print_help(EXIT_FAILURE); + } + + std::string first_argument = argv[1]; + + if (first_argument == "publisher" ) + { + config.entity = CLIParser::EntityKind::PUBLISHER; + } + else if (first_argument == "subscriber") + { + config.entity = CLIParser::EntityKind::SUBSCRIBER; + } + else if (first_argument == "-h" || first_argument == "--help") + { + print_help(EXIT_SUCCESS); + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing entity argument " + first_argument); + print_help(EXIT_FAILURE); + } + + for (int i = 2; i < argc; ++i) + { + std::string arg = argv[i]; + + if (arg == "-h" || arg == "--help") + { + print_help(EXIT_SUCCESS); + } + else if (arg == "-s" || arg == "--samples") + { + if (++i < argc) + { + try + { + unsigned long input = std::stoul(argv[i]); + if (input > std::numeric_limits::max()) + { + throw std::out_of_range("sample argument " + std::string( + argv[i]) + " out of range [0, 65535]."); + } + config.samples = static_cast(input); + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid sample argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing samples argument"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--max-bytes") + { + if (config.entity == CLIParser::EntityKind::PUBLISHER) + { + if (++i < argc) + { + try + { + unsigned long input = std::stoul(argv[i]); + + if (input > static_cast(std::numeric_limits::max())) + { + throw std::out_of_range("max-bytes argument " + std::string( + argv[i]) + " out of range [0, 2147483647]."); + } + config.max_bytes_per_period = static_cast(input); + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid max-bytes argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing max-bytes argument"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "max-bytes argument is only valid for publisher entity"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--period") + { + if (config.entity != CLIParser::EntityKind::PUBLISHER) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "period argument is only valid for publisher entity"); + print_help(EXIT_FAILURE); + } + + if (++i < argc) + { + try + { + uint64_t input = std::stoull(argv[i]); + if (input > std::numeric_limits::max()) + { + throw std::out_of_range("period argument " + std::string( + argv[i]) + " out of range."); + } + config.period = input; + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid period argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing period argument"); + print_help(EXIT_FAILURE); + } + + } + else if (arg == "--scheduler") + { + if (++i < argc) + { + std::string scheduler = argv[i]; + if (config.entity == CLIParser::EntityKind::PUBLISHER) + { + if (scheduler == "FIFO") + { + config.scheduler = rtps::FlowControllerSchedulerPolicy::FIFO; + } + else if (scheduler == "ROUND-ROBIN") + { + config.scheduler = rtps::FlowControllerSchedulerPolicy::ROUND_ROBIN; + } + else if (scheduler == "HIGH-PRIORITY") + { + config.scheduler = rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY; + } + else if (scheduler == "PRIORITY-RESERVATION") + { + config.scheduler = rtps::FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "unknown argument "); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "scheduler argument is only valid for publisher entity"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--bandwidth") + { + if (++i < argc) + { + try + { + unsigned long input = std::stoul(argv[i]); + if (input > 100) + { + throw std::out_of_range("bandwidth argument " + std::string( + argv[i]) + " out of range."); + } + + if (config.entity == CLIParser::EntityKind::PUBLISHER) + { + config.bandwidth = argv[i]; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "bandwidth argument is only valid for publisher entity"); + print_help(EXIT_FAILURE); + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid bandwidth argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--priority") + { + if (++i < argc) + { + try + { + unsigned long input = std::stoul(argv[i]); + if (input > 10) + { + throw std::out_of_range("priority argument " + std::string( + argv[i]) + " out of range."); + } + + if (config.entity == CLIParser::EntityKind::PUBLISHER) + { + config.priority = argv[i]; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "priority argument is only valid for publisher entity"); + print_help(EXIT_FAILURE); + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid priority argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing argument: " + arg); + print_help(EXIT_FAILURE); + } + } + + return config; + } + + /** + * @brief Parse the signal number into the signal name + * + * @param signum signal number + * @return std::string signal name + */ + static std::string parse_signal( + const int& signum) + { + switch (signum) + { + case SIGINT: + return "SIGINT"; + case SIGTERM: + return "SIGTERM"; +#ifndef _WIN32 + case SIGQUIT: + return "SIGQUIT"; + case SIGHUP: + return "SIGHUP"; +#endif // _WIN32 + default: + return "UNKNOWN SIGNAL"; + } + } + + /** + * @brief Parse the entity kind into std::string + * + * @param entity entity kind + * @return std::string entity kind + */ + static std::string parse_entity_kind( + const EntityKind& entity) + { + switch (entity) + { + case EntityKind::PUBLISHER: + return "Publisher"; + case EntityKind::SUBSCRIBER: + return "Subscriber"; + case EntityKind::UNDEFINED: + default: + return "Undefined entity"; + } + } + +}; + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_FLOW_CONTROL_CLI_PARSER_HPP_ diff --git a/examples/cpp/dds/FlowControlExample/CMakeLists.txt b/examples/cpp/flow_control/CMakeLists.txt similarity index 57% rename from examples/cpp/dds/FlowControlExample/CMakeLists.txt rename to examples/cpp/flow_control/CMakeLists.txt index 09fddb094bf..056172f9f8d 100644 --- a/examples/cpp/dds/FlowControlExample/CMakeLists.txt +++ b/examples/cpp/flow_control/CMakeLists.txt @@ -14,7 +14,7 @@ cmake_minimum_required(VERSION 3.20) -project(FlowControlExample VERSION 1 LANGUAGES CXX) +project(fastdds_flow_control_example VERSION 1 LANGUAGES CXX) # Find requirements if(NOT fastcdr_FOUND) @@ -42,11 +42,26 @@ message(STATUS "Configuring DDS Flow Control example...") file(GLOB DDS_FLOWCONTROL_EXAMPLE_SOURCES_CXX "*.cxx") file(GLOB DDS_FLOWCONTROL_EXAMPLE_SOURCES_CPP "*.cpp") -add_executable(DDSFlowControlExample ${DDS_FLOWCONTROL_EXAMPLE_SOURCES_CXX} ${DDS_FLOWCONTROL_EXAMPLE_SOURCES_CPP}) -target_compile_definitions(DDSFlowControlExample PRIVATE +add_executable(flow_control ${DDS_FLOWCONTROL_EXAMPLE_SOURCES_CXX} ${DDS_FLOWCONTROL_EXAMPLE_SOURCES_CPP}) +target_compile_definitions(flow_control PRIVATE $<$>,$>:__DEBUG> $<$:__INTERNALDEBUG> # Internal debug activated. ) -target_link_libraries(DDSFlowControlExample fastdds fastcdr foonathan_memory) -install(TARGETS DDSFlowControlExample - RUNTIME DESTINATION examples/cpp/dds/FlowControlExample/${BIN_INSTALL_DIR}) +target_link_libraries(flow_control fastdds fastcdr foonathan_memory) +install(TARGETS flow_control + RUNTIME DESTINATION ${DATA_INSTALL_DIR}/fastdds/examples/cpp/flow_control/${BIN_INSTALL_DIR}) + +# Copy the XML files over to the build directory +file(GLOB_RECURSE XML_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.xml) +# for each xml file detected +foreach(XML_FILE_COMPLETE_PATH ${XML_FILES}) + # obtain the file name + get_filename_component(XML_FILE ${XML_FILE_COMPLETE_PATH} NAME_WE) + # copy the file from src to build folders + configure_file( + ${XML_FILE_COMPLETE_PATH} # from full src path + ${CMAKE_CURRENT_BINARY_DIR}/${XML_FILE}.xml # to relative build path + COPYONLY) + install(FILES ${XML_FILE_COMPLETE_PATH} + DESTINATION ${DATA_INSTALL_DIR}/fastdds/examples/cpp/flow_control/${BIN_INSTALL_DIR}) +endforeach() diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExample.hpp b/examples/cpp/flow_control/FlowControl.hpp similarity index 57% rename from examples/cpp/dds/FlowControlExample/FlowControlExample.hpp rename to examples/cpp/flow_control/FlowControl.hpp index cb0f0da9bb3..9a543be80fc 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExample.hpp +++ b/examples/cpp/flow_control/FlowControl.hpp @@ -13,14 +13,14 @@ // limitations under the License. /*! - * @file FlowControlExample.hpp + * @file FlowControl.hpp * This header file contains the declaration of the described types in the IDL file. * * This file was generated by the tool fastddsgen. */ -#ifndef FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_HPP -#define FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_HPP +#ifndef FAST_DDS_GENERATED__FLOWCONTROL_HPP +#define FAST_DDS_GENERATED__FLOWCONTROL_HPP #include #include @@ -38,113 +38,142 @@ #if defined(_WIN32) #if defined(EPROSIMA_USER_DLL_EXPORT) -#if defined(FLOWCONTROLEXAMPLE_SOURCE) -#define FLOWCONTROLEXAMPLE_DllAPI __declspec( dllexport ) +#if defined(FLOWCONTROL_SOURCE) +#define FLOWCONTROL_DllAPI __declspec( dllexport ) #else -#define FLOWCONTROLEXAMPLE_DllAPI __declspec( dllimport ) -#endif // FLOWCONTROLEXAMPLE_SOURCE +#define FLOWCONTROL_DllAPI __declspec( dllimport ) +#endif // FLOWCONTROL_SOURCE #else -#define FLOWCONTROLEXAMPLE_DllAPI +#define FLOWCONTROL_DllAPI #endif // EPROSIMA_USER_DLL_EXPORT #else -#define FLOWCONTROLEXAMPLE_DllAPI +#define FLOWCONTROL_DllAPI #endif // _WIN32 /*! - * @brief This class represents the structure FlowControlExample defined by the user in the IDL file. - * @ingroup FlowControlExample + * @brief This class represents the structure FlowControl defined by the user in the IDL file. + * @ingroup FlowControl */ -class FlowControlExample +class FlowControl { public: /*! * @brief Default constructor. */ - eProsima_user_DllExport FlowControlExample() + eProsima_user_DllExport FlowControl() { } /*! * @brief Default destructor. */ - eProsima_user_DllExport ~FlowControlExample() + eProsima_user_DllExport ~FlowControl() { } /*! * @brief Copy constructor. - * @param x Reference to the object FlowControlExample that will be copied. + * @param x Reference to the object FlowControl that will be copied. */ - eProsima_user_DllExport FlowControlExample( - const FlowControlExample& x) + eProsima_user_DllExport FlowControl( + const FlowControl& x) { - m_message = x.m_message; + m_index = x.m_index; - m_wasFast = x.m_wasFast; + m_message = x.m_message; } /*! * @brief Move constructor. - * @param x Reference to the object FlowControlExample that will be copied. + * @param x Reference to the object FlowControl that will be copied. */ - eProsima_user_DllExport FlowControlExample( - FlowControlExample&& x) noexcept + eProsima_user_DllExport FlowControl( + FlowControl&& x) noexcept { + m_index = x.m_index; m_message = std::move(x.m_message); - m_wasFast = x.m_wasFast; } /*! * @brief Copy assignment. - * @param x Reference to the object FlowControlExample that will be copied. + * @param x Reference to the object FlowControl that will be copied. */ - eProsima_user_DllExport FlowControlExample& operator =( - const FlowControlExample& x) + eProsima_user_DllExport FlowControl& operator =( + const FlowControl& x) { - m_message = x.m_message; + m_index = x.m_index; - m_wasFast = x.m_wasFast; + m_message = x.m_message; return *this; } /*! * @brief Move assignment. - * @param x Reference to the object FlowControlExample that will be copied. + * @param x Reference to the object FlowControl that will be copied. */ - eProsima_user_DllExport FlowControlExample& operator =( - FlowControlExample&& x) noexcept + eProsima_user_DllExport FlowControl& operator =( + FlowControl&& x) noexcept { + m_index = x.m_index; m_message = std::move(x.m_message); - m_wasFast = x.m_wasFast; return *this; } /*! * @brief Comparison operator. - * @param x FlowControlExample object to compare. + * @param x FlowControl object to compare. */ eProsima_user_DllExport bool operator ==( - const FlowControlExample& x) const + const FlowControl& x) const { - return (m_message == x.m_message && - m_wasFast == x.m_wasFast); + return (m_index == x.m_index && + m_message == x.m_message); } /*! * @brief Comparison operator. - * @param x FlowControlExample object to compare. + * @param x FlowControl object to compare. */ eProsima_user_DllExport bool operator !=( - const FlowControlExample& x) const + const FlowControl& x) const { return !(*this == x); } + /*! + * @brief This function sets a value in member index + * @param _index New value for member index + */ + eProsima_user_DllExport void index( + uint32_t _index) + { + m_index = _index; + } + + /*! + * @brief This function returns the value of member index + * @return Value of member index + */ + eProsima_user_DllExport uint32_t index() const + { + return m_index; + } + + /*! + * @brief This function returns a reference to member index + * @return Reference to member index + */ + eProsima_user_DllExport uint32_t& index() + { + return m_index; + } + + /*! * @brief This function copies the value in member message * @param _message New value to be copied in member message @@ -184,43 +213,12 @@ class FlowControlExample } - /*! - * @brief This function sets a value in member wasFast - * @param _wasFast New value for member wasFast - */ - eProsima_user_DllExport void wasFast( - char _wasFast) - { - m_wasFast = _wasFast; - } - - /*! - * @brief This function returns the value of member wasFast - * @return Value of member wasFast - */ - eProsima_user_DllExport char wasFast() const - { - return m_wasFast; - } - - /*! - * @brief This function returns a reference to member wasFast - * @return Reference to member wasFast - */ - eProsima_user_DllExport char& wasFast() - { - return m_wasFast; - } - - private: + uint32_t m_index{0}; std::array m_message{0}; - char m_wasFast{0}; }; -#endif // _FAST_DDS_GENERATED_FLOWCONTROLEXAMPLE_HPP_ - - +#endif // FAST_DDS_GENERATED__FLOWCONTROL_HPP diff --git a/examples/cpp/flow_control/FlowControl.idl b/examples/cpp/flow_control/FlowControl.idl new file mode 100644 index 00000000000..27c1222a81a --- /dev/null +++ b/examples/cpp/flow_control/FlowControl.idl @@ -0,0 +1,5 @@ +struct FlowControl +{ + unsigned long index; + char message[600000]; +}; diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.hpp b/examples/cpp/flow_control/FlowControlCdrAux.hpp similarity index 70% rename from examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.hpp rename to examples/cpp/flow_control/FlowControlCdrAux.hpp index f68a328b359..596b199d4c0 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.hpp +++ b/examples/cpp/flow_control/FlowControlCdrAux.hpp @@ -13,19 +13,19 @@ // limitations under the License. /*! - * @file FlowControlExampleCdrAux.hpp + * @file FlowControlCdrAux.hpp * This source file contains some definitions of CDR related functions. * * This file was generated by the tool fastddsgen. */ -#ifndef FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_HPP -#define FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_HPP +#ifndef FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_HPP +#define FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_HPP -#include "FlowControlExample.hpp" +#include "FlowControl.hpp" -constexpr uint32_t FlowControlExample_max_cdr_typesize {600005UL}; -constexpr uint32_t FlowControlExample_max_key_cdr_typesize {0UL}; +constexpr uint32_t FlowControl_max_cdr_typesize {600008UL}; +constexpr uint32_t FlowControl_max_key_cdr_typesize {0UL}; namespace eprosima { @@ -36,11 +36,10 @@ class CdrSizeCalculator; eProsima_user_DllExport void serialize_key( eprosima::fastcdr::Cdr& scdr, - const FlowControlExample& data); + const FlowControl& data); } // namespace fastcdr } // namespace eprosima -#endif // FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_HPP - +#endif // FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_HPP diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.ipp b/examples/cpp/flow_control/FlowControlCdrAux.ipp similarity index 84% rename from examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.ipp rename to examples/cpp/flow_control/FlowControlCdrAux.ipp index aed583c6162..481f6e2135f 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleCdrAux.ipp +++ b/examples/cpp/flow_control/FlowControlCdrAux.ipp @@ -13,16 +13,16 @@ // limitations under the License. /*! - * @file FlowControlExampleCdrAux.ipp + * @file FlowControlCdrAux.ipp * This source file contains some declarations of CDR related functions. * * This file was generated by the tool fastddsgen. */ -#ifndef FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_IPP -#define FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_IPP +#ifndef FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_IPP +#define FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_IPP -#include "FlowControlExampleCdrAux.hpp" +#include "FlowControlCdrAux.hpp" #include #include @@ -37,7 +37,7 @@ namespace fastcdr { template<> eProsima_user_DllExport size_t calculate_serialized_size( eprosima::fastcdr::CdrSizeCalculator& calculator, - const FlowControlExample& data, + const FlowControl& data, size_t& current_alignment) { static_cast(data); @@ -51,10 +51,10 @@ eProsima_user_DllExport size_t calculate_serialized_size( calculated_size += calculator.calculate_member_serialized_size(eprosima::fastcdr::MemberId(0), - data.message(), current_alignment); + data.index(), current_alignment); calculated_size += calculator.calculate_member_serialized_size(eprosima::fastcdr::MemberId(1), - data.wasFast(), current_alignment); + data.message(), current_alignment); calculated_size += calculator.end_calculate_type_serialized_size(previous_encoding, current_alignment); @@ -65,7 +65,7 @@ eProsima_user_DllExport size_t calculate_serialized_size( template<> eProsima_user_DllExport void serialize( eprosima::fastcdr::Cdr& scdr, - const FlowControlExample& data) + const FlowControl& data) { eprosima::fastcdr::Cdr::state current_state(scdr); scdr.begin_serialize_type(current_state, @@ -74,8 +74,8 @@ eProsima_user_DllExport void serialize( eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR); scdr - << eprosima::fastcdr::MemberId(0) << data.message() - << eprosima::fastcdr::MemberId(1) << data.wasFast() + << eprosima::fastcdr::MemberId(0) << data.index() + << eprosima::fastcdr::MemberId(1) << data.message() ; scdr.end_serialize_type(current_state); } @@ -83,7 +83,7 @@ eProsima_user_DllExport void serialize( template<> eProsima_user_DllExport void deserialize( eprosima::fastcdr::Cdr& cdr, - FlowControlExample& data) + FlowControl& data) { cdr.deserialize_type(eprosima::fastcdr::CdrVersion::XCDRv2 == cdr.get_cdr_version() ? eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2 : @@ -94,11 +94,11 @@ eProsima_user_DllExport void deserialize( switch (mid.id) { case 0: - dcdr >> data.message(); + dcdr >> data.index(); break; case 1: - dcdr >> data.wasFast(); + dcdr >> data.message(); break; default: @@ -111,7 +111,7 @@ eProsima_user_DllExport void deserialize( void serialize_key( eprosima::fastcdr::Cdr& scdr, - const FlowControlExample& data) + const FlowControl& data) { static_cast(scdr); static_cast(data); @@ -122,5 +122,4 @@ void serialize_key( } // namespace fastcdr } // namespace eprosima -#endif // FAST_DDS_GENERATED__FLOWCONTROLEXAMPLECDRAUX_IPP - +#endif // FAST_DDS_GENERATED__FLOWCONTROLCDRAUX_IPP diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.cxx b/examples/cpp/flow_control/FlowControlPubSubTypes.cxx similarity index 78% rename from examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.cxx rename to examples/cpp/flow_control/FlowControlPubSubTypes.cxx index 91ce6f29ef7..6bacf3ded72 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.cxx +++ b/examples/cpp/flow_control/FlowControlPubSubTypes.cxx @@ -13,42 +13,42 @@ // limitations under the License. /*! - * @file FlowControlExamplePubSubTypes.cpp + * @file FlowControlPubSubTypes.cpp * This header file contains the implementation of the serialization functions. * * This file was generated by the tool fastddsgen. */ -#include "FlowControlExamplePubSubTypes.hpp" +#include "FlowControlPubSubTypes.hpp" #include #include -#include "FlowControlExampleCdrAux.hpp" -#include "FlowControlExampleTypeObjectSupport.hpp" +#include "FlowControlCdrAux.hpp" +#include "FlowControlTypeObjectSupport.hpp" using SerializedPayload_t = eprosima::fastdds::rtps::SerializedPayload_t; using InstanceHandle_t = eprosima::fastdds::rtps::InstanceHandle_t; using DataRepresentationId_t = eprosima::fastdds::dds::DataRepresentationId_t; -FlowControlExamplePubSubType::FlowControlExamplePubSubType() +FlowControlPubSubType::FlowControlPubSubType() { - setName("FlowControlExample"); + setName("FlowControl"); uint32_t type_size = #if FASTCDR_VERSION_MAJOR == 1 - static_cast(FlowControlExample::getMaxCdrSerializedSize()); + static_cast(FlowControl::getMaxCdrSerializedSize()); #else - FlowControlExample_max_cdr_typesize; + FlowControl_max_cdr_typesize; #endif type_size += static_cast(eprosima::fastcdr::Cdr::alignment(type_size, 4)); /* possible submessage alignment */ m_typeSize = type_size + 4; /*encapsulation*/ m_isGetKeyDefined = false; - uint32_t keyLength = FlowControlExample_max_key_cdr_typesize > 16 ? FlowControlExample_max_key_cdr_typesize : 16; + uint32_t keyLength = FlowControl_max_key_cdr_typesize > 16 ? FlowControl_max_key_cdr_typesize : 16; m_keyBuffer = reinterpret_cast(malloc(keyLength)); memset(m_keyBuffer, 0, keyLength); } -FlowControlExamplePubSubType::~FlowControlExamplePubSubType() +FlowControlPubSubType::~FlowControlPubSubType() { if (m_keyBuffer != nullptr) { @@ -56,12 +56,12 @@ FlowControlExamplePubSubType::~FlowControlExamplePubSubType() } } -bool FlowControlExamplePubSubType::serialize( +bool FlowControlPubSubType::serialize( const void* const data, SerializedPayload_t* payload, DataRepresentationId_t data_representation) { - const FlowControlExample* p_type = static_cast(data); + const FlowControl* p_type = static_cast(data); // Object that manages the raw buffer. eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(payload->data), payload->max_size); @@ -98,14 +98,14 @@ bool FlowControlExamplePubSubType::serialize( return true; } -bool FlowControlExamplePubSubType::deserialize( +bool FlowControlPubSubType::deserialize( SerializedPayload_t* payload, void* data) { try { // Convert DATA to pointer of your type - FlowControlExample* p_type = static_cast(data); + FlowControl* p_type = static_cast(data); // Object that manages the raw buffer. eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(payload->data), payload->length); @@ -132,7 +132,7 @@ bool FlowControlExamplePubSubType::deserialize( return true; } -std::function FlowControlExamplePubSubType::getSerializedSizeProvider( +std::function FlowControlPubSubType::getSerializedSizeProvider( const void* const data, DataRepresentationId_t data_representation) { @@ -140,7 +140,7 @@ std::function FlowControlExamplePubSubType::getSerializedSizeProvide { #if FASTCDR_VERSION_MAJOR == 1 static_cast(data_representation); - return static_cast(type::getCdrSerializedSize(*static_cast(data))) + + return static_cast(type::getCdrSerializedSize(*static_cast(data))) + 4u /*encapsulation*/; #else try @@ -150,7 +150,7 @@ std::function FlowControlExamplePubSubType::getSerializedSizeProvide eprosima::fastcdr::CdrVersion::XCDRv1 :eprosima::fastcdr::CdrVersion::XCDRv2); size_t current_alignment {0}; return static_cast(calculator.calculate_serialized_size( - *static_cast(data), current_alignment)) + + *static_cast(data), current_alignment)) + 4u /*encapsulation*/; } catch (eprosima::fastcdr::exception::Exception& /*exception*/) @@ -161,18 +161,18 @@ std::function FlowControlExamplePubSubType::getSerializedSizeProvide }; } -void* FlowControlExamplePubSubType::createData() +void* FlowControlPubSubType::createData() { - return reinterpret_cast(new FlowControlExample()); + return reinterpret_cast(new FlowControl()); } -void FlowControlExamplePubSubType::deleteData( +void FlowControlPubSubType::deleteData( void* data) { - delete(reinterpret_cast(data)); + delete(reinterpret_cast(data)); } -bool FlowControlExamplePubSubType::getKey( +bool FlowControlPubSubType::getKey( const void* const data, InstanceHandle_t* handle, bool force_md5) @@ -182,11 +182,11 @@ bool FlowControlExamplePubSubType::getKey( return false; } - const FlowControlExample* p_type = static_cast(data); + const FlowControl* p_type = static_cast(data); // Object that manages the raw buffer. eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(m_keyBuffer), - FlowControlExample_max_key_cdr_typesize); + FlowControl_max_key_cdr_typesize); // Object that serializes the data. eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::BIG_ENDIANNESS, eprosima::fastcdr::CdrVersion::XCDRv1); @@ -195,7 +195,7 @@ bool FlowControlExamplePubSubType::getKey( #else eprosima::fastcdr::serialize_key(ser, *p_type); #endif // FASTCDR_VERSION_MAJOR == 1 - if (force_md5 || FlowControlExample_max_key_cdr_typesize > 16) + if (force_md5 || FlowControl_max_key_cdr_typesize > 16) { m_md5.init(); #if FASTCDR_VERSION_MAJOR == 1 @@ -219,11 +219,11 @@ bool FlowControlExamplePubSubType::getKey( return true; } -void FlowControlExamplePubSubType::register_type_object_representation() +void FlowControlPubSubType::register_type_object_representation() { - register_FlowControlExample_type_identifier(type_identifiers_); + register_FlowControl_type_identifier(type_identifiers_); } // Include auxiliary functions like for serializing/deserializing. -#include "FlowControlExampleCdrAux.ipp" +#include "FlowControlCdrAux.ipp" diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.hpp b/examples/cpp/flow_control/FlowControlPubSubTypes.hpp similarity index 81% rename from examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.hpp rename to examples/cpp/flow_control/FlowControlPubSubTypes.hpp index e62c1e29594..65c48d35fd6 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExamplePubSubTypes.hpp +++ b/examples/cpp/flow_control/FlowControlPubSubTypes.hpp @@ -13,15 +13,15 @@ // limitations under the License. /*! - * @file FlowControlExamplePubSubTypes.hpp + + * @file FlowControlPubSubTypes.h * This header file contains the declaration of the serialization functions. * * This file was generated by the tool fastddsgen. */ - -#ifndef FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_PUBSUBTYPES_HPP -#define FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_PUBSUBTYPES_HPP +#ifndef FAST_DDS_GENERATED__FLOWCONTROL_PUBSUBTYPES_HPP +#define FAST_DDS_GENERATED__FLOWCONTROL_PUBSUBTYPES_HPP #include #include @@ -29,28 +29,28 @@ #include #include -#include "FlowControlExample.hpp" +#include "FlowControl.hpp" #if !defined(GEN_API_VER) || (GEN_API_VER != 2) #error \ - Generated FlowControlExample is not compatible with current installed Fast DDS. Please, regenerate it with fastddsgen. + Generated FlowControl is not compatible with current installed Fast DDS. Please, regenerate it with fastddsgen. #endif // GEN_API_VER /*! - * @brief This class represents the TopicDataType of the type FlowControlExample defined by the user in the IDL file. - * @ingroup FlowControlExample + * @brief This class represents the TopicDataType of the type FlowControl defined by the user in the IDL file. + * @ingroup FlowControl */ -class FlowControlExamplePubSubType : public eprosima::fastdds::dds::TopicDataType +class FlowControlPubSubType : public eprosima::fastdds::dds::TopicDataType { public: - typedef FlowControlExample type; + typedef FlowControl type; - eProsima_user_DllExport FlowControlExamplePubSubType(); + eProsima_user_DllExport FlowControlPubSubType(); - eProsima_user_DllExport ~FlowControlExamplePubSubType() override; + eProsima_user_DllExport ~FlowControlPubSubType() override; eProsima_user_DllExport bool serialize( const void* const data, @@ -106,7 +106,7 @@ class FlowControlExamplePubSubType : public eprosima::fastdds::dds::TopicDataTyp } eProsima_user_DllExport inline bool is_plain( - eprosima::fastdds::dds::DataRepresentationId_t data_representation) const override + eprosima::fastdds::dds::DataRepresentationId_t data_representation) const override { static_cast(data_representation); return false; @@ -129,5 +129,4 @@ class FlowControlExamplePubSubType : public eprosima::fastdds::dds::TopicDataTyp }; -#endif // FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_PUBSUBTYPES_HPP - +#endif // FAST_DDS_GENERATED__FLOWCONTROL_PUBSUBTYPES_HPP diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.cxx b/examples/cpp/flow_control/FlowControlTypeObjectSupport.cxx similarity index 68% rename from examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.cxx rename to examples/cpp/flow_control/FlowControlTypeObjectSupport.cxx index 5625b56e76a..b6bd4eb53f3 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.cxx +++ b/examples/cpp/flow_control/FlowControlTypeObjectSupport.cxx @@ -13,13 +13,13 @@ // limitations under the License. /*! - * @file FlowControlExampleTypeObjectSupport.cxx + * @file FlowControlTypeObjectSupport.cxx * Source file containing the implementation to register the TypeObject representation of the described types in the IDL file * * This file was generated by the tool fastddsgen. */ -#include "FlowControlExampleTypeObjectSupport.hpp" +#include "FlowControlTypeObjectSupport.hpp" #include #include @@ -33,31 +33,61 @@ #include #include -#include "FlowControlExample.hpp" +#include "FlowControl.hpp" using namespace eprosima::fastdds::dds::xtypes; // TypeIdentifier is returned by reference: dependent structures/unions are registered in this same method -void register_FlowControlExample_type_identifier( - TypeIdentifierPair& type_ids_FlowControlExample) +void register_FlowControl_type_identifier( + TypeIdentifierPair& type_ids_FlowControl) { - ReturnCode_t return_code_FlowControlExample {eprosima::fastdds::dds::RETCODE_OK}; - return_code_FlowControlExample = + ReturnCode_t return_code_FlowControl {eprosima::fastdds::dds::RETCODE_OK}; + return_code_FlowControl = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_identifiers( - "FlowControlExample", type_ids_FlowControlExample); - if (eprosima::fastdds::dds::RETCODE_OK != return_code_FlowControlExample) + "FlowControl", type_ids_FlowControl); + if (eprosima::fastdds::dds::RETCODE_OK != return_code_FlowControl) { - StructTypeFlag struct_flags_FlowControlExample = TypeObjectUtils::build_struct_type_flag(eprosima::fastdds::dds::xtypes::ExtensibilityKind::APPENDABLE, + StructTypeFlag struct_flags_FlowControl = TypeObjectUtils::build_struct_type_flag(eprosima::fastdds::dds::xtypes::ExtensibilityKind::APPENDABLE, false, false); - QualifiedTypeName type_name_FlowControlExample = "FlowControlExample"; - eprosima::fastcdr::optional type_ann_builtin_FlowControlExample; - eprosima::fastcdr::optional ann_custom_FlowControlExample; - CompleteTypeDetail detail_FlowControlExample = TypeObjectUtils::build_complete_type_detail(type_ann_builtin_FlowControlExample, ann_custom_FlowControlExample, type_name_FlowControlExample.to_string()); - CompleteStructHeader header_FlowControlExample; - header_FlowControlExample = TypeObjectUtils::build_complete_struct_header(TypeIdentifier(), detail_FlowControlExample); - CompleteStructMemberSeq member_seq_FlowControlExample; + QualifiedTypeName type_name_FlowControl = "FlowControl"; + eprosima::fastcdr::optional type_ann_builtin_FlowControl; + eprosima::fastcdr::optional ann_custom_FlowControl; + CompleteTypeDetail detail_FlowControl = TypeObjectUtils::build_complete_type_detail(type_ann_builtin_FlowControl, ann_custom_FlowControl, type_name_FlowControl.to_string()); + CompleteStructHeader header_FlowControl; + header_FlowControl = TypeObjectUtils::build_complete_struct_header(TypeIdentifier(), detail_FlowControl); + CompleteStructMemberSeq member_seq_FlowControl; + { + TypeIdentifierPair type_ids_index; + ReturnCode_t return_code_index {eprosima::fastdds::dds::RETCODE_OK}; + return_code_index = + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_identifiers( + "_uint32_t", type_ids_index); + + if (eprosima::fastdds::dds::RETCODE_OK != return_code_index) + { + EPROSIMA_LOG_ERROR(XTYPES_TYPE_REPRESENTATION, + "index Structure member TypeIdentifier unknown to TypeObjectRegistry."); + return; + } + StructMemberFlag member_flags_index = TypeObjectUtils::build_struct_member_flag(eprosima::fastdds::dds::xtypes::TryConstructFailAction::DISCARD, + false, false, false, false); + MemberId member_id_index = 0x00000000; + bool common_index_ec {false}; + CommonStructMember common_index {TypeObjectUtils::build_common_struct_member(member_id_index, member_flags_index, TypeObjectUtils::retrieve_complete_type_identifier(type_ids_index, common_index_ec))}; + if (!common_index_ec) + { + EPROSIMA_LOG_ERROR(XTYPES_TYPE_REPRESENTATION, "Structure index member TypeIdentifier inconsistent."); + return; + } + MemberName name_index = "index"; + eprosima::fastcdr::optional member_ann_builtin_index; + ann_custom_FlowControl.reset(); + CompleteMemberDetail detail_index = TypeObjectUtils::build_complete_member_detail(name_index, member_ann_builtin_index, ann_custom_FlowControl); + CompleteStructMember member_index = TypeObjectUtils::build_complete_struct_member(common_index, detail_index); + TypeObjectUtils::add_complete_struct_member(member_seq_FlowControl, member_index); + } { TypeIdentifierPair type_ids_message; ReturnCode_t return_code_message {eprosima::fastdds::dds::RETCODE_OK}; @@ -107,7 +137,7 @@ void register_FlowControlExample_type_identifier( } StructMemberFlag member_flags_message = TypeObjectUtils::build_struct_member_flag(eprosima::fastdds::dds::xtypes::TryConstructFailAction::DISCARD, false, false, false, false); - MemberId member_id_message = 0x00000000; + MemberId member_id_message = 0x00000001; bool common_message_ec {false}; CommonStructMember common_message {TypeObjectUtils::build_common_struct_member(member_id_message, member_flags_message, TypeObjectUtils::retrieve_complete_type_identifier(type_ids_message, common_message_ec))}; if (!common_message_ec) @@ -117,47 +147,17 @@ void register_FlowControlExample_type_identifier( } MemberName name_message = "message"; eprosima::fastcdr::optional member_ann_builtin_message; - ann_custom_FlowControlExample.reset(); - CompleteMemberDetail detail_message = TypeObjectUtils::build_complete_member_detail(name_message, member_ann_builtin_message, ann_custom_FlowControlExample); + ann_custom_FlowControl.reset(); + CompleteMemberDetail detail_message = TypeObjectUtils::build_complete_member_detail(name_message, member_ann_builtin_message, ann_custom_FlowControl); CompleteStructMember member_message = TypeObjectUtils::build_complete_struct_member(common_message, detail_message); - TypeObjectUtils::add_complete_struct_member(member_seq_FlowControlExample, member_message); - } - { - TypeIdentifierPair type_ids_wasFast; - ReturnCode_t return_code_wasFast {eprosima::fastdds::dds::RETCODE_OK}; - return_code_wasFast = - eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->type_object_registry().get_type_identifiers( - "_char", type_ids_wasFast); - - if (eprosima::fastdds::dds::RETCODE_OK != return_code_wasFast) - { - EPROSIMA_LOG_ERROR(XTYPES_TYPE_REPRESENTATION, - "wasFast Structure member TypeIdentifier unknown to TypeObjectRegistry."); - return; - } - StructMemberFlag member_flags_wasFast = TypeObjectUtils::build_struct_member_flag(eprosima::fastdds::dds::xtypes::TryConstructFailAction::DISCARD, - false, false, false, false); - MemberId member_id_wasFast = 0x00000001; - bool common_wasFast_ec {false}; - CommonStructMember common_wasFast {TypeObjectUtils::build_common_struct_member(member_id_wasFast, member_flags_wasFast, TypeObjectUtils::retrieve_complete_type_identifier(type_ids_wasFast, common_wasFast_ec))}; - if (!common_wasFast_ec) - { - EPROSIMA_LOG_ERROR(XTYPES_TYPE_REPRESENTATION, "Structure wasFast member TypeIdentifier inconsistent."); - return; - } - MemberName name_wasFast = "wasFast"; - eprosima::fastcdr::optional member_ann_builtin_wasFast; - ann_custom_FlowControlExample.reset(); - CompleteMemberDetail detail_wasFast = TypeObjectUtils::build_complete_member_detail(name_wasFast, member_ann_builtin_wasFast, ann_custom_FlowControlExample); - CompleteStructMember member_wasFast = TypeObjectUtils::build_complete_struct_member(common_wasFast, detail_wasFast); - TypeObjectUtils::add_complete_struct_member(member_seq_FlowControlExample, member_wasFast); + TypeObjectUtils::add_complete_struct_member(member_seq_FlowControl, member_message); } - CompleteStructType struct_type_FlowControlExample = TypeObjectUtils::build_complete_struct_type(struct_flags_FlowControlExample, header_FlowControlExample, member_seq_FlowControlExample); + CompleteStructType struct_type_FlowControl = TypeObjectUtils::build_complete_struct_type(struct_flags_FlowControl, header_FlowControl, member_seq_FlowControl); if (eprosima::fastdds::dds::RETCODE_BAD_PARAMETER == - TypeObjectUtils::build_and_register_struct_type_object(struct_type_FlowControlExample, type_name_FlowControlExample.to_string(), type_ids_FlowControlExample)) + TypeObjectUtils::build_and_register_struct_type_object(struct_type_FlowControl, type_name_FlowControl.to_string(), type_ids_FlowControl)) { EPROSIMA_LOG_ERROR(XTYPES_TYPE_REPRESENTATION, - "FlowControlExample already registered in TypeObjectRegistry for a different type."); + "FlowControl already registered in TypeObjectRegistry for a different type."); } } } diff --git a/examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.hpp b/examples/cpp/flow_control/FlowControlTypeObjectSupport.hpp similarity index 81% rename from examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.hpp rename to examples/cpp/flow_control/FlowControlTypeObjectSupport.hpp index d0f01990ad0..c0ebc8c665d 100644 --- a/examples/cpp/dds/FlowControlExample/FlowControlExampleTypeObjectSupport.hpp +++ b/examples/cpp/flow_control/FlowControlTypeObjectSupport.hpp @@ -13,14 +13,14 @@ // limitations under the License. /*! - * @file FlowControlExampleTypeObjectSupport.hpp + * @file FlowControlTypeObjectSupport.hpp * Header file containing the API required to register the TypeObject representation of the described types in the IDL file * * This file was generated by the tool fastddsgen. */ -#ifndef FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_TYPE_OBJECT_SUPPORT_HPP -#define FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_TYPE_OBJECT_SUPPORT_HPP +#ifndef _FAST_DDS_GENERATED_FLOWCONTROL_TYPE_OBJECT_SUPPORT_HPP_ +#define _FAST_DDS_GENERATED_FLOWCONTROL_TYPE_OBJECT_SUPPORT_HPP_ #include @@ -38,7 +38,7 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC /** - * @brief Register FlowControlExample related TypeIdentifier. + * @brief Register FlowControl related TypeIdentifier. * Fully-descriptive TypeIdentifiers are directly registered. * Hash TypeIdentifiers require to fill the TypeObject information and hash it, consequently, the TypeObject is * indirectly registered as well. @@ -47,10 +47,10 @@ * The returned TypeIdentifier corresponds to the complete TypeIdentifier in case of hashed TypeIdentifiers. * Invalid TypeIdentifier is returned in case of error. */ -eProsima_user_DllExport void register_FlowControlExample_type_identifier( +eProsima_user_DllExport void register_FlowControl_type_identifier( eprosima::fastdds::dds::xtypes::TypeIdentifierPair& type_ids); #endif // DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#endif // FAST_DDS_GENERATED__FLOWCONTROLEXAMPLE_TYPE_OBJECT_SUPPORT_HPP +#endif // _FAST_DDS_GENERATED_FLOWCONTROL_TYPE_OBJECT_SUPPORT_HPP_ diff --git a/examples/cpp/flow_control/PublisherApp.cpp b/examples/cpp/flow_control/PublisherApp.cpp new file mode 100644 index 00000000000..cec9601282d --- /dev/null +++ b/examples/cpp/flow_control/PublisherApp.cpp @@ -0,0 +1,249 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PublisherApp.cpp + * + */ + +#include "PublisherApp.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "FlowControlPubSubTypes.hpp" + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastdds::rtps; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +PublisherApp::PublisherApp( + const CLIParser::flow_control_config& config) + : participant_(nullptr) + , publisher_(nullptr) + , topic_(nullptr) + , fast_writer_(nullptr) + , slow_writer_(nullptr) + , type_(new FlowControlPubSubType()) + , matched_(0) + , samples_(config.samples) + , stop_(false) +{ + // Create Participant + DomainParticipantQos pqos; + + // This controller allows 300kb per second. + auto slow_flow_controller_descriptor = std::make_shared(); + slow_flow_controller_descriptor->name = "slow_flow_controller_descriptor"; + slow_flow_controller_descriptor->max_bytes_per_period = config.max_bytes_per_period; + slow_flow_controller_descriptor->period_ms = config.period; + slow_flow_controller_descriptor->scheduler = config.scheduler; + pqos.flow_controllers().push_back(slow_flow_controller_descriptor); + + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos); + + if (participant_ == nullptr) + { + throw std::runtime_error("Participant initialization failed"); + } + + //Register the type + type_.register_type(participant_); + + // Create fast Publisher, which has no controller of its own + PublisherQos pub_qos = PUBLISHER_QOS_DEFAULT; + participant_->get_default_publisher_qos(pub_qos); + publisher_ = participant_->create_publisher(pub_qos, nullptr, StatusMask::none()); + + if (publisher_ == nullptr) + { + throw std::runtime_error("Publisher initialization failed"); + } + + // Create Topic + topic_ = participant_->create_topic("flow_control_topic", type_.get_type_name(), TOPIC_QOS_DEFAULT); + + if (topic_ == nullptr) + { + throw std::runtime_error("Topic initialization failed"); + } + + // Create slow DataWriter + DataWriterQos wsqos = DATAWRITER_QOS_DEFAULT; + + // Retrieve default QoS, in case they have been previously set through an XML file + publisher_->get_default_datawriter_qos(wsqos); + wsqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE; + wsqos.publish_mode().flow_controller_name = slow_flow_controller_descriptor->name; + wsqos.properties().properties().emplace_back("fastdds.sfc.priority", config.priority); + wsqos.properties().properties().emplace_back("fastdds.sfc.bandwidth_reservation", config.bandwidth); + + // Disable Data Sharing to force the communication on a Transport, + // since a Flow Control is applied only on Transports. + wsqos.data_sharing().off(); + + // Set a user data value to share with the DataReader the information about the kind of Data Writer + // (0 for Slow and 1 for Fast) + wsqos.user_data().data_vec({0}); + slow_writer_ = publisher_->create_datawriter(topic_, wsqos, this, StatusMask::all()); + + if (slow_writer_ == nullptr) + { + throw std::runtime_error("Slow DataWriter initialization failed"); + } + std::cout << "Slow publisher created, waiting for Subscribers." << std::endl; + + // Create fast DataWriter + DataWriterQos wfqos = DATAWRITER_QOS_DEFAULT; + + // Retrieve default QoS, in case they have been previously set through an XML file + publisher_->get_default_datawriter_qos(wsqos); + wfqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE; + + // Disable Data Sharing to be consistent with the Slow Writer + wfqos.data_sharing().off(); + + // Set a user data value to share with the DataReader the information about the kind of Data Writer + // (0 for Slow and 1 for Fast) + wfqos.user_data().data_vec({1}); + fast_writer_ = publisher_->create_datawriter(topic_, wfqos, this, StatusMask::all()); + + if (fast_writer_ == nullptr) + { + throw std::runtime_error("Fast DataWriter initialization failed"); + } + + std::cout << "Fast publisher created, waiting for Subscribers." << std::endl; +} + +PublisherApp::~PublisherApp() +{ + if (nullptr != participant_) + { + // Delete DDS entities contained within the DomainParticipant + participant_->delete_contained_entities(); + + // Delete DomainParticipant + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +void PublisherApp::on_publication_matched( + eprosima::fastdds::dds::DataWriter*, + const eprosima::fastdds::dds::PublicationMatchedStatus& info) +{ + // Protect access to matched_ + std::lock_guard matched_lock(mutex_); + + if (info.current_count_change == 1) + { + matched_ = static_cast(info.current_count); + std::cout << "Publisher matched." << std::endl; + cv_.notify_one(); + } + else if (info.current_count_change == -1) + { + matched_ = static_cast(info.current_count); + std::cout << "Publisher unmatched." << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for PublicationMatchedStatus current count change" << std::endl; + } +} + +void PublisherApp::run() +{ + // Publication code + FlowControl msg; + + while (!is_stopped() && ((samples_ == 0) || (msg.index() < samples_))) + { + msg.index(msg.index() + 1); + + if (publish(slow_writer_, msg)) + { + std::cout << "Message SENT from SLOW WRITER, count=" << msg.index() << std::endl; + } + else + { + throw std::runtime_error("Slow Publisher failed sending a message"); + } + + if (publish(fast_writer_, msg)) + { + std::cout << "Message SENT from FAST WRITER, count=" << msg.index() << std::endl; + } + else + { + throw std::runtime_error("Fast Publisher failed sending a message"); + } + + // Wait for period or stop event + std::unique_lock period_lock(mutex_); + cv_.wait_for(period_lock, std::chrono::milliseconds(send_period_), [&]() + { + return is_stopped(); + }); + } +} + +bool PublisherApp::publish( + DataWriter* writer_, + FlowControl msg) +{ + bool ret = false; + // Wait for the data endpoints discovery + std::unique_lock matched_lock(mutex_); + cv_.wait(matched_lock, [&]() + { + // at least one has been discovered + return ((matched_ > 0) || is_stopped()); + }); + + if (!is_stopped()) + { + ret = (RETCODE_OK == writer_->write(&msg)); + } + return ret; +} + +bool PublisherApp::is_stopped() +{ + return stop_.load(); +} + +void PublisherApp::stop() +{ + stop_.store(true); + cv_.notify_one(); +} + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/flow_control/PublisherApp.hpp b/examples/cpp/flow_control/PublisherApp.hpp new file mode 100644 index 00000000000..9e5749b46fe --- /dev/null +++ b/examples/cpp/flow_control/PublisherApp.hpp @@ -0,0 +1,107 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PublisherApp.hpp + * + */ + + +#ifndef FASTDDS_FLOW_CONTROL_PUBLISHER_APP_HPP +#define FASTDDS_FLOW_CONTROL_PUBLISHER_APP_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" +#include "FlowControlPubSubTypes.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +class PublisherApp : public Application, public DataWriterListener +{ +public: + + PublisherApp( + const CLIParser::flow_control_config& config); + + ~PublisherApp(); + + //! Publisher matched method + void on_publication_matched( + DataWriter* writer, + const PublicationMatchedStatus& info) override; + + //! Run publisher + void run() override; + + //! Stop publisher + void stop() override; + +private: + + //! Return the current state of execution + bool is_stopped(); + + //! Publish a sample + bool publish( + DataWriter* writer_, + FlowControl msg); + + DomainParticipant* participant_; + + Publisher* publisher_; + + Topic* topic_; + + DataWriter* fast_writer_; + + DataWriter* slow_writer_; + + TypeSupport type_; + + int16_t matched_; + + uint16_t samples_; + + std::mutex mutex_; + + std::condition_variable cv_; + + std::atomic stop_; + + const uint32_t send_period_ = 2000; // in ms +}; + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* _FASTDDS_FLOW_CONTROL_PUBLISHER_APP_HPP */ diff --git a/examples/cpp/flow_control/README.md b/examples/cpp/flow_control/README.md new file mode 100644 index 00000000000..f1728b02c71 --- /dev/null +++ b/examples/cpp/flow_control/README.md @@ -0,0 +1,109 @@ +# Flow control example + +The *eProsima Fast DDS flow control* example is a simple application intended to demonstrate the use +of Flow Controllers. + +This example is part of the suite of examples designed by eProsima that aims to illustrate the features +and possible configurations of DDS deployments through *eProsima Fast DDS*. + +* [Description of the example](#description-of-the-example) +* [Run the example](#run-the-example) + +## Description of the example + +eProsima Fast DDS provides a mechanism to control the data flow sent by a DataWriter. +The Flow Control is implemented through objects called Flow Controllers. +These controllers are registered on the creation of the DomainParticipant using a [Flow Controller Descriptor](https://fast-dds.docs.eprosima.com/en/latest/fastdds/api_reference/rtps/flowcontrol/FlowControllerDescriptor.html#flowcontrollerdescriptor), and then referenced on the creation of the DataWriter using Publish Mode Qos Policy. + +A Flow Controller Descriptor is a simple struct that univocally defines a flow controller. +It includes the following [FlowControllersQos](https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/policy/eprosimaExtensions.html#flowcontrollersqos) settings and configurations: +* Name of the flow controller. +* Scheduler policy used by the flow controller. +* Maximum number of bytes to be sent to network per period. +* Period of time in milliseconds on which the flow controller is allowed to send the maximum number of bytes per period. +* Thread settings for the sender thread. + +When using Flow Controllers, the DataWriter may need specific parameters related to the priority and the bandwith. +For more information, please refer to [Flow Controller Settings](https://fast-dds.docs.eprosima.com/en/latest/fastdds/property_policies/flow_control.html#flow-controller-settings). +* Property `fastdds.sfc.priority` is used to set the priority of the DataWriter for `HIGH_PRIORITY` and `PRIORITY_WITH_RESERVATION` flow controllers. Allowed values are from -10 (highest priority) to 10 (lowest priority). The default value is the lowest priority. +* Property `fastdds.sfc.bandwidth_reservation` is used to set the percentage of the bandwidth that the DataWriter is requesting for `PRIORITY_WITH_RESERVATION` flow controllers. Allowed values are from 0 to 100, and express a percentage of the total flow controller limit. By default, no bandwidth is reserved for the DataWriter. + +Once instantiated, a flow controller will make sure there is a limit on the data it processes, so that no more than the specified size gets through it in the specified time. + +In this example, the Fast DataWriter has no flow controller, while the Slow DataWriter has a flow controller limiting the maximum number of bytes to be sent per period, as well as the period of time on which the DataWriter is allowed to send. + +The information regarding the kind of a DataWriter, whether it is a slow or a fast one, is communicated through the user data field during the discovery phase. This user data is embedded within the discovery protocol, allowing other participants in the network to shared information without requiring direct communication or configuration. +For more information, please refer to [UserDataQosPolicy](https://fast-dds.docs.eprosima.com/en/latest/fastdds/api_reference/dds_pim/core/policy/userdataqospolicy.html#userdataqospolicy). + +## Run the example + +To launch this example, two different terminals are required. +One of them will run the publisher example application, and the other will run the subscriber application. + +### Flow Control publisher + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ ./flow_control publisher + Publisher running. Please press Ctrl+C to stop the Publisher at any time. + ``` + +* Windows + + ```powershell + example_path> flow_control.exe publisher + Publisher running. Please press Ctrl+C to stop the Publisher at any time. + ``` + +### Flow Control subscriber + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ ./flow_control subscriber + Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. + ``` + +* Windows + + ```powershell + example_path> flow_control.exe subscriber + Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. + ``` + + +### Expected output + +Regardless of which application is run first, since the publisher will not start sending data until a subscriber is discovered, the expected output both for publishers and subscribers is a first displayed message acknowledging the match, followed by the amount of samples sent or received until Ctrl+C is pressed. +The samples are sent every 2s. The Slow DataWriter sends a sample before the Fast DataWriter, but the sample is always received later according with the FlowControllerQos settings in the Flow Controller. + +### Hello world publisher + +```shell +Publisher running. Please press Ctrl+C to stop the Publisher at any time. +Publisher matched. +Message SENT from SLOW WRITER, count=1 +Message SENT from FAST WRITER, count=1 +Message SENT from SLOW WRITER, count=2 +Message SENT from FAST WRITER, count=2 +Message SENT from SLOW WRITER, count=3 +Message SENT from FAST WRITER, count=3 +... +``` + +### Hello world subscriber + +```shell +Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. +Subscriber matched. +Sample RECEIVED from fast writer, count=1 +Sample RECEIVED from slow writer, count=1 +Sample RECEIVED from fast writer, count=2 +Sample RECEIVED from slow writer, count=2 +Sample RECEIVED from fast writer, count=3 +Sample RECEIVED from slow writer, count=3 +... +``` + +When Ctrl+C is pressed to stop one of the applications, the other one will show the unmatched status, displaying an informative message, and it will stop sending / receiving messages. diff --git a/examples/cpp/flow_control/SubscriberApp.cpp b/examples/cpp/flow_control/SubscriberApp.cpp new file mode 100644 index 00000000000..62e5deb6570 --- /dev/null +++ b/examples/cpp/flow_control/SubscriberApp.cpp @@ -0,0 +1,242 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SubscriberApp.cpp + * + */ + +#include "SubscriberApp.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +SubscriberApp::SubscriberApp( + const CLIParser::flow_control_config& config) + : participant_(nullptr) + , subscriber_(nullptr) + , topic_(nullptr) + , reader_(nullptr) + , type_(new FlowControlPubSubType()) + , samples_(config.samples) + , stop_(false) +{ + StatusMask status_mask = StatusMask::none(); + status_mask << StatusMask::data_available(); + status_mask << StatusMask::subscription_matched(); + + // Create Participant + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT, this, + status_mask); + if (participant_ == nullptr) + { + throw std::runtime_error("Participant initialization failed"); + } + + //Register the type + type_.register_type(participant_); + + // Create Subscriber + SubscriberQos sub_qos = SUBSCRIBER_QOS_DEFAULT; + + // Retrieve default QoS, in case they have been previously set with an XML file + participant_->get_default_subscriber_qos(sub_qos); + subscriber_ = participant_->create_subscriber(sub_qos, nullptr, StatusMask::none()); + + if (subscriber_ == nullptr) + { + throw std::runtime_error("Subscriber initialization failed"); + } + + // Create Topic + topic_ = participant_->create_topic("flow_control_topic", type_.get_type_name(), TOPIC_QOS_DEFAULT); + + if (topic_ == nullptr) + { + throw std::runtime_error("Topic initialization failed"); + } + + // Create DataReader + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + subscriber_->get_default_datareader_qos(reader_qos); + reader_ = subscriber_->create_datareader(topic_, reader_qos); + + if (reader_ == nullptr) + { + throw std::runtime_error("DataReader initialization failed"); + } +} + +SubscriberApp::~SubscriberApp() +{ + if (nullptr != participant_) + { + // Delete DDS entities contained within the DomainParticipant + participant_->delete_contained_entities(); + + // Delete DomainParticipant + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +void SubscriberApp::on_subscription_matched( + DataReader*, + const SubscriptionMatchedStatus& info) +{ + if (info.current_count_change == 1) + { + std::cout << "Subscriber matched." << std::endl; + } + else if (info.current_count_change == -1) + { + std::cout << "Subscriber unmatched." << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl; + } +} + +void SubscriberApp::on_data_available( + DataReader* reader) +{ + SampleInfo info; + FlowControl msg; + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&msg, &info))) + { + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) + { + static unsigned int fast_messages = 0; + static unsigned int slow_messages = 0; + + auto it_f = std::find(fast_writer_guid.begin(), fast_writer_guid.end(), info.sample_identity.writer_guid()); + + auto it_s = std::find(slow_writer_guid.begin(), slow_writer_guid.end(), info.sample_identity.writer_guid()); + + if (it_f != fast_writer_guid.end()) + { + fast_messages++; + std::cout << "Sample RECEIVED from FAST writer with id " << *it_f << ", index=" << msg.index() << + std::endl; + } + else if (it_s != slow_writer_guid.end()) + { + slow_messages++; + std::cout << "Sample RECEIVED from SLOW writer with id " << *it_s << ", index=" << msg.index() << + std::endl; + } + + if ((samples_ > 0) && (fast_messages >= samples_) && (slow_messages >= samples_)) + { + stop(); + } + } + } +} + +void SubscriberApp::on_data_writer_discovery( + DomainParticipant* /*participant*/, + eprosima::fastdds::rtps::WriterDiscoveryStatus status, + const eprosima::fastdds::dds::PublicationBuiltinTopicData& info, + bool& /*should_be_ignored*/) +{ + std::vector slow_writer_id = {0}; + std::vector fast_writer_id = {1}; + + if (info.user_data.data_vec() == fast_writer_id) + { + if (status == + eprosima::fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) + { + fast_writer_guid.push_back(info.guid); + + std::cout << "Fast writer with id " << info.guid << " matched" << std::endl; + } + else if (status == + eprosima::fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) + { + auto it = std::find(fast_writer_guid.begin(), fast_writer_guid.end(), info.guid); + + if (it != fast_writer_guid.end()) + { + fast_writer_guid.erase(it); + std::cout << "Fast writer with id " << info.guid << " removed" << std::endl; + } + } + } + else if (info.user_data.data_vec() == slow_writer_id) + { + if (status == + eprosima::fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) + { + slow_writer_guid.push_back(info.guid); + + std::cout << "Slow writer with id " << info.guid << " matched" << std::endl; + } + else if (status == + eprosima::fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) + { + auto it = std::find(slow_writer_guid.begin(), slow_writer_guid.end(), info.guid); + + if (it != slow_writer_guid.end()) + { + slow_writer_guid.erase(it); + std::cout << "Slow writer with id " << info.guid << " removed" << std::endl; + } + } + } +} + +void SubscriberApp::run() +{ + std::unique_lock lck(terminate_cv_mtx_); + terminate_cv_.wait(lck, [&] + { + return is_stopped(); + }); +} + +bool SubscriberApp::is_stopped() +{ + return stop_.load(); +} + +void SubscriberApp::stop() +{ + stop_.store(true); + terminate_cv_.notify_all(); +} + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/flow_control/SubscriberApp.hpp b/examples/cpp/flow_control/SubscriberApp.hpp new file mode 100644 index 00000000000..53a5b35ee68 --- /dev/null +++ b/examples/cpp/flow_control/SubscriberApp.hpp @@ -0,0 +1,111 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SubscriberApp.hpp + * + */ + + +#ifndef FASTDDS_FLOW_CONTROL_SUBSCRIBER_APP_HPP +#define FASTDDS_FLOW_CONTROL_SUBSCRIBER_APP_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" +#include "FlowControlPubSubTypes.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace flow_control { + +class SubscriberApp : public Application, public DomainParticipantListener +{ +public: + + SubscriberApp( + const CLIParser::flow_control_config& config); + + ~SubscriberApp(); + + //! Subscription callback + void on_data_available( + DataReader* reader) override; + + //! Subscriber matched method + void on_subscription_matched( + DataReader* reader, + const SubscriptionMatchedStatus& info) override; + + void on_data_writer_discovery( + DomainParticipant* participant, + eprosima::fastdds::rtps::WriterDiscoveryStatus status, + const eprosima::fastdds::dds::PublicationBuiltinTopicData& info, + bool& should_be_ignored) override; + + //! Run subscriber + void run() override; + + //! Trigger the end of execution + void stop() override; + +private: + + //! Return the current state of execution + bool is_stopped(); + + DomainParticipant* participant_; + + Subscriber* subscriber_; + + Topic* topic_; + + DataReader* reader_; + + TypeSupport type_; + + uint16_t samples_; + + std::atomic stop_; + + mutable std::mutex terminate_cv_mtx_; + + std::condition_variable terminate_cv_; + + std::vector slow_writer_guid; + + std::vector fast_writer_guid; + +}; + +} // namespace flow_control +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_FLOW_CONTROL_SUBSCRIBER_APP_HPP diff --git a/examples/cpp/flow_control/flow_control_profile.xml b/examples/cpp/flow_control/flow_control_profile.xml new file mode 100644 index 00000000000..2d7dbbc56fa --- /dev/null +++ b/examples/cpp/flow_control/flow_control_profile.xml @@ -0,0 +1,46 @@ + + + + + + TRANSIENT_LOCAL + + + RELIABLE + + + + + KEEP_LAST + 100 + + + 100 + 1 + 100 + + + + + + + + TRANSIENT_LOCAL + + + RELIABLE + + + + + KEEP_LAST + 100 + + + 100 + 1 + 100 + + + + diff --git a/examples/cpp/flow_control/main.cpp b/examples/cpp/flow_control/main.cpp new file mode 100644 index 00000000000..fbf5544f758 --- /dev/null +++ b/examples/cpp/flow_control/main.cpp @@ -0,0 +1,110 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file main.cpp + * + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "Application.hpp" +#include "CLIParser.hpp" + +using eprosima::fastdds::dds::Log; + +using namespace eprosima::fastdds::examples::flow_control; + +std::function stop_app_handler; +void signal_handler( + int signum) +{ + stop_app_handler(signum); +} + +int main( + int argc, + char** argv) +{ + auto ret = EXIT_SUCCESS; + CLIParser::flow_control_config config = CLIParser::parse_cli_options(argc, argv); + uint16_t samples = 0; + switch (config.entity) + { + case CLIParser::EntityKind::PUBLISHER: + samples = config.samples; + break; + case CLIParser::EntityKind::SUBSCRIBER: + samples = config.samples; + break; + default: + break; + } + + std::string app_name = CLIParser::parse_entity_kind(config.entity); + std::shared_ptr app; + + try + { + app = Application::make_app(config); + } + catch (const std::runtime_error& e) + { + EPROSIMA_LOG_ERROR(app_name, e.what()); + ret = EXIT_FAILURE; + } + + if (EXIT_FAILURE != ret) + { + std::thread thread(&Application::run, app); + + if (samples == 0) + { + std::cout << app_name << " running. Please press Ctrl+C to stop the " + << app_name << " at any time." << std::endl; + } + else + { + std::cout << app_name << " running for " << samples << " samples. Please press Ctrl+C to stop the " + << app_name << " at any time." << std::endl; + } + + stop_app_handler = [&](int signum) + { + std::cout << "\n" << CLIParser::parse_signal(signum) << " received, stopping " << app_name + << " execution." << std::endl; + app->stop(); + }; + + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + #ifndef _WIN32 + signal(SIGQUIT, signal_handler); + signal(SIGHUP, signal_handler); + #endif // _WIN32 + + thread.join(); + } + + Log::Reset(); + return ret; +} diff --git a/examples/cpp/xtypes/SubscriberApp.cpp b/examples/cpp/xtypes/SubscriberApp.cpp index 23952a4371f..6f5b3e74a47 100644 --- a/examples/cpp/xtypes/SubscriberApp.cpp +++ b/examples/cpp/xtypes/SubscriberApp.cpp @@ -115,7 +115,7 @@ void SubscriberApp::on_data_available( DataReader* reader) { SampleInfo info; - while (RETCODE_OK == reader->take_next_sample(&hello_, &info)) + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&hello_, &info))) { if (ALIVE_INSTANCE_STATE == info.instance_state && info.valid_data && TK_STRUCTURE == remote_type_object_.complete()._d()) diff --git a/test/examples/flow_control.compose.yml b/test/examples/flow_control.compose.yml new file mode 100644 index 00000000000..7a048e51d82 --- /dev/null +++ b/test/examples/flow_control.compose.yml @@ -0,0 +1,43 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +version: "3" + +services: + subscriber: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/flow_control@FILE_EXTENSION@ + FASTDDS_DEFAULT_PROFILES_FILE: @PROJECT_BINARY_DIR@/examples/cpp/flow_control/flow_control_profile.xml + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/flow_control@FILE_EXTENSION@ subscriber --samples 10" + + publisher: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/flow_control + FASTDDS_DEFAULT_PROFILES_FILE: @PROJECT_BINARY_DIR@/examples/cpp/flow_control/flow_control_profile.xml + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/flow_control@FILE_EXTENSION@ publisher --samples 10 --period 500 --max-bytes 300000 --scheduler PRIORITY-RESERVATION --bandwidth 99 --priority 1" + depends_on: + - subscriber diff --git a/test/examples/test_flow_control.py b/test/examples/test_flow_control.py new file mode 100644 index 00000000000..6b65995e052 --- /dev/null +++ b/test/examples/test_flow_control.py @@ -0,0 +1,54 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess + +def test_flow_control(): + """.""" + ret = False + out = '' + try: + out = subprocess.check_output( + '@DOCKER_EXECUTABLE@ compose -f flow_control.compose.yml up', + stderr=subprocess.STDOUT, + shell=True, + timeout=30 + ).decode().split('\n') + + sent = 0 + received = 0 + for line in out: + if 'SENT' in line: + sent += 1 + continue + + if 'RECEIVED' in line: + received += 1 + continue + + if sent != 0 and received != 0 and sent == received: + ret = True + else: + print('ERROR: sent: ' + str(sent) + ', but received: ' + str(received) + + ' (expected: ' + str(sent) + ')') + raise subprocess.CalledProcessError(1, '') + + except subprocess.CalledProcessError: + for l in out: + print(l) + except subprocess.TimeoutExpired: + print('TIMEOUT') + print(out) + + assert(ret) diff --git a/versions.md b/versions.md index a4430f76400..4cd12a291f7 100644 --- a/versions.md +++ b/versions.md @@ -61,6 +61,7 @@ Forthcoming * Discovery server example. * Request-reply example to showcase RPC paradigms over Fast DDS. * Static EDP discovery example to avoid EDP meta-traffic. + * Flow Controller example with `FlowControllersQos` and property settings. * Removed `TypeConsistencyQos` from DataReader, and included `TypeConsistencyEnforcementQosPolicy` and `DataRepresentationQosPolicy` * Added new `flow_controller_descriptor_list` XML configuration, remove `ThroughtputController`. * Migrate `#define`s within `BuiltinEndpoints.hpp` to namespaced `constexpr` variables.