diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index fc4d1395504..38753d45ba2 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -472,6 +473,8 @@ class RTPSWriter //! Flow controller. fastdds::rtps::FlowController* flow_controller_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); //!WriterHistory WriterHistory* mp_history = nullptr; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 471d3256d0f..09ad5114386 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -425,6 +425,22 @@ RTPSParticipantImpl::RTPSParticipantImpl( " It may is opened by another application. Discovery may fail."); } + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + logError(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; if (num_send_buffers == 0) @@ -2109,8 +2125,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const #endif // if HAVE_SECURITY return (std::min)( - m_network_Factory.get_max_message_size_between_transports(), - max_receiver_buffer_size); + { + max_output_message_size_, + m_network_Factory.get_max_message_size_between_transports(), + max_receiver_buffer_size + }); } uint32_t RTPSParticipantImpl::getMaxDataSize() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 5a06b0a7139..eb765390ad0 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -574,6 +575,8 @@ class RTPSParticipantImpl std::function type_check_fn_; //!Pool of send buffers std::unique_ptr send_buffers_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); /** * Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 9c5f188efb0..a2e140bcaa7 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -17,24 +17,22 @@ * */ -#include - -#include -#include - -#include -#include +#include -#include +#include #include -#include - +#include #include - #include +#include +#include +#include +#include +#include + #include #include @@ -109,6 +107,22 @@ void RTPSWriter::init( const std::shared_ptr& change_pool, const WriterAttributes& att) { + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + logError(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + payload_pool_ = payload_pool; change_pool_ = change_pool; fixed_payload_size_ = 0; @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize() uint32_t flow_max = flow_controller_->get_max_payload(); uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize(); uint32_t max_size = flow_max > part_max ? part_max : flow_max; + if (max_output_message_size_ < max_size) + { + max_size = max_output_message_size_; + } max_size = calculateMaxDataSize(max_size); return max_size &= ~3; diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index f9e6110a383..d387d796248 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -35,8 +35,12 @@ #include #include +#include + #include "BlackboxTests.hpp" #include "../types/HelloWorldPubSubTypes.h" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" namespace eprosima { namespace fastdds { @@ -271,6 +275,97 @@ TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock) ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); } +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DomainParticipant. + */ +TEST(DDSBasic, max_output_message_size_participant) +{ + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DomainParticipants with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); +} + +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DataWriter. + */ +TEST(DDSBasic, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DataWriter with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.entity_property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp index 7dd05a71e41..13b5f31a371 100644 --- a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp @@ -893,6 +893,92 @@ TEST(RTPS, RTPSCorrectGAPProcessing) } +/* Maximum number of bytes allowed for an RTPS datagram generated by this participant. */ +TEST(RTPS, max_output_message_size_participant) +{ + /* Set up */ + // Create the RTPSReader + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Create the RTPSWriter + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME); + // Create the RTPSParticipants with the appropriate value for the property + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + writer.add_user_transport_to_pparams(testTransport). + disable_builtin_transport(). + add_property("fastdds.max_message_size", segment_size_str). + init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); +} + +/* Maximum number of bytes allowed for an RTPS datagram generated by this writer. */ +TEST(RTPS, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME); + writer.add_property("fastdds.max_message_size", segment_size_str). + disable_builtin_transport().add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else