From 9db08590eed128531b23b22a5d71a7fe442208d4 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 May 2024 07:45:22 +0200 Subject: [PATCH 1/3] Handle errors when setting socket buffer sizes (#4760) (#4796) * Refs #20972. Method socket_buffer_size in DDS_PIM helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Method socket_buffer_size in fastrtps_deprecated helpers sets also sending buffer. Signed-off-by: Miguel Company * Refs #20972. Improvements in on_sample_lost blackbox tests. Signed-off-by: Miguel Company * Refs #20972. Move code into new private methods. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_send_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Refactor on configure_receive_buffer_size. Signed-off-by: Miguel Company * Refs #20972. Check user configuration at the beginning of init method. Signed-off-by: Miguel Company * Refs #20972. Use maxMessageSize as minimum possible value. Signed-off-by: Miguel Company * Refs #20972. Applying changes on OpenAndBindUnicastOutputSocket. Signed-off-by: Miguel Company * Refs #20972. Applying changes on CreateInputChannelResource. Signed-off-by: Miguel Company * Revert "Refs #20972. Applying changes on CreateInputChannelResource." This reverts commit ed848e9de267fcfdbe1cb7294b2e408d85a33575. * Refs #20972. Add helper header with template method. Signed-off-by: Miguel Company * Refs #20972. Configure methods return boolean. Signed-off-by: Miguel Company * Refs #20972. Configure methods use new template method. Signed-off-by: Miguel Company * Refs #20972. OpenAndBindUnicastOutputSocket uses new template method. Signed-off-by: Miguel Company * Refs #20972. Changes in OpenAndBindInputSocket. Signed-off-by: Miguel Company * Refs #20972.Setting options on TCP channels. Signed-off-by: Miguel Company * Refs #20972. Doxygen. Signed-off-by: Miguel Company * Refs #20972. Check limits of configured sizes. Signed-off-by: Miguel Company * Refs #20972. Add UDP unit tests. Signed-off-by: Miguel Company * Refs #20972. Add TCP unit tests. Signed-off-by: Miguel Company * Refs #20972. Move checks in TCP to beginning of init. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in UDP. Signed-off-by: Miguel Company * Refs #20972. Refactor for common code in TCP. Signed-off-by: Miguel Company * Refs #20972. Remove unused constants in UDP tests. Signed-off-by: Miguel Company * Refs #20972. Check final configuration on unit tests. Signed-off-by: Miguel Company * Refs #20972. Uncrustify. Signed-off-by: Miguel Company * Refs #20972. Less strict tests. Signed-off-by: Miguel Company * Refs #20972. Remove `s_minimumSocketBuffer` from tests. Signed-off-by: Miguel Company * Refs #20972. Deprecate `s_minimumSocketBuffer`. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 53cd211a8449bac59c13e18912ebd7a9262de51c) # Conflicts: # src/cpp/rtps/transport/TCPTransportInterface.cpp # src/cpp/rtps/transport/UDPTransportInterface.cpp # src/cpp/rtps/transport/UDPv4Transport.cpp # src/cpp/rtps/transport/UDPv6Transport.cpp # test/blackbox/common/DDSBlackboxTestsListeners.cpp --- .../rtps/transport/TransportInterface.h | 3 +- .../fastrtps/transport/TransportInterface.h | 3 +- src/cpp/rtps/transport/TCPChannelResource.cpp | 49 +++++- src/cpp/rtps/transport/TCPChannelResource.h | 10 ++ .../transport/TCPChannelResourceBasic.cpp | 4 +- .../transport/TCPChannelResourceSecure.cpp | 4 +- .../rtps/transport/TCPTransportInterface.cpp | 64 +++++++- .../rtps/transport/UDPTransportInterface.cpp | 112 ++++++++++--- .../rtps/transport/UDPTransportInterface.h | 1 + src/cpp/rtps/transport/UDPv4Transport.cpp | 22 ++- src/cpp/rtps/transport/UDPv6Transport.cpp | 22 ++- src/cpp/rtps/transport/asio_helpers.hpp | 155 ++++++++++++++++++ test/blackbox/api/dds-pim/PubSubReader.hpp | 1 + test/blackbox/api/dds-pim/PubSubWriter.hpp | 1 + .../api/fastrtps_deprecated/PubSubReader.hpp | 1 + .../api/fastrtps_deprecated/PubSubWriter.hpp | 1 + .../common/DDSBlackboxTestsListeners.cpp | 21 +++ test/unittest/transport/TCPv4Tests.cpp | 73 ++++++++- test/unittest/transport/TCPv6Tests.cpp | 71 +++++++- test/unittest/transport/UDPv4Tests.cpp | 74 ++++++++- test/unittest/transport/UDPv6Tests.cpp | 73 ++++++++- 21 files changed, 716 insertions(+), 49 deletions(-) create mode 100644 src/cpp/rtps/transport/asio_helpers.hpp diff --git a/include/fastdds/rtps/transport/TransportInterface.h b/include/fastdds/rtps/transport/TransportInterface.h index 60dec2d299f..26d713b9c50 100644 --- a/include/fastdds/rtps/transport/TransportInterface.h +++ b/include/fastdds/rtps/transport/TransportInterface.h @@ -34,7 +34,8 @@ namespace rtps { constexpr uint32_t s_maximumMessageSize = 65500; //! Default maximum initial peers range constexpr uint32_t s_maximumInitialPeersRange = 4; -//! Default minimum socket buffer +// Default minimum socket buffer +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") constexpr uint32_t s_minimumSocketBuffer = 65536; //! Default IPv4 address static const std::string s_IPv4AddressAny = "0.0.0.0"; diff --git a/include/fastrtps/transport/TransportInterface.h b/include/fastrtps/transport/TransportInterface.h index 9f0a5f88760..22c1386803c 100644 --- a/include/fastrtps/transport/TransportInterface.h +++ b/include/fastrtps/transport/TransportInterface.h @@ -29,7 +29,8 @@ using TransportInterface = fastdds::rtps::TransportInterface; static const uint32_t s_maximumMessageSize = fastdds::rtps::s_maximumMessageSize; static const uint32_t s_maximumInitialPeersRange = fastdds::rtps::s_maximumInitialPeersRange; -static const uint32_t s_minimumSocketBuffer = fastdds::rtps::s_minimumSocketBuffer; +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") +static const uint32_t s_minimumSocketBuffer = 65536; static const std::string s_IPv4AddressAny = fastdds::rtps::s_IPv4AddressAny; static const std::string s_IPv6AddressAny = fastdds::rtps::s_IPv6AddressAny; diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index efc7a6903b9..2fbaa3118c0 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -18,6 +18,7 @@ #include #include +#include #include namespace eprosima { @@ -370,6 +371,52 @@ bool TCPChannelResource::check_socket_send_buffer( return true; } +void TCPChannelResource::set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options) +{ + uint32_t minimum_value = options->maxMessageSize; + + // Set the send buffer size + { + uint32_t desired_value = options->sendBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set send buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set send buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the receive buffer size + { + uint32_t desired_value = options->receiveBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set receive buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the TCP_NODELAY option + socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay)); +} + } // namespace rtps -} // namespace fastrtps +} // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPChannelResource.h b/src/cpp/rtps/transport/TCPChannelResource.h index c0a9e97f7bb..589ae6221e7 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.h +++ b/src/cpp/rtps/transport/TCPChannelResource.h @@ -234,6 +234,16 @@ class TCPChannelResource : public ChannelResource const size_t& msg_size, const asio::ip::tcp::socket::native_handle_type& socket_native_handle); + /** + * @brief Set descriptor options on a socket. + * + * @param socket Socket on which to set the options. + * @param options Descriptor with the options to set. + */ + static void set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options); + TCPConnectionType tcp_connection_type_; friend class TCPTransportInterface; diff --git a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp index 51a28913c19..7e3f53789e5 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp @@ -200,9 +200,7 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint( void TCPChannelResourceBasic::set_options( const TCPTransportDescriptor* options) { - socket_->set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - socket_->set_option(socket_base::send_buffer_size(options->sendBufferSize)); - socket_->set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(*socket_, options); } void TCPChannelResourceBasic::cancel() diff --git a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp index deddd7ffd31..7e424315374 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp @@ -280,9 +280,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint( void TCPChannelResourceSecure::set_options( const TCPTransportDescriptor* options) { - secure_socket_->lowest_layer().set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - secure_socket_->lowest_layer().set_option(socket_base::send_buffer_size(options->sendBufferSize)); - secure_socket_->lowest_layer().set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options); } void TCPChannelResourceSecure::set_tls_verify_mode( diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index b550d9deceb..6e2bd10c2f9 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -18,10 +18,11 @@ #include #include #include +#include #include -#include #include #include +#include #include #include #include @@ -53,6 +54,7 @@ #include #include +#include #include #include #include @@ -442,6 +444,42 @@ bool TCPTransportInterface::DoInputLocatorsMatch( bool TCPTransportInterface::init( const fastrtps::rtps::PropertyPolicy* properties) { + uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); + + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than " << maximumMessageSize); + return false; + } + + if (cfg_send_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "sendBufferSize cannot be greater than " << max_int_value); + return false; + } + + if (cfg_recv_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "receiveBufferSize cannot be greater than " << max_int_value); + return false; + } + + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than sendBufferSize"); + return false; + } + + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than receiveBufferSize"); + return false; + } + if (!apply_tls_config()) { // TODO decide wether the Transport initialization should keep working after this error @@ -474,8 +512,12 @@ bool TCPTransportInterface::init( } // Check system buffer sizes. - if (configuration()->sendBufferSize == 0) + uint32_t send_size = 0; + uint32_t recv_size = 0; + if (!asio_helpers::configure_buffer_sizes( + *initial_peer_local_locator_socket_, *configuration(), send_size, recv_size)) { +<<<<<<< HEAD socket_base::send_buffer_size option; initial_peer_local_locator_socket_->get_option(option); set_send_buffer_size(option.value()); @@ -501,21 +543,27 @@ bool TCPTransportInterface::init( if (configuration()->maxMessageSize > s_maximumMessageSize) { EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000"); +======= + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if (cfg_send_size > 0 && send_size != cfg_send_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << send_size << " instead of " << cfg_send_size); } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if (cfg_recv_size > 0 && recv_size != cfg_recv_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << recv_size << " instead of " << cfg_recv_size); } + set_send_buffer_size(send_size); + set_receive_buffer_size(recv_size); + if (!rtcp_message_manager_) { rtcp_message_manager_ = std::make_shared(this); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 5c5feaa400c..6290505a2fe 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -14,15 +14,18 @@ #include -#include -#include #include #include +#include +#include +#include +#include #include #include -#include #include + +#include #include #include @@ -119,6 +122,7 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { +<<<<<<< HEAD if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) { // Check system buffer sizes. @@ -155,25 +159,84 @@ bool UDPTransportInterface::init( if (configuration()->maxMessageSize > s_maximumMessageSize) { EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than 65000"); +======= + uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); + + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than " << maximumMessageSize); +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if (cfg_send_size > max_int_value) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "sendBufferSize cannot be greater than " << max_int_value); return false; } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if (cfg_recv_size > max_int_value) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "receiveBufferSize cannot be greater than " << max_int_value); return false; } +<<<<<<< HEAD // TODO(Ricardo) Create an event that update this list. get_ips(currentInterfaces); return true; +======= + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than sendBufferSize"); + return false; + } + + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than receiveBufferSize"); + return false; + } + + asio::error_code ec; + ip::udp::socket socket(io_service_); + socket.open(generate_protocol(), ec); + if (!!ec) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Error creating socket: " << ec.message()); + return false; + } + + bool ret = asio_helpers::configure_buffer_sizes(socket, *configuration(), mSendBufferSize, mReceiveBufferSize); + if (ret) + { + if (cfg_send_size > 0 && mSendBufferSize != cfg_send_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << mSendBufferSize << " instead of " << cfg_send_size); + } + + if (cfg_recv_size > 0 && mReceiveBufferSize != cfg_recv_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << mReceiveBufferSize << " instead of " << cfg_recv_size); + } + + set_send_buffer_size(mSendBufferSize); + set_receive_buffer_size(mReceiveBufferSize); + } + else + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); + } + + return ret; +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) } bool UDPTransportInterface::IsInputChannelOpen( @@ -211,9 +274,8 @@ bool UDPTransportInterface::OpenAndBindInputSockets( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort( - locator) << ")" - << " with msg: " << e.what()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, "UDPTransport Error binding at port: (" + << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); mInputSockets.erase(IPLocator::getPhysicalPort(locator)); return false; } @@ -243,7 +305,18 @@ eProsimaUDPSocket UDPTransportInterface::OpenAndBindUnicastOutputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mSendBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::send_buffer_size(static_cast(mSendBufferSize))); + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, mSendBufferSize, configuration()->maxMessageSize, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, + "Couldn't set send buffer size to minimum value: " << configuration()->maxMessageSize); + } + else if (configured_value != mSendBufferSize) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mSendBufferSize); + } } getSocketPtr(socket)->set_option(ip::multicast::hops(configuration()->TTL)); getSocketPtr(socket)->bind(endpoint); @@ -301,7 +374,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << localhost_name() << " (skipping) with msg: " << e.what()); } } @@ -325,7 +398,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << (*locIt).name << " (skipping) with msg: " << e.what()); } } @@ -357,7 +430,7 @@ bool UDPTransportInterface::OpenOutputChannel( { (void)e; /* TODO Que hacer? - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); for (auto& socket : mOutputSockets) { @@ -515,23 +588,24 @@ bool UDPTransportInterface::send( if ((ec.value() == asio::error::would_block) || (ec.value() == asio::error::try_again)) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDP send would have blocked. Packet is dropped."); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDP send would have blocked. Packet is dropped."); return true; } - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, ec.message()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, ec.message()); return false; } } catch (const std::exception& error) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, error.what()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, error.what()); return false; } (void)bytesSent; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint - << " FROM " << getSocketPtr(socket)->local_endpoint()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, + "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint << + " FROM " << getSocketPtr(socket)->local_endpoint()); success = true; } diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index bbeeafed885..2e4585e9c9d 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -292,6 +292,7 @@ class UDPTransportInterface : public TransportInterface bool return_loopback = false); std::atomic_bool rescan_interfaces_ = {true}; + }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPv4Transport.cpp b/src/cpp/rtps/transport/UDPv4Transport.cpp index 2d1b1e0e4e8..fc5be0411bc 100644 --- a/src/cpp/rtps/transport/UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/UDPv4Transport.cpp @@ -26,6 +26,13 @@ #include #include +<<<<<<< HEAD +======= +#include +#include + +#include +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; @@ -297,7 +304,20 @@ eProsimaUDPSocket UDPv4Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV4, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV4, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/UDPv6Transport.cpp b/src/cpp/rtps/transport/UDPv6Transport.cpp index 385ee12e524..60191afd7ee 100644 --- a/src/cpp/rtps/transport/UDPv6Transport.cpp +++ b/src/cpp/rtps/transport/UDPv6Transport.cpp @@ -23,6 +23,13 @@ #include #include #include +<<<<<<< HEAD +======= + +#include +#include +#include +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; @@ -301,7 +308,20 @@ eProsimaUDPSocket UDPv6Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV6, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV6, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/asio_helpers.hpp b/src/cpp/rtps/transport/asio_helpers.hpp new file mode 100644 index 00000000000..f9ba81b84e0 --- /dev/null +++ b/src/cpp/rtps/transport/asio_helpers.hpp @@ -0,0 +1,155 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTPS_TRANSPORT__ASIO_HELPERS_HPP_ +#define RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/// Helper functions for asio. +// NOTE: using a struct instead of a namespace to avoid linker errors when using inline free functions. +struct asio_helpers +{ + /** + * @brief Try to set a buffer size on a socket, trying to set the initial value and then halving it until it is + * possible to set it or the minimum value is reached. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool try_setting_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + asio::error_code ec; + + final_buffer_value = initial_buffer_value; + while (final_buffer_value >= minimum_buffer_value) + { + socket.set_option(BufferOptionType(static_cast(final_buffer_value)), ec); + if (!ec) + { + return true; + } + + final_buffer_value /= 2; + } + + final_buffer_value = minimum_buffer_value; + socket.set_option(BufferOptionType(final_buffer_value), ec); + return !ec; + } + + /** + * @brief Configure a buffer size on a socket, using the system default value if the initial value is 0. + * Ensures that the final buffer size is at least the minimum value. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool configure_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + final_buffer_value = initial_buffer_value; + + // If the initial value is 0, try using the system default value + if (initial_buffer_value == 0) + { + asio::error_code ec; + BufferOptionType option; + socket.get_option(option, ec); + if (!ec) + { + final_buffer_value = option.value(); + } + } + + // Ensure the minimum value is used + if (final_buffer_value < minimum_buffer_value) + { + final_buffer_value = minimum_buffer_value; + } + + // Try to set the highest possible value the system allows + return try_setting_buffer_size(socket, final_buffer_value, minimum_buffer_value, + final_buffer_value); + } + + /** + * @brief Configure the send and receive buffer sizes on a socket, using the system default value if the initial + * values are 0. Ensures that the final buffer sizes are at least the minimum value. + * + * @tparam SocketType Type of socket on which to set the buffer size options. + * + * @param socket Socket on which to set the buffer size options. + * @param descriptor Transport descriptor with the buffer sizes to set. + * @param final_send_buffer_size Output parameter where the final send buffer size will be stored. + * @param final_receive_buffer_size Output parameter where the final receive buffer size will be stored. + * + * @return true if the buffer sizes were successfully set, false otherwise. + */ + template + static inline bool configure_buffer_sizes( + SocketType& socket, + const SocketTransportDescriptor& descriptor, + uint32_t& final_send_buffer_size, + uint32_t& final_receive_buffer_size) + { + uint32_t minimum_socket_buffer = descriptor.maxMessageSize; + uint32_t send_buffer_size = descriptor.sendBufferSize; + uint32_t receive_buffer_size = descriptor.receiveBufferSize; + + bool send_buffer_size_set = configure_buffer_size( + socket, send_buffer_size, minimum_socket_buffer, final_send_buffer_size); + bool receive_buffer_size_set = configure_buffer_size( + socket, receive_buffer_size, minimum_socket_buffer, final_receive_buffer_size); + + return send_buffer_size_set && receive_buffer_size_set; + } + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 699f2ab3ef0..fb0945f4e62 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1302,6 +1302,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 5a37df54978..0e358e4fc2b 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1485,6 +1485,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 3031ade192f..be99ec69032 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -1069,6 +1069,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index c5ee2a1bd7a..1fdd2beb514 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -1249,6 +1249,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 1d7c40933bb..d443811747b 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,10 +674,25 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +<<<<<<< HEAD +======= +// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. +// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any +// other possible loss. +static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + +template +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) void sample_lost_test_dw_init( PubSubWriter& writer) { auto testTransport = std::make_shared(); + testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; @@ -736,6 +751,12 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { +<<<<<<< HEAD +======= + reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + +>>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 36ee6f139ec..cfb9064ec10 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -82,6 +83,74 @@ class TCPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv4Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv4Tests, locators_with_kind_1_supported) { // Given @@ -1334,7 +1403,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; @@ -1910,7 +1979,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) TEST_F(TCPv4Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index fa9791f8bd2..4cad23e28e7 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -83,6 +84,74 @@ class TCPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv6Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv6Tests, conversion_to_ip6_string) { Locator_t locator; @@ -323,7 +392,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) TEST_F(TCPv6Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv6TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/UDPv4Tests.cpp b/test/unittest/transport/UDPv4Tests.cpp index 430e58feabf..3f24c0a034b 100644 --- a/test/unittest/transport/UDPv4Tests.cpp +++ b/test/unittest/transport/UDPv4Tests.cpp @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include +#include #include #include #include @@ -30,10 +32,6 @@ using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using UDPv4Transport = eprosima::fastdds::rtps::UDPv4Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -75,6 +73,74 @@ class UDPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv4Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv4Tests, locators_with_kind_1_supported) { // Given diff --git a/test/unittest/transport/UDPv6Tests.cpp b/test/unittest/transport/UDPv6Tests.cpp index 805518934c5..6009c7b593c 100644 --- a/test/unittest/transport/UDPv6Tests.cpp +++ b/test/unittest/transport/UDPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -30,10 +31,6 @@ using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps; using UDPv6Transport = eprosima::fastdds::rtps::UDPv6Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -83,6 +80,74 @@ class UDPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv6Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv6Tests, conversion_to_ip6_string) { Locator_t locator; From 17e304100362924de17583573b4b9f4860d879aa Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 May 2024 10:16:18 +0200 Subject: [PATCH 2/3] Refs #21036. Fix conflicts. Signed-off-by: Miguel Company --- .../rtps/transport/TCPTransportInterface.cpp | 28 ----------- .../rtps/transport/UDPTransportInterface.cpp | 49 ++----------------- src/cpp/rtps/transport/UDPv4Transport.cpp | 6 --- src/cpp/rtps/transport/UDPv6Transport.cpp | 5 -- .../common/DDSBlackboxTestsListeners.cpp | 21 -------- 5 files changed, 3 insertions(+), 106 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 6e2bd10c2f9..7a6a54c6e7e 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -517,35 +517,7 @@ bool TCPTransportInterface::init( if (!asio_helpers::configure_buffer_sizes( *initial_peer_local_locator_socket_, *configuration(), send_size, recv_size)) { -<<<<<<< HEAD - socket_base::send_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_send_buffer_size(option.value()); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_receive_buffer_size(option.value()); - - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->maxMessageSize > s_maximumMessageSize) - { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000"); -======= EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 6290505a2fe..41d1f75e820 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -122,44 +122,6 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { -<<<<<<< HEAD - if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) - { - // Check system buffer sizes. - ip::udp::socket socket(io_service_); - socket.open(generate_protocol()); - - if (configuration()->sendBufferSize == 0) - { - socket_base::send_buffer_size option; - socket.get_option(option); - set_send_buffer_size(static_cast(option.value())); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - mSendBufferSize = s_minimumSocketBuffer; - } - } - - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - socket.get_option(option); - set_receive_buffer_size(static_cast(option.value())); - - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - mReceiveBufferSize = s_minimumSocketBuffer; - } - } - } - - if (configuration()->maxMessageSize > s_maximumMessageSize) - { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than 65000"); -======= uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; @@ -169,7 +131,6 @@ bool UDPTransportInterface::init( if (cfg_max_msg_size > maximumMessageSize) { EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than " << maximumMessageSize); ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) return false; } @@ -185,12 +146,6 @@ bool UDPTransportInterface::init( return false; } -<<<<<<< HEAD - // TODO(Ricardo) Create an event that update this list. - get_ips(currentInterfaces); - - return true; -======= if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) { EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than sendBufferSize"); @@ -203,6 +158,9 @@ bool UDPTransportInterface::init( return false; } + // TODO(Ricardo) Create an event that update this list. + get_ips(currentInterfaces); + asio::error_code ec; ip::udp::socket socket(io_service_); socket.open(generate_protocol(), ec); @@ -236,7 +194,6 @@ bool UDPTransportInterface::init( } return ret; ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) } bool UDPTransportInterface::IsInputChannelOpen( diff --git a/src/cpp/rtps/transport/UDPv4Transport.cpp b/src/cpp/rtps/transport/UDPv4Transport.cpp index fc5be0411bc..1f4f0416d90 100644 --- a/src/cpp/rtps/transport/UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/UDPv4Transport.cpp @@ -26,14 +26,8 @@ #include #include -<<<<<<< HEAD -======= -#include #include -#include ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) - using namespace std; using namespace asio; diff --git a/src/cpp/rtps/transport/UDPv6Transport.cpp b/src/cpp/rtps/transport/UDPv6Transport.cpp index 60191afd7ee..c825a180be5 100644 --- a/src/cpp/rtps/transport/UDPv6Transport.cpp +++ b/src/cpp/rtps/transport/UDPv6Transport.cpp @@ -23,13 +23,8 @@ #include #include #include -<<<<<<< HEAD -======= -#include #include -#include ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) using namespace std; using namespace asio; diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index d443811747b..1d7c40933bb 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -674,25 +674,10 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } -<<<<<<< HEAD -======= -// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. -// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any -// other possible loss. -static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = - 300ul * 1024ul // sample size - * 13ul // number of samples - * 2ul; // 2x to avoid any possible loss - -template ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) void sample_lost_test_dw_init( PubSubWriter& writer) { auto testTransport = std::make_shared(); - testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; - testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; - testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; @@ -751,12 +736,6 @@ void sample_lost_test_init( PubSubWriter& writer, std::function functor) { -<<<<<<< HEAD -======= - reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); - writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); - ->>>>>>> 53cd211a8 (Handle errors when setting socket buffer sizes (#4760) (#4796)) sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); From a4b492892f7977d68419b6ad63e3c00397f1707d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 May 2024 08:48:52 +0200 Subject: [PATCH 3/3] Refs #21036. Update for non-backported changes Signed-off-by: Miguel Company --- src/cpp/rtps/transport/TCPTransportInterface.cpp | 2 +- src/cpp/rtps/transport/UDPTransportInterface.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 7a6a54c6e7e..de753416c22 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -444,7 +444,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch( bool TCPTransportInterface::init( const fastrtps::rtps::PropertyPolicy* properties) { - uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t maximumMessageSize = s_maximumMessageSize; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; uint32_t cfg_recv_size = configuration()->receiveBufferSize; diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 41d1f75e820..613851509f5 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -122,7 +122,7 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { - uint32_t maximumMessageSize = max_msg_size_no_frag == 0 ? s_maximumMessageSize : max_msg_size_no_frag; + uint32_t maximumMessageSize = s_maximumMessageSize; uint32_t cfg_max_msg_size = configuration()->maxMessageSize; uint32_t cfg_send_size = configuration()->sendBufferSize; uint32_t cfg_recv_size = configuration()->receiveBufferSize;