Skip to content

Commit

Permalink
TCP unique client announced local port (#4216)
Browse files Browse the repository at this point in the history
* Refs #20179: Guarantee unique announced client local port

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Regression tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Prevent from join() deadlock (ReceiverResource mtx taken)

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Fix windows release tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Apply sugestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
(cherry picked from commit b43f3a0)

# Conflicts:
#	src/cpp/rtps/transport/TCPTransportInterface.cpp
  • Loading branch information
jepemi authored and mergify[bot] committed Jan 24, 2024
1 parent 432d620 commit 45ac81e
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 92 deletions.
115 changes: 67 additions & 48 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,21 @@ void TCPTransportInterface::clean()
}
}

<<<<<<< HEAD
if (io_service_thread_)
=======
if (initial_peer_local_locator_socket_)
{
if (initial_peer_local_locator_socket_->is_open())
{
initial_peer_local_locator_socket_->close();
}

initial_peer_local_locator_socket_.reset();
}

if (io_service_thread_.joinable())
>>>>>>> b43f3a065 (TCP unique client announced local port (#4216))
{
io_service_.stop();
io_service_thread_->join();
Expand Down Expand Up @@ -377,37 +391,46 @@ bool TCPTransportInterface::init(
EPROSIMA_LOG_WARNING(TLS, "Error configuring TLS, using TCP transport without security");
}

if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0)
/*
Open and bind a socket to obtain a unique port. This port is assigned to PDP passed locators.
Although real client socket local port will differ, this ensures uniqueness for server's channel
resources mapping (uses client locators as keys).
Open and bind a socket to obtain a unique port. This unique port is assigned to to PDP passed locators.
This process ensures uniqueness in the server's channel resources mapping, which uses client locators as keys.
Although differing from the real client socket local port, provides a reliable mapping mechanism.
*/
initial_peer_local_locator_socket_ = std::unique_ptr<asio::ip::tcp::socket>(new asio::ip::tcp::socket(io_service_));
initial_peer_local_locator_socket_->open(generate_protocol());

// Binding to port 0 delegates the port selection to the system.
initial_peer_local_locator_socket_->bind(asio::ip::tcp::endpoint(generate_protocol(), 0));

ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
// Check system buffer sizes.
ip::tcp::socket socket(io_service_);
socket.open(generate_protocol());
socket_base::send_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_send_buffer_size(option.value());

if (configuration()->sendBufferSize == 0)
if (configuration()->sendBufferSize < s_minimumSocketBuffer)
{
socket_base::send_buffer_size option;
socket.get_option(option);
set_send_buffer_size(option.value());

if (configuration()->sendBufferSize < s_minimumSocketBuffer)
{
set_send_buffer_size(s_minimumSocketBuffer);
}
set_send_buffer_size(s_minimumSocketBuffer);
}
}

if (configuration()->receiveBufferSize == 0)
{
socket_base::receive_buffer_size option;
socket.get_option(option);
set_receive_buffer_size(option.value());
if (configuration()->receiveBufferSize == 0)
{
socket_base::receive_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_receive_buffer_size(option.value());

if (configuration()->receiveBufferSize < s_minimumSocketBuffer)
{
set_receive_buffer_size(s_minimumSocketBuffer);
}
if (configuration()->receiveBufferSize < s_minimumSocketBuffer)
{
set_receive_buffer_size(s_minimumSocketBuffer);
}

socket.close();
}

if (configuration()->maxMessageSize > s_maximumMessageSize)
Expand Down Expand Up @@ -656,6 +679,11 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
<<<<<<< HEAD
=======
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
>>>>>>> b43f3a065 (TCP unique client announced local port (#4216))
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -1477,18 +1505,7 @@ bool TCPTransportInterface::fillMetatrafficUnicastLocator(
{
if (IPLocator::getPhysicalPort(locator.port) == 0)
{
const TCPTransportDescriptor* config = configuration();
if (config != nullptr)
{
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
}
fill_local_physical_port(locator);
}

if (IPLocator::getLogicalPort(locator) == 0)
Expand Down Expand Up @@ -1546,18 +1563,7 @@ bool TCPTransportInterface::fillUnicastLocator(
{
if (IPLocator::getPhysicalPort(locator.port) == 0)
{
const TCPTransportDescriptor* config = configuration();
if (config != nullptr)
{
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
}
fill_local_physical_port(locator);
}

if (IPLocator::getLogicalPort(locator) == 0)
Expand Down Expand Up @@ -1745,6 +1751,19 @@ bool TCPTransportInterface::is_localhost_allowed() const
return is_locator_allowed(local_locator);
}

void TCPTransportInterface::fill_local_physical_port(
Locator& locator) const
{
if (!configuration()->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(configuration()->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, initial_peer_local_locator_port_);
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
10 changes: 10 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class TCPTransportInterface : public TransportInterface
std::vector<fastrtps::rtps::IPFinder::info_IP> current_interfaces_;
asio::io_service io_service_;
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
#endif // if TLS_FOUND
Expand Down Expand Up @@ -434,6 +437,13 @@ class TCPTransportInterface : public TransportInterface
void update_network_interfaces() override;

bool is_localhost_allowed() const override;

/**
* Method to fill local locator physical port.
* @param locator locator to be filled.
*/
void fill_local_physical_port(
Locator& locator) const;
};

} // namespace rtps
Expand Down
11 changes: 2 additions & 9 deletions src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,11 @@ TCPTransactionId RTCPMessageManager::sendConnectionRequest(
Locator locator;
mTransport->endpoint_to_locator(channel->local_endpoint(), locator);

auto config = mTransport->configuration();
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
mTransport->fill_local_physical_port(locator);

if (locator.kind == LOCATOR_KIND_TCPv4)
{
auto config = mTransport->configuration();
const TCPv4TransportDescriptor* pTCPv4Desc = static_cast<TCPv4TransportDescriptor*>(config);
IPLocator::setWan(locator, pTCPv4Desc->wan_addr[0], pTCPv4Desc->wan_addr[1], pTCPv4Desc->wan_addr[2],
pTCPv4Desc->wan_addr[3]);
Expand Down
37 changes: 37 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,43 @@ TEST_F(TCPv4Tests, autofill_port)
EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3);
}

// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
{
TCPv4TransportDescriptor recvDescriptor;
recvDescriptor.add_listener_port(g_default_port);
MockTCPv4Transport receiveTransportUnderTest(recvDescriptor);
receiveTransportUnderTest.init();

TCPv4TransportDescriptor sendDescriptor_1;
TCPv4Transport sendTransportUnderTest_1(sendDescriptor_1);
sendTransportUnderTest_1.init();

TCPv4TransportDescriptor sendDescriptor_2;
TCPv4Transport sendTransportUnderTest_2(sendDescriptor_2);
sendTransportUnderTest_2.init();

Locator_t outputLocator;
outputLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(outputLocator, 127, 0, 0, 1);
outputLocator.port = g_default_port;
IPLocator::setLogicalPort(outputLocator, 7410);

SendResourceList send_resource_list_1;
ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator));
ASSERT_FALSE(send_resource_list_1.empty());

SendResourceList send_resource_list_2;
ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator));
ASSERT_FALSE(send_resource_list_2.empty());

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
38 changes: 38 additions & 0 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fastrtps/utils/Semaphore.h>

#include <MockReceiverResource.h>
#include "mock/MockTCPv6Transport.h"
#include <rtps/network/NetworkFactory.h>
#include <rtps/transport/TCPv6Transport.h>

Expand Down Expand Up @@ -213,6 +214,43 @@ TEST_F(TCPv6Tests, autofill_port)
transportUnderTest_multiple_autofill.configuration()->listening_ports[2]);
EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3);
}

// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
TEST_F(TCPv6Tests, client_announced_local_port_uniqueness)
{
TCPv6TransportDescriptor recvDescriptor;
recvDescriptor.add_listener_port(g_default_port);
MockTCPv6Transport receiveTransportUnderTest(recvDescriptor);
receiveTransportUnderTest.init();

TCPv6TransportDescriptor sendDescriptor_1;
TCPv6Transport sendTransportUnderTest_1(sendDescriptor_1);
sendTransportUnderTest_1.init();

TCPv6TransportDescriptor sendDescriptor_2;
TCPv6Transport sendTransportUnderTest_2(sendDescriptor_2);
sendTransportUnderTest_2.init();

Locator_t outputLocator;
outputLocator.kind = LOCATOR_KIND_TCPv6;
IPLocator::setIPv6(outputLocator, "::1");
outputLocator.port = g_default_port;
IPLocator::setLogicalPort(outputLocator, 7610);

SendResourceList send_resource_list_1;
ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator));
ASSERT_FALSE(send_resource_list_1.empty());

SendResourceList send_resource_list_2;
ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator));
ASSERT_FALSE(send_resource_list_2.empty());

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}
/*
TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports)
{
Expand Down
38 changes: 3 additions & 35 deletions test/unittest/transport/mock/MockTCPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define MOCK_TRANSPORT_TCP4_STUFF_H

#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/IPLocator.h>
#include <rtps/transport/TCPv4Transport.h>

namespace eprosima {
Expand All @@ -25,53 +24,22 @@ namespace rtps {

using TCPv4Transport = eprosima::fastdds::rtps::TCPv4Transport;
using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource;
using TCPChannelResourceBasic = eprosima::fastdds::rtps::TCPChannelResourceBasic;
#if TLS_FOUND
using TCPChannelResourceSecure = eprosima::fastdds::rtps::TCPChannelResourceSecure;
#endif // if TLS_FOUND

class MockTCPv4Transport : public TCPv4Transport
{
public:

MockTCPv4Transport(
const TCPv4TransportDescriptor& descriptor)
: TCPv4Transport(descriptor)
{
configuration_ = descriptor;
}

virtual bool OpenOutputChannel(
SendResourceList&,
const Locator_t& locator) override
const std::map<Locator_t, std::shared_ptr<TCPChannelResource>>& get_channel_resources() const
{
const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator);
std::shared_ptr<TCPChannelResource> channel(
#if TLS_FOUND
(configuration_.apply_security) ?
static_cast<TCPChannelResource*>(
new TCPChannelResourceSecure(this, io_service_, ssl_context_, physicalLocator, 0)) :
#endif // if TLS_FOUND
static_cast<TCPChannelResource*>(
new TCPChannelResourceBasic(this, io_service_, physicalLocator, 0))
);

channel_resources_[physicalLocator] = channel;
return true;
return channel_resources_;
}

/*
virtual bool CloseOutputChannel(const Locator_t& locator) override
{
const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator);
auto it = channel_resources_.find(physicalLocator);
if (it != channel_resources_.end())
{
delete it->second;
channel_resources_.erase(it);
}
return true;
}
*/
};

} // namespace rtps
Expand Down
Loading

0 comments on commit 45ac81e

Please sign in to comment.