diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 14c9ccde6f1..2fd9a2d60e2 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -258,6 +258,21 @@ void TCPTransportInterface::bind_socket( auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel); assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); + + unbound_lock.unlock(); + + // Look for an existing channel that matches this physical locator + auto existing_channel = channel_resources_.find(channel->locator()); + // If the channel exists, check if the channel reference wait until it finishes its tasks + if (existing_channel != channel_resources_.end()) + { + // Disconnect the old channel + existing_channel->second->disconnect(); + scopedLock.unlock(); + existing_channel->second->clear(); + scopedLock.lock(); + } + channel_resources_[channel->locator()] = channel; } @@ -678,9 +693,6 @@ bool TCPTransportInterface::OpenOutputChannel( if (existing_channel != channel_resources_.end() && existing_channel->second != tcp_sender_resource->channel()) { - // Disconnect the old channel - tcp_sender_resource->channel()->disconnect(); - tcp_sender_resource->channel()->clear(); // Update sender resource with new channel tcp_sender_resource->channel() = existing_channel->second; } @@ -917,10 +929,17 @@ void TCPTransportInterface::perform_listen_operation( { TransportReceiverInterface* receiver = it->second.first; ReceiverInUseCV* receiver_in_use = it->second.second; - receiver_in_use->in_use = true; - scopedLock.unlock(); - receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); - scopedLock.lock(); + receiver_in_use->cv.wait(scopedLock, [&]() + { + return receiver_in_use->in_use == false; + }); + if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status()) + { + receiver_in_use->in_use = true; + scopedLock.unlock(); + receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); + scopedLock.lock(); + } receiver_in_use->in_use = false; receiver_in_use->cv.notify_one(); } diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index d22dc6639af..1751654532a 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -615,8 +615,10 @@ TEST_P(TransportTCP, TCPv6_copy) EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport); } -// Test connection is successfully restablished after dropping and relaunching a TCP client (requester) +// Test connection is successfully restablished after dropping and relaunching a TCP client (requester), +// when the server's listening thread for the old client hasn't processed all its messages. // Issue -> https://github.com/eProsima/Fast-DDS/issues/2409 +// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026 TEST(TransportTCP, Client_reconnection) { TCPReqRepHelloWorldReplier* replier; @@ -624,7 +626,7 @@ TEST(TransportTCP, Client_reconnection) const uint16_t nmsgs = 5; replier = new TCPReqRepHelloWorldReplier; - replier->init(1, 0, global_port); + replier->init(1, 0, global_port, 0, nullptr, true); ASSERT_TRUE(replier->isInitialized()); diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp index 4910c44bbe6..6bd5f6b7633 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp @@ -65,7 +65,8 @@ void TCPReqRepHelloWorldReplier::init( int domainId, uint16_t listeningPort, uint32_t maxInitialPeer, - const char* certs_path) + const char* certs_path, + bool use_busy_listener) { ParticipantAttributes pattr; pattr.domainId = domainId; @@ -132,9 +133,14 @@ void TCPReqRepHelloWorldReplier::init( puattr.topic.topicDataType = type_.getName(); puattr.topic.topicName = "HelloWorldTopicReply"; configPublisher("Reply"); + if (use_busy_listener) + { + reply_listener_.use_busy_listener(true); + } reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_); ASSERT_NE(reply_publisher_, nullptr); + initialized_ = true; } diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp index c08100b0faf..fff5d0e74ba 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -91,6 +92,7 @@ class TCPReqRepHelloWorldReplier RequestListener( TCPReqRepHelloWorldReplier& replier) : replier_(replier) + , use_busy_listener_(false) { } @@ -106,6 +108,16 @@ class TCPReqRepHelloWorldReplier { replier_.matched(); } + else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING && use_busy_listener_) + { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + + void use_busy_listener( + const bool& value) + { + use_busy_listener_ = value; } private: @@ -114,6 +126,7 @@ class TCPReqRepHelloWorldReplier const RequestListener&) = delete; TCPReqRepHelloWorldReplier& replier_; + bool use_busy_listener_; } reply_listener_; @@ -125,7 +138,8 @@ class TCPReqRepHelloWorldReplier int domainId, uint16_t listeningPort, uint32_t maxInitialPeer = 0, - const char* certs_path = nullptr); + const char* certs_path = nullptr, + bool use_busy_listener = false); bool isInitialized() const { return initialized_;