diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 04d69bd95c1..0538e4fb6bb 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -205,7 +205,21 @@ void TCPTransportInterface::clean() } } +<<<<<<< HEAD if (io_service_thread_) +======= + if (initial_peer_local_locator_socket_) + { + if (initial_peer_local_locator_socket_->is_open()) + { + initial_peer_local_locator_socket_->close(); + } + + initial_peer_local_locator_socket_.reset(); + } + + if (io_service_thread_.joinable()) +>>>>>>> b43f3a065 (TCP unique client announced local port (#4216)) { io_service_.stop(); io_service_thread_->join(); @@ -377,37 +391,46 @@ bool TCPTransportInterface::init( EPROSIMA_LOG_WARNING(TLS, "Error configuring TLS, using TCP transport without security"); } - if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) + /* + Open and bind a socket to obtain a unique port. This port is assigned to PDP passed locators. + Although real client socket local port will differ, this ensures uniqueness for server's channel + resources mapping (uses client locators as keys). + Open and bind a socket to obtain a unique port. This unique port is assigned to to PDP passed locators. + This process ensures uniqueness in the server's channel resources mapping, which uses client locators as keys. + Although differing from the real client socket local port, provides a reliable mapping mechanism. + */ + initial_peer_local_locator_socket_ = std::unique_ptr(new asio::ip::tcp::socket(io_service_)); + initial_peer_local_locator_socket_->open(generate_protocol()); + + // Binding to port 0 delegates the port selection to the system. + initial_peer_local_locator_socket_->bind(asio::ip::tcp::endpoint(generate_protocol(), 0)); + + ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint(); + initial_peer_local_locator_port_ = local_endpoint.port(); + + // Check system buffer sizes. + if (configuration()->sendBufferSize == 0) { - // Check system buffer sizes. - ip::tcp::socket socket(io_service_); - socket.open(generate_protocol()); + socket_base::send_buffer_size option; + initial_peer_local_locator_socket_->get_option(option); + set_send_buffer_size(option.value()); - if (configuration()->sendBufferSize == 0) + if (configuration()->sendBufferSize < s_minimumSocketBuffer) { - socket_base::send_buffer_size option; - socket.get_option(option); - set_send_buffer_size(option.value()); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - } + set_send_buffer_size(s_minimumSocketBuffer); } + } - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - socket.get_option(option); - set_receive_buffer_size(option.value()); + if (configuration()->receiveBufferSize == 0) + { + socket_base::receive_buffer_size option; + initial_peer_local_locator_socket_->get_option(option); + set_receive_buffer_size(option.value()); - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - } + if (configuration()->receiveBufferSize < s_minimumSocketBuffer) + { + set_receive_buffer_size(s_minimumSocketBuffer); } - - socket.close(); } if (configuration()->maxMessageSize > s_maximumMessageSize) @@ -645,6 +668,11 @@ bool TCPTransportInterface::OpenOutputChannel( if (existing_channel != channel_resources_.end() && existing_channel->second != tcp_sender_resource->channel()) { +<<<<<<< HEAD +======= + // Disconnect the old channel + tcp_sender_resource->channel()->disconnect(); +>>>>>>> b43f3a065 (TCP unique client announced local port (#4216)) // Update sender resource with new channel tcp_sender_resource->channel() = existing_channel->second; } @@ -1465,18 +1493,7 @@ bool TCPTransportInterface::fillMetatrafficUnicastLocator( { if (IPLocator::getPhysicalPort(locator.port) == 0) { - const TCPTransportDescriptor* config = configuration(); - if (config != nullptr) - { - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } - } + fill_local_physical_port(locator); } if (IPLocator::getLogicalPort(locator) == 0) @@ -1534,18 +1551,7 @@ bool TCPTransportInterface::fillUnicastLocator( { if (IPLocator::getPhysicalPort(locator.port) == 0) { - const TCPTransportDescriptor* config = configuration(); - if (config != nullptr) - { - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } - } + fill_local_physical_port(locator); } if (IPLocator::getLogicalPort(locator) == 0) @@ -1726,6 +1732,29 @@ void TCPTransportInterface::update_network_interfaces() // TODO(jlbueno) } +<<<<<<< HEAD +======= +bool TCPTransportInterface::is_localhost_allowed() const +{ + Locator local_locator; + fill_local_ip(local_locator); + return is_locator_allowed(local_locator); +} + +void TCPTransportInterface::fill_local_physical_port( + Locator& locator) const +{ + if (!configuration()->listening_ports.empty()) + { + IPLocator::setPhysicalPort(locator, *(configuration()->listening_ports.begin())); + } + else + { + IPLocator::setPhysicalPort(locator, initial_peer_local_locator_port_); + } +} + +>>>>>>> b43f3a065 (TCP unique client announced local port (#4216)) } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index ca2922db315..5b322134f8a 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -76,6 +76,9 @@ class TCPTransportInterface : public TransportInterface std::vector current_interfaces_; asio::io_service io_service_; asio::io_service io_service_timers_; + std::unique_ptr initial_peer_local_locator_socket_; + uint16_t initial_peer_local_locator_port_; + #if TLS_FOUND asio::ssl::context ssl_context_; #endif // if TLS_FOUND @@ -428,6 +431,18 @@ class TCPTransportInterface : public TransportInterface void keep_alive(); void update_network_interfaces() override; +<<<<<<< HEAD +======= + + bool is_localhost_allowed() const override; + + /** + * Method to fill local locator physical port. + * @param locator locator to be filled. + */ + void fill_local_physical_port( + Locator& locator) const; +>>>>>>> b43f3a065 (TCP unique client announced local port (#4216)) }; } // namespace rtps diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index f5b0364d4a1..cb3694a1b30 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -299,18 +299,11 @@ TCPTransactionId RTCPMessageManager::sendConnectionRequest( Locator locator; mTransport->endpoint_to_locator(channel->local_endpoint(), locator); - auto config = mTransport->configuration(); - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } + mTransport->fill_local_physical_port(locator); if (locator.kind == LOCATOR_KIND_TCPv4) { + auto config = mTransport->configuration(); const TCPv4TransportDescriptor* pTCPv4Desc = static_cast(config); IPLocator::setWan(locator, pTCPv4Desc->wan_addr[0], pTCPv4Desc->wan_addr[1], pTCPv4Desc->wan_addr[2], pTCPv4Desc->wan_addr[3]); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 504283dfa10..f42aca8450a 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1658,6 +1658,43 @@ TEST_F(TCPv4Tests, autofill_port) EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3); } +// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. +// Clients typically communicated its PID as its locator port. When having several clients in the same +// process this lead to overwriting server's channel resources map elements. +TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) +{ + TCPv4TransportDescriptor recvDescriptor; + recvDescriptor.add_listener_port(g_default_port); + MockTCPv4Transport receiveTransportUnderTest(recvDescriptor); + receiveTransportUnderTest.init(); + + TCPv4TransportDescriptor sendDescriptor_1; + TCPv4Transport sendTransportUnderTest_1(sendDescriptor_1); + sendTransportUnderTest_1.init(); + + TCPv4TransportDescriptor sendDescriptor_2; + TCPv4Transport sendTransportUnderTest_2(sendDescriptor_2); + sendTransportUnderTest_2.init(); + + Locator_t outputLocator; + outputLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); + outputLocator.port = g_default_port; + IPLocator::setLogicalPort(outputLocator, 7410); + + SendResourceList send_resource_list_1; + ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator)); + ASSERT_FALSE(send_resource_list_1.empty()); + + SendResourceList send_resource_list_2; + ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator)); + ASSERT_FALSE(send_resource_list_2.empty()); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2); +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index e410b59aa00..b7a10f32606 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -25,6 +25,7 @@ #include #include +#include "mock/MockTCPv6Transport.h" #include #include @@ -205,6 +206,43 @@ TEST_F(TCPv6Tests, autofill_port) transportUnderTest_multiple_autofill.configuration()->listening_ports[2]); EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3); } + +// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. +// Clients typically communicated its PID as its locator port. When having several clients in the same +// process this lead to overwriting server's channel resources map elements. +TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) +{ + TCPv6TransportDescriptor recvDescriptor; + recvDescriptor.add_listener_port(g_default_port); + MockTCPv6Transport receiveTransportUnderTest(recvDescriptor); + receiveTransportUnderTest.init(); + + TCPv6TransportDescriptor sendDescriptor_1; + TCPv6Transport sendTransportUnderTest_1(sendDescriptor_1); + sendTransportUnderTest_1.init(); + + TCPv6TransportDescriptor sendDescriptor_2; + TCPv6Transport sendTransportUnderTest_2(sendDescriptor_2); + sendTransportUnderTest_2.init(); + + Locator_t outputLocator; + outputLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(outputLocator, "::1"); + outputLocator.port = g_default_port; + IPLocator::setLogicalPort(outputLocator, 7610); + + SendResourceList send_resource_list_1; + ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator)); + ASSERT_FALSE(send_resource_list_1.empty()); + + SendResourceList send_resource_list_2; + ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator)); + ASSERT_FALSE(send_resource_list_2.empty()); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2); +} /* TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) { diff --git a/test/unittest/transport/mock/MockTCPv4Transport.h b/test/unittest/transport/mock/MockTCPv4Transport.h index aa14065a36d..a561e473b4c 100644 --- a/test/unittest/transport/mock/MockTCPv4Transport.h +++ b/test/unittest/transport/mock/MockTCPv4Transport.h @@ -16,7 +16,6 @@ #define MOCK_TRANSPORT_TCP4_STUFF_H #include -#include #include namespace eprosima { @@ -25,10 +24,6 @@ namespace rtps { using TCPv4Transport = eprosima::fastdds::rtps::TCPv4Transport; using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource; -using TCPChannelResourceBasic = eprosima::fastdds::rtps::TCPChannelResourceBasic; -#if TLS_FOUND -using TCPChannelResourceSecure = eprosima::fastdds::rtps::TCPChannelResourceSecure; -#endif // if TLS_FOUND class MockTCPv4Transport : public TCPv4Transport { @@ -36,42 +31,15 @@ class MockTCPv4Transport : public TCPv4Transport MockTCPv4Transport( const TCPv4TransportDescriptor& descriptor) + : TCPv4Transport(descriptor) { - configuration_ = descriptor; } - virtual bool OpenOutputChannel( - SendResourceList&, - const Locator_t& locator) override + const std::map>& get_channel_resources() const { - const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator); - std::shared_ptr channel( -#if TLS_FOUND - (configuration_.apply_security) ? - static_cast( - new TCPChannelResourceSecure(this, io_service_, ssl_context_, physicalLocator, 0)) : -#endif // if TLS_FOUND - static_cast( - new TCPChannelResourceBasic(this, io_service_, physicalLocator, 0)) - ); - - channel_resources_[physicalLocator] = channel; - return true; + return channel_resources_; } - /* - virtual bool CloseOutputChannel(const Locator_t& locator) override - { - const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator); - auto it = channel_resources_.find(physicalLocator); - if (it != channel_resources_.end()) - { - delete it->second; - channel_resources_.erase(it); - } - return true; - } - */ }; } // namespace rtps diff --git a/test/unittest/transport/mock/MockTCPv6Transport.h b/test/unittest/transport/mock/MockTCPv6Transport.h new file mode 100644 index 00000000000..d84347cbce7 --- /dev/null +++ b/test/unittest/transport/mock/MockTCPv6Transport.h @@ -0,0 +1,49 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef MOCK_TRANSPORT_TCP6_STUFF_H +#define MOCK_TRANSPORT_TCP6_STUFF_H + +#include +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +using TCPv6Transport = eprosima::fastdds::rtps::TCPv6Transport; +using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource; + +class MockTCPv6Transport : public TCPv6Transport +{ +public: + + MockTCPv6Transport( + const TCPv6TransportDescriptor& descriptor) + : TCPv6Transport(descriptor) + { + } + + const std::map>& get_channel_resources() const + { + return channel_resources_; + } + +}; + +} // namespace rtps +} // namespace fastrtps +} // namespace eprosima + +#endif //MOCK_TRANSPORT_TCP6_STUFF_H