diff --git a/include/fastdds/rtps/transport/TransportInterface.h b/include/fastdds/rtps/transport/TransportInterface.h index 60dec2d299f..26d713b9c50 100644 --- a/include/fastdds/rtps/transport/TransportInterface.h +++ b/include/fastdds/rtps/transport/TransportInterface.h @@ -34,7 +34,8 @@ namespace rtps { constexpr uint32_t s_maximumMessageSize = 65500; //! Default maximum initial peers range constexpr uint32_t s_maximumInitialPeersRange = 4; -//! Default minimum socket buffer +// Default minimum socket buffer +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") constexpr uint32_t s_minimumSocketBuffer = 65536; //! Default IPv4 address static const std::string s_IPv4AddressAny = "0.0.0.0"; diff --git a/include/fastrtps/transport/TransportInterface.h b/include/fastrtps/transport/TransportInterface.h index 9f0a5f88760..22c1386803c 100644 --- a/include/fastrtps/transport/TransportInterface.h +++ b/include/fastrtps/transport/TransportInterface.h @@ -29,7 +29,8 @@ using TransportInterface = fastdds::rtps::TransportInterface; static const uint32_t s_maximumMessageSize = fastdds::rtps::s_maximumMessageSize; static const uint32_t s_maximumInitialPeersRange = fastdds::rtps::s_maximumInitialPeersRange; -static const uint32_t s_minimumSocketBuffer = fastdds::rtps::s_minimumSocketBuffer; +FASTDDS_DEPRECATED_UNTIL(3, s_minimumSocketBuffer, "Minimum socket buffer is now taken from the maximum msg size") +static const uint32_t s_minimumSocketBuffer = 65536; static const std::string s_IPv4AddressAny = fastdds::rtps::s_IPv4AddressAny; static const std::string s_IPv6AddressAny = fastdds::rtps::s_IPv6AddressAny; diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index efc7a6903b9..2fbaa3118c0 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -18,6 +18,7 @@ #include #include +#include #include namespace eprosima { @@ -370,6 +371,52 @@ bool TCPChannelResource::check_socket_send_buffer( return true; } +void TCPChannelResource::set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options) +{ + uint32_t minimum_value = options->maxMessageSize; + + // Set the send buffer size + { + uint32_t desired_value = options->sendBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set send buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set send buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the receive buffer size + { + uint32_t desired_value = options->receiveBufferSize; + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, desired_value, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TCP_TRANSPORT, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (desired_value != configured_value) + { + EPROSIMA_LOG_WARNING(TCP_TRANSPORT, + "Couldn't set receive buffer size to desired value. " + << "Using " << configured_value << " instead of " << desired_value); + } + } + + // Set the TCP_NODELAY option + socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay)); +} + } // namespace rtps -} // namespace fastrtps +} // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPChannelResource.h b/src/cpp/rtps/transport/TCPChannelResource.h index c0a9e97f7bb..589ae6221e7 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.h +++ b/src/cpp/rtps/transport/TCPChannelResource.h @@ -234,6 +234,16 @@ class TCPChannelResource : public ChannelResource const size_t& msg_size, const asio::ip::tcp::socket::native_handle_type& socket_native_handle); + /** + * @brief Set descriptor options on a socket. + * + * @param socket Socket on which to set the options. + * @param options Descriptor with the options to set. + */ + static void set_socket_options( + asio::basic_socket& socket, + const TCPTransportDescriptor* options); + TCPConnectionType tcp_connection_type_; friend class TCPTransportInterface; diff --git a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp index 51a28913c19..7e3f53789e5 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp @@ -200,9 +200,7 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint( void TCPChannelResourceBasic::set_options( const TCPTransportDescriptor* options) { - socket_->set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - socket_->set_option(socket_base::send_buffer_size(options->sendBufferSize)); - socket_->set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(*socket_, options); } void TCPChannelResourceBasic::cancel() diff --git a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp index deddd7ffd31..7e424315374 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp @@ -280,9 +280,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint( void TCPChannelResourceSecure::set_options( const TCPTransportDescriptor* options) { - secure_socket_->lowest_layer().set_option(socket_base::receive_buffer_size(options->receiveBufferSize)); - secure_socket_->lowest_layer().set_option(socket_base::send_buffer_size(options->sendBufferSize)); - secure_socket_->lowest_layer().set_option(ip::tcp::no_delay(options->enable_tcp_nodelay)); + TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options); } void TCPChannelResourceSecure::set_tls_verify_mode( diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index b550d9deceb..de753416c22 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -18,10 +18,11 @@ #include #include #include +#include #include -#include #include #include +#include #include #include #include @@ -53,6 +54,7 @@ #include #include +#include #include #include #include @@ -442,6 +444,42 @@ bool TCPTransportInterface::DoInputLocatorsMatch( bool TCPTransportInterface::init( const fastrtps::rtps::PropertyPolicy* properties) { + uint32_t maximumMessageSize = s_maximumMessageSize; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); + + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than " << maximumMessageSize); + return false; + } + + if (cfg_send_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "sendBufferSize cannot be greater than " << max_int_value); + return false; + } + + if (cfg_recv_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "receiveBufferSize cannot be greater than " << max_int_value); + return false; + } + + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than sendBufferSize"); + return false; + } + + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "maxMessageSize cannot be greater than receiveBufferSize"); + return false; + } + if (!apply_tls_config()) { // TODO decide wether the Transport initialization should keep working after this error @@ -474,48 +512,30 @@ bool TCPTransportInterface::init( } // Check system buffer sizes. - if (configuration()->sendBufferSize == 0) - { - socket_base::send_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_send_buffer_size(option.value()); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->receiveBufferSize == 0) + uint32_t send_size = 0; + uint32_t recv_size = 0; + if (!asio_helpers::configure_buffer_sizes( + *initial_peer_local_locator_socket_, *configuration(), send_size, recv_size)) { - socket_base::receive_buffer_size option; - initial_peer_local_locator_socket_->get_option(option); - set_receive_buffer_size(option.value()); - - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - } - } - - if (configuration()->maxMessageSize > s_maximumMessageSize) - { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than 65000"); + EPROSIMA_LOG_ERROR(TRANSPORT_TCP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if (cfg_send_size > 0 && send_size != cfg_send_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << send_size << " instead of " << cfg_send_size); } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if (cfg_recv_size > 0 && recv_size != cfg_recv_size) { - EPROSIMA_LOG_ERROR(RTCP_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); - return false; + EPROSIMA_LOG_WARNING(TRANSPORT_TCP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << recv_size << " instead of " << cfg_recv_size); } + set_send_buffer_size(send_size); + set_receive_buffer_size(recv_size); + if (!rtcp_message_manager_) { rtcp_message_manager_ = std::make_shared(this); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 5c5feaa400c..613851509f5 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -14,15 +14,18 @@ #include -#include -#include #include #include +#include +#include +#include +#include #include #include -#include #include + +#include #include #include @@ -119,61 +122,78 @@ bool UDPTransportInterface::DoInputLocatorsMatch( bool UDPTransportInterface::init( const fastrtps::rtps::PropertyPolicy*) { - if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) - { - // Check system buffer sizes. - ip::udp::socket socket(io_service_); - socket.open(generate_protocol()); - - if (configuration()->sendBufferSize == 0) - { - socket_base::send_buffer_size option; - socket.get_option(option); - set_send_buffer_size(static_cast(option.value())); + uint32_t maximumMessageSize = s_maximumMessageSize; + uint32_t cfg_max_msg_size = configuration()->maxMessageSize; + uint32_t cfg_send_size = configuration()->sendBufferSize; + uint32_t cfg_recv_size = configuration()->receiveBufferSize; + uint32_t max_int_value = static_cast(std::numeric_limits::max()); - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - mSendBufferSize = s_minimumSocketBuffer; - } - } - - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - socket.get_option(option); - set_receive_buffer_size(static_cast(option.value())); + if (cfg_max_msg_size > maximumMessageSize) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than " << maximumMessageSize); + return false; + } - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - mReceiveBufferSize = s_minimumSocketBuffer; - } - } + if (cfg_send_size > max_int_value) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "sendBufferSize cannot be greater than " << max_int_value); + return false; } - if (configuration()->maxMessageSize > s_maximumMessageSize) + if (cfg_recv_size > max_int_value) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than 65000"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "receiveBufferSize cannot be greater than " << max_int_value); return false; } - if (configuration()->maxMessageSize > configuration()->sendBufferSize) + if ((cfg_send_size > 0) && (cfg_max_msg_size > cfg_send_size)) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than send_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than sendBufferSize"); return false; } - if (configuration()->maxMessageSize > configuration()->receiveBufferSize) + if ((cfg_recv_size > 0) && (cfg_max_msg_size > cfg_recv_size)) { - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "maxMessageSize cannot be greater than receive_buffer_size"); + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "maxMessageSize cannot be greater than receiveBufferSize"); return false; } // TODO(Ricardo) Create an event that update this list. get_ips(currentInterfaces); - return true; + asio::error_code ec; + ip::udp::socket socket(io_service_); + socket.open(generate_protocol(), ec); + if (!!ec) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Error creating socket: " << ec.message()); + return false; + } + + bool ret = asio_helpers::configure_buffer_sizes(socket, *configuration(), mSendBufferSize, mReceiveBufferSize); + if (ret) + { + if (cfg_send_size > 0 && mSendBufferSize != cfg_send_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << mSendBufferSize << " instead of " << cfg_send_size); + } + + if (cfg_recv_size > 0 && mReceiveBufferSize != cfg_recv_size) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport receiveBufferSize could not be set to the desired value. " + << "Using " << mReceiveBufferSize << " instead of " << cfg_recv_size); + } + + set_send_buffer_size(mSendBufferSize); + set_receive_buffer_size(mReceiveBufferSize); + } + else + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "Couldn't set buffer sizes to minimum value: " << cfg_max_msg_size); + } + + return ret; } bool UDPTransportInterface::IsInputChannelOpen( @@ -211,9 +231,8 @@ bool UDPTransportInterface::OpenAndBindInputSockets( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort( - locator) << ")" - << " with msg: " << e.what()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, "UDPTransport Error binding at port: (" + << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); mInputSockets.erase(IPLocator::getPhysicalPort(locator)); return false; } @@ -243,7 +262,18 @@ eProsimaUDPSocket UDPTransportInterface::OpenAndBindUnicastOutputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mSendBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::send_buffer_size(static_cast(mSendBufferSize))); + uint32_t configured_value = 0; + if (!asio_helpers::try_setting_buffer_size( + socket, mSendBufferSize, configuration()->maxMessageSize, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, + "Couldn't set send buffer size to minimum value: " << configuration()->maxMessageSize); + } + else if (configured_value != mSendBufferSize) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport sendBufferSize could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mSendBufferSize); + } } getSocketPtr(socket)->set_option(ip::multicast::hops(configuration()->TTL)); getSocketPtr(socket)->bind(endpoint); @@ -301,7 +331,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << localhost_name() << " (skipping) with msg: " << e.what()); } } @@ -325,7 +355,7 @@ bool UDPTransportInterface::OpenOutputChannel( catch (asio::system_error const& e) { (void)e; - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDPTransport Error binding interface " + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDPTransport Error binding interface " << (*locIt).name << " (skipping) with msg: " << e.what()); } } @@ -357,7 +387,7 @@ bool UDPTransportInterface::OpenOutputChannel( { (void)e; /* TODO Que hacer? - EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" + EPROSIMA_LOG_ERROR(TRANSPORT_UDP, "UDPTransport Error binding at port: (" << IPLocator::getPhysicalPort(locator) << ")" << " with msg: " << e.what()); for (auto& socket : mOutputSockets) { @@ -515,23 +545,24 @@ bool UDPTransportInterface::send( if ((ec.value() == asio::error::would_block) || (ec.value() == asio::error::try_again)) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "UDP send would have blocked. Packet is dropped."); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, "UDP send would have blocked. Packet is dropped."); return true; } - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, ec.message()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, ec.message()); return false; } } catch (const std::exception& error) { - EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, error.what()); + EPROSIMA_LOG_WARNING(TRANSPORT_UDP, error.what()); return false; } (void)bytesSent; - EPROSIMA_LOG_INFO(RTPS_MSG_OUT, "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint - << " FROM " << getSocketPtr(socket)->local_endpoint()); + EPROSIMA_LOG_INFO(TRANSPORT_UDP, + "UDPTransport: " << bytesSent << " bytes TO endpoint: " << destinationEndpoint << + " FROM " << getSocketPtr(socket)->local_endpoint()); success = true; } diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index bbeeafed885..2e4585e9c9d 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -292,6 +292,7 @@ class UDPTransportInterface : public TransportInterface bool return_loopback = false); std::atomic_bool rescan_interfaces_ = {true}; + }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPv4Transport.cpp b/src/cpp/rtps/transport/UDPv4Transport.cpp index 2d1b1e0e4e8..1f4f0416d90 100644 --- a/src/cpp/rtps/transport/UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/UDPv4Transport.cpp @@ -26,6 +26,7 @@ #include #include +#include using namespace std; using namespace asio; @@ -297,7 +298,20 @@ eProsimaUDPSocket UDPv4Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV4, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV4, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/UDPv6Transport.cpp b/src/cpp/rtps/transport/UDPv6Transport.cpp index 385ee12e524..c825a180be5 100644 --- a/src/cpp/rtps/transport/UDPv6Transport.cpp +++ b/src/cpp/rtps/transport/UDPv6Transport.cpp @@ -24,6 +24,8 @@ #include #include +#include + using namespace std; using namespace asio; @@ -301,7 +303,20 @@ eProsimaUDPSocket UDPv6Transport::OpenAndBindInputSocket( getSocketPtr(socket)->open(generate_protocol()); if (mReceiveBufferSize != 0) { - getSocketPtr(socket)->set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); + uint32_t configured_value = 0; + uint32_t minimum_value = configuration()->maxMessageSize; + if (!asio_helpers::asio_helpers::try_setting_buffer_size( + socket, mReceiveBufferSize, minimum_value, configured_value)) + { + EPROSIMA_LOG_ERROR(TRANSPORT_UDPV6, + "Couldn't set receive buffer size to minimum value: " << minimum_value); + } + else if (mReceiveBufferSize != configured_value) + { + EPROSIMA_LOG_WARNING(TRANSPORT_UDPV6, + "Receive buffer size could not be set to the desired value. " + << "Using " << configured_value << " instead of " << mReceiveBufferSize); + } } if (is_multicast) diff --git a/src/cpp/rtps/transport/asio_helpers.hpp b/src/cpp/rtps/transport/asio_helpers.hpp new file mode 100644 index 00000000000..f9ba81b84e0 --- /dev/null +++ b/src/cpp/rtps/transport/asio_helpers.hpp @@ -0,0 +1,155 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTPS_TRANSPORT__ASIO_HELPERS_HPP_ +#define RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/// Helper functions for asio. +// NOTE: using a struct instead of a namespace to avoid linker errors when using inline free functions. +struct asio_helpers +{ + /** + * @brief Try to set a buffer size on a socket, trying to set the initial value and then halving it until it is + * possible to set it or the minimum value is reached. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool try_setting_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + asio::error_code ec; + + final_buffer_value = initial_buffer_value; + while (final_buffer_value >= minimum_buffer_value) + { + socket.set_option(BufferOptionType(static_cast(final_buffer_value)), ec); + if (!ec) + { + return true; + } + + final_buffer_value /= 2; + } + + final_buffer_value = minimum_buffer_value; + socket.set_option(BufferOptionType(final_buffer_value), ec); + return !ec; + } + + /** + * @brief Configure a buffer size on a socket, using the system default value if the initial value is 0. + * Ensures that the final buffer size is at least the minimum value. + * + * @tparam BufferOptionType Type of the buffer option to set. + * @tparam SocketType Type of socket on which to set the buffer size option. + * + * @param socket Socket on which to set the buffer size option. + * @param initial_buffer_value Initial value to try to set. + * @param minimum_buffer_value Minimum value to set. + * @param final_buffer_value Output parameter where the final value set will be stored. + * + * @return true if the buffer size was successfully set, false otherwise. + */ + template + static inline bool configure_buffer_size( + SocketType& socket, + const uint32_t initial_buffer_value, + const uint32_t minimum_buffer_value, + uint32_t& final_buffer_value) + { + final_buffer_value = initial_buffer_value; + + // If the initial value is 0, try using the system default value + if (initial_buffer_value == 0) + { + asio::error_code ec; + BufferOptionType option; + socket.get_option(option, ec); + if (!ec) + { + final_buffer_value = option.value(); + } + } + + // Ensure the minimum value is used + if (final_buffer_value < minimum_buffer_value) + { + final_buffer_value = minimum_buffer_value; + } + + // Try to set the highest possible value the system allows + return try_setting_buffer_size(socket, final_buffer_value, minimum_buffer_value, + final_buffer_value); + } + + /** + * @brief Configure the send and receive buffer sizes on a socket, using the system default value if the initial + * values are 0. Ensures that the final buffer sizes are at least the minimum value. + * + * @tparam SocketType Type of socket on which to set the buffer size options. + * + * @param socket Socket on which to set the buffer size options. + * @param descriptor Transport descriptor with the buffer sizes to set. + * @param final_send_buffer_size Output parameter where the final send buffer size will be stored. + * @param final_receive_buffer_size Output parameter where the final receive buffer size will be stored. + * + * @return true if the buffer sizes were successfully set, false otherwise. + */ + template + static inline bool configure_buffer_sizes( + SocketType& socket, + const SocketTransportDescriptor& descriptor, + uint32_t& final_send_buffer_size, + uint32_t& final_receive_buffer_size) + { + uint32_t minimum_socket_buffer = descriptor.maxMessageSize; + uint32_t send_buffer_size = descriptor.sendBufferSize; + uint32_t receive_buffer_size = descriptor.receiveBufferSize; + + bool send_buffer_size_set = configure_buffer_size( + socket, send_buffer_size, minimum_socket_buffer, final_send_buffer_size); + bool receive_buffer_size_set = configure_buffer_size( + socket, receive_buffer_size, minimum_socket_buffer, final_receive_buffer_size); + + return send_buffer_size_set && receive_buffer_size_set; + } + +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // RTPS_TRANSPORT__ASIO_HELPERS_HPP_ + diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 699f2ab3ef0..fb0945f4e62 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1302,6 +1302,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 5a37df54978..0e358e4fc2b 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1485,6 +1485,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize; + participant_qos_.transport().send_socket_buffer_size = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 3031ade192f..be99ec69032 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -1069,6 +1069,7 @@ class PubSubReader uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index c5ee2a1bd7a..1fdd2beb514 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -1249,6 +1249,7 @@ class PubSubWriter uint32_t sockerBufferSize) { participant_attr_.rtps.listenSocketBufferSize = sockerBufferSize; + participant_attr_.rtps.sendSocketBufferSize = sockerBufferSize; return *this; } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 36ee6f139ec..cfb9064ec10 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -82,6 +83,74 @@ class TCPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv4Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv4Tests, locators_with_kind_1_supported) { // Given @@ -1334,7 +1403,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; @@ -1910,7 +1979,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) TEST_F(TCPv4Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index fa9791f8bd2..4cad23e28e7 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -83,6 +84,74 @@ class TCPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(TCPv6Tests, wrong_configuration_values) +{ + // Too big sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + auto wrong_descriptor = descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + auto wrong_descriptor = descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + TCPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(TCPv6Tests, conversion_to_ip6_string) { Locator_t locator; @@ -323,7 +392,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) TEST_F(TCPv6Tests, non_blocking_send) { uint16_t port = g_default_port; - uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; + uint32_t msg_size = 64ul * 1024ul; // Create a TCP Server transport TCPv6TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); diff --git a/test/unittest/transport/UDPv4Tests.cpp b/test/unittest/transport/UDPv4Tests.cpp index 430e58feabf..3f24c0a034b 100644 --- a/test/unittest/transport/UDPv4Tests.cpp +++ b/test/unittest/transport/UDPv4Tests.cpp @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include +#include #include #include #include @@ -30,10 +32,6 @@ using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using UDPv4Transport = eprosima::fastdds::rtps::UDPv4Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -75,6 +73,74 @@ class UDPv4Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv4Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv4TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv4Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv4Tests, locators_with_kind_1_supported) { // Given diff --git a/test/unittest/transport/UDPv6Tests.cpp b/test/unittest/transport/UDPv6Tests.cpp index 805518934c5..6009c7b593c 100644 --- a/test/unittest/transport/UDPv6Tests.cpp +++ b/test/unittest/transport/UDPv6Tests.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -30,10 +31,6 @@ using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps; using UDPv6Transport = eprosima::fastdds::rtps::UDPv6Transport; -#ifndef __APPLE__ -const uint32_t ReceiveBufferCapacity = 65536; -#endif // ifndef __APPLE__ - #if defined(_WIN32) #define GET_PID _getpid #else @@ -83,6 +80,74 @@ class UDPv6Tests : public ::testing::Test std::unique_ptr receiverThread; }; +TEST_F(UDPv6Tests, wrong_configuration) +{ + // Too big sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.receiveBufferSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Too big maxMessageSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = std::numeric_limits::max(); + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than receiveBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.receiveBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // maxMessageSize bigger than sendBufferSize + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.maxMessageSize = 10; + wrong_descriptor.sendBufferSize = 5; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_FALSE(transportUnderTest.init()); + eprosima::fastdds::dds::Log::Flush(); + } + + // Buffer sizes automatically decrease + { + UDPv6TransportDescriptor wrong_descriptor; + wrong_descriptor.sendBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.receiveBufferSize = static_cast(std::numeric_limits::max()); + wrong_descriptor.maxMessageSize = 1470; + UDPv6Transport transportUnderTest(wrong_descriptor); + ASSERT_TRUE(transportUnderTest.init()); + auto* final_cfg = transportUnderTest.configuration(); + EXPECT_GE(final_cfg->sendBufferSize, final_cfg->maxMessageSize); + // The system could allow for the send buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->sendBufferSize, wrong_descriptor.sendBufferSize); + EXPECT_GE(final_cfg->receiveBufferSize, final_cfg->maxMessageSize); + // The system could allow for the receive buffer to be MAX_INT, so we cannot check it to be strictly lower + EXPECT_LE(final_cfg->receiveBufferSize, wrong_descriptor.receiveBufferSize); + eprosima::fastdds::dds::Log::Flush(); + } +} + TEST_F(UDPv6Tests, conversion_to_ip6_string) { Locator_t locator;