From 56c6602525b6a54dc5e816e09c148bab523033b7 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 21 May 2024 11:06:23 +0200 Subject: [PATCH] New `max_message_size` property to limit output datagrams size (#4777) (#4805) * Refs #20489. Parse property in RTPS writer. Signed-off-by: Miguel Company * Refs #20849. Parse property in RTPS participant. Signed-off-by: Miguel Company * Refs #20849: Add test for RTPS writer Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Add test for participant Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Uncrustify Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Add tests in DDS layer Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> --------- Signed-off-by: Miguel Company Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> Co-authored-by: Miguel Company (cherry picked from commit 6d20b649bc25a13ed38665e5eff8b2ed9f2b02ce) Co-authored-by: elianalf <62831776+elianalf@users.noreply.github.com> --- include/fastdds/rtps/writer/RTPSWriter.h | 3 + .../rtps/participant/RTPSParticipantImpl.cpp | 23 ++++- .../rtps/participant/RTPSParticipantImpl.h | 3 + src/cpp/rtps/writer/RTPSWriter.cpp | 40 +++++--- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 91 ++++++++++++++++++ .../common/RTPSBlackboxTestsBasic.cpp | 95 +++++++++++++++++++ 6 files changed, 242 insertions(+), 13 deletions(-) diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index 6faad6b225a..b4503bd18bf 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -537,6 +538,8 @@ class RTPSWriter //! Flow controller. fastdds::rtps::FlowController* flow_controller_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); //!WriterHistory WriterHistory* mp_history = nullptr; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 2e8a95698e0..6e4b1a02291 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -658,6 +658,22 @@ void RTPSParticipantImpl::setup_initial_peers() void RTPSParticipantImpl::setup_output_traffic() { + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; if (num_send_buffers == 0) @@ -2382,8 +2398,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const #endif // if HAVE_SECURITY return (std::min)( - m_network_Factory.get_max_message_size_between_transports(), - max_receiver_buffer_size); + { + max_output_message_size_, + m_network_Factory.get_max_message_size_between_transports(), + max_receiver_buffer_size + }); } uint32_t RTPSParticipantImpl::getMaxDataSize() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 21aeed182bb..25059076623 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -584,6 +585,8 @@ class RTPSParticipantImpl std::function type_check_fn_; //!Pool of send buffers std::unique_ptr send_buffers_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); /** * Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 1415fd4fc8e..ec7527bd75f 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -17,24 +17,22 @@ * */ -#include - -#include -#include - -#include -#include +#include -#include +#include #include -#include - +#include #include - #include +#include +#include +#include +#include +#include + #include #include @@ -109,6 +107,22 @@ void RTPSWriter::init( const std::shared_ptr& change_pool, const WriterAttributes& att) { + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + payload_pool_ = payload_pool; change_pool_ = change_pool; fixed_payload_size_ = 0; @@ -308,6 +322,10 @@ uint32_t RTPSWriter::getMaxDataSize() uint32_t flow_max = flow_controller_->get_max_payload(); uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize(); uint32_t max_size = flow_max > part_max ? part_max : flow_max; + if (max_output_message_size_ < max_size) + { + max_size = max_output_message_size_; + } max_size = calculateMaxDataSize(max_size); return max_size &= ~3; diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 518c6c3a9d9..b1ad9216c7e 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -763,6 +763,97 @@ TEST(DDSBasic, endpoint_custom_payload_pools) participant->delete_contained_entities(); } +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DomainParticipant. + */ +TEST(DDSBasic, max_output_message_size_participant) +{ + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DomainParticipants with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); +} + +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DataWriter. + */ +TEST(DDSBasic, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DataWriter with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.entity_property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp index 35e8a45b554..409fdc3dff8 100644 --- a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp @@ -1181,6 +1181,101 @@ TEST(RTPS, participant_ignore_local_endpoints_two_participants) eprosima::fastrtps::rtps::RTPSDomain::removeRTPSParticipant(participant_reader); } +/* Maximum number of bytes allowed for an RTPS datagram generated by this participant. */ +TEST(RTPS, max_output_message_size_participant) +{ + /* Set up */ + // Create the RTPSReader + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Create the RTPSParticipants with the appropriate value for the property + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + eprosima::fastrtps::rtps::RTPSParticipantAttributes patt; + patt.useBuiltinTransports = false; + patt.userTransports.push_back(testTransport); + patt.properties.properties().emplace_back("fastdds.max_message_size", segment_size_str); + eprosima::fastrtps::rtps::RTPSParticipant* participant_writer = + eprosima::fastrtps::rtps::RTPSDomain::createParticipant(static_cast(GET_PID()) % 230, patt); + ASSERT_NE(participant_writer, nullptr); + + // Create the RTPSWriter + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME, participant_writer); + writer.init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(2)); + reader.wait_discovery(1, std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + + /* Tear-down */ + eprosima::fastrtps::rtps::RTPSDomain::removeRTPSParticipant(participant_writer); +} + +/* Maximum number of bytes allowed for an RTPS datagram generated by this writer. */ +TEST(RTPS, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME); + writer.add_property("fastdds.max_message_size", segment_size_str). + disable_builtin_transport().add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else