From b521c14caec3ddd08fae04d036ef1421d0e1abf7 Mon Sep 17 00:00:00 2001 From: Eduardo Ponz Segrelles Date: Fri, 22 Dec 2023 10:27:49 +0100 Subject: [PATCH] Revert "TCP deadlock on channel reuse (#4099)" (#4181) * Revert "TCP deadlock on channel reuse (#4099)" This reverts commit dd4c434ccd3a029e1a4aa207443e79b05db5eb61. Signed-off-by: EduPonz * Refs #20055: Separate builtin transports tests into individual cases Signed-off-by: EduPonz * Refs #20055: Mark large_data tests as flaky due to TCP Signed-off-by: EduPonz --------- Signed-off-by: EduPonz (cherry picked from commit 5e87eb3acb6c56acc0c1624d4502b2a406ee21fc) --- .../rtps/transport/TCPTransportInterface.cpp | 33 ++++--------------- .../common/BlackboxTestsTransportTCP.cpp | 6 ++-- .../common/TCPReqRepHelloWorldReplier.cpp | 8 +---- .../common/TCPReqRepHelloWorldReplier.hpp | 16 +-------- 4 files changed, 11 insertions(+), 52 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 04d69bd95c1..9df4a5b25b3 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -222,21 +222,6 @@ void TCPTransportInterface::bind_socket( auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel); assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); - - unbound_lock.unlock(); - - // Look for an existing channel that matches this physical locator - auto existing_channel = channel_resources_.find(channel->locator()); - // If the channel exists, check if the channel reference wait until it finishes its tasks - if (existing_channel != channel_resources_.end()) - { - // Disconnect the old channel - existing_channel->second->disconnect(); - scopedLock.unlock(); - existing_channel->second->clear(); - scopedLock.lock(); - } - channel_resources_[channel->locator()] = channel; } @@ -645,6 +630,9 @@ bool TCPTransportInterface::OpenOutputChannel( if (existing_channel != channel_resources_.end() && existing_channel->second != tcp_sender_resource->channel()) { + // Disconnect the old channel + tcp_sender_resource->channel()->disconnect(); + tcp_sender_resource->channel()->clear(); // Update sender resource with new channel tcp_sender_resource->channel() = existing_channel->second; } @@ -864,17 +852,10 @@ void TCPTransportInterface::perform_listen_operation( { TransportReceiverInterface* receiver = it->second.first; ReceiverInUseCV* receiver_in_use = it->second.second; - receiver_in_use->cv.wait(scopedLock, [&]() - { - return receiver_in_use->in_use == false; - }); - if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status()) - { - receiver_in_use->in_use = true; - scopedLock.unlock(); - receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); - scopedLock.lock(); - } + receiver_in_use->in_use = true; + scopedLock.unlock(); + receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); + scopedLock.lock(); receiver_in_use->in_use = false; receiver_in_use->cv.notify_one(); } diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index eebb92d3781..e23328a9abe 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -603,10 +603,8 @@ TEST_P(TransportTCP, TCPv6_copy) EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport); } -// Test connection is successfully restablished after dropping and relaunching a TCP client (requester), -// when the server's listening thread for the old client hasn't processed all its messages. +// Test connection is successfully restablished after dropping and relaunching a TCP client (requester) // Issue -> https://github.com/eProsima/Fast-DDS/issues/2409 -// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026 TEST(TransportTCP, Client_reconnection) { TCPReqRepHelloWorldReplier* replier; @@ -614,7 +612,7 @@ TEST(TransportTCP, Client_reconnection) const uint16_t nmsgs = 5; replier = new TCPReqRepHelloWorldReplier; - replier->init(1, 0, global_port, 0, nullptr, true); + replier->init(1, 0, global_port); ASSERT_TRUE(replier->isInitialized()); diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp index d5c5d07e370..30776253694 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp @@ -65,8 +65,7 @@ void TCPReqRepHelloWorldReplier::init( int domainId, uint16_t listeningPort, uint32_t maxInitialPeer, - const char* certs_folder, - bool use_busy_listener) + const char* certs_folder) { ParticipantAttributes pattr; pattr.domainId = domainId; @@ -133,14 +132,9 @@ void TCPReqRepHelloWorldReplier::init( puattr.topic.topicDataType = type_.getName(); puattr.topic.topicName = "HelloWorldTopicReply"; configPublisher("Reply"); - if (use_busy_listener) - { - reply_listener_.use_busy_listener(true); - } reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_); ASSERT_NE(reply_publisher_, nullptr); - initialized_ = true; } diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp index 3c76fa16581..4dfceeea3c5 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp @@ -28,7 +28,6 @@ #include #include -#include #include #include #include @@ -92,7 +91,6 @@ class TCPReqRepHelloWorldReplier RequestListener( TCPReqRepHelloWorldReplier& replier) : replier_(replier) - , use_busy_listener_(false) { } @@ -108,16 +106,6 @@ class TCPReqRepHelloWorldReplier { replier_.matched(); } - else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING && use_busy_listener_) - { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - } - - void use_busy_listener( - const bool& value) - { - use_busy_listener_ = value; } private: @@ -126,7 +114,6 @@ class TCPReqRepHelloWorldReplier const RequestListener&) = delete; TCPReqRepHelloWorldReplier& replier_; - bool use_busy_listener_; } reply_listener_; @@ -138,8 +125,7 @@ class TCPReqRepHelloWorldReplier int domainId, uint16_t listeningPort, uint32_t maxInitialPeer = 0, - const char* certs_folder = nullptr, - bool use_busy_listener = false); + const char* certs_folder = nullptr); bool isInitialized() const { return initialized_;