Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20650] Create InitialConnection for TCP initial peers 3.x & 2.14 #4946

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,12 @@ bool PDPSimple::create_dcps_participant_endpoints()

WriterAttributes watt = create_builtin_writer_attributes();
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.remoteLocatorList = m_discovery.initialPeersList;
if (!m_discovery.initialPeersList.empty())
{
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
m_discovery.initialPeersList, LocatorList_t());
mp_RTPSParticipant->createSenderResources(entry);
}

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,19 @@ bool UDPTransportInterface::OpenOutputChannel(
return true;
}

bool UDPTransportInterface::OpenOutputChannels(
SendResourceList& send_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool success = false;
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]);
}
return success;
}

Locator UDPTransportInterface::RemoteToMainLocal(
const Locator& remote) const
{
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ class UDPTransportInterface : public TransportInterface
SendResourceList& sender_resource_list,
const Locator&) override;

/**
* Opens a socket on the locators provided by the given locator_selector_entry.
*
* @param sender_resource_list Participant's send resource list.
* @param locator_selector_entry Locator selector entry with the remote entity locators.
*
* @return true if the socket was correctly opened or if finding an already opened one.
*/
bool OpenOutputChannels(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry) override;

/**
* Converts a given remote locator (that is, a locator referring to a remote
* destination) to the main local locator whose channel can write to that
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,19 @@ bool SharedMemTransport::OpenOutputChannel(
return true;
}

bool SharedMemTransport::OpenOutputChannels(
SendResourceList& send_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool success = false;
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]);
}
return success;
}

Locator SharedMemTransport::RemoteToMainLocal(
const Locator& remote) const
{
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ class SharedMemTransport : public TransportInterface
SendResourceList& sender_resource_list,
const Locator&) override;

/**
* Opens a socket on the locators provided by the given locator_selector_entry.
*
* @param sender_resource_list Participant's send resource list.
* @param locator_selector_entry Locator selector entry with the remote entity locators.
*
* @return true if the socket was correctly opened or if finding an already opened one.
*/
bool OpenOutputChannels(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry) override;

/**
* Converts a given remote locator (that is, a locator referring to a remote
* destination) to the main local locator whose channel can write to that
Expand Down
64 changes: 62 additions & 2 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ TEST(TransportTCP, Client_reconnection)
delete requester;
}

// Test copy constructor and copy assignment for TCPv4
// Test zero listening port for TCPv4
TEST_P(TransportTCP, TCPv4_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -704,7 +704,7 @@ TEST_P(TransportTCP, TCPv4_autofill_port)
EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port);
}

// Test copy constructor and copy assignment for TCPv6
// Test zero listening port for TCPv6
TEST_P(TransportTCP, TCPv6_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -1325,6 +1325,66 @@ TEST_P(TransportTCP, large_message_large_data_send_receive)
reader.block_for_all();
}

// Test CreateInitialConnection for TCP
TEST_P(TransportTCP, TCP_initial_peers_connection)
{
PubSubWriter<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p3(TEST_TOPIC_NAME);

// Add TCP Transport with listening port
auto p1_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p1_transport->add_listener_port(global_port);
auto p2_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p2_transport->add_listener_port(global_port + 1);
auto p3_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p3_transport->add_listener_port(global_port - 1);

// Add initial peer to clients
Locator_t initialPeerLocator;
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1);
initialPeerLocator.port = global_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);

// Setup participants
p1.disable_builtin_transport()
.add_user_transport_to_pparams(p1_transport);

p2.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p2_transport);

p3.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p3_transport);

// Init participants
p1.init();
p2.init();
p3.init();
ASSERT_TRUE(p1.isInitialized());
ASSERT_TRUE(p2.isInitialized());
ASSERT_TRUE(p3.isInitialized());

// Wait for discovery
p1.wait_discovery(2, std::chrono::seconds(0));
p2.wait_discovery(std::chrono::seconds(0), 1);
p3.wait_discovery(std::chrono::seconds(0), 1);

// Send and receive data
auto data = default_helloworld_data_generator();
p2.startReception(data);
p3.startReception(data);

p1.send(data);
EXPECT_TRUE(data.empty());

p2.block_for_all();
p3.block_for_all();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading