Skip to content

Commit

Permalink
Refs #19939: Channel disabling relocated and OnDataReceived() callbac…
Browse files Browse the repository at this point in the history
…k protected

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi committed Dec 5, 2023
1 parent 13ebeba commit 1264331
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 12 deletions.
33 changes: 26 additions & 7 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);

unbound_lock.unlock();

// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(channel->locator());
// If the channel exists, check if the channel reference wait until it finishes its tasks
if (existing_channel != channel_resources_.end())
{
// Disconnect the old channel
existing_channel->second->disconnect();
scopedLock.unlock();
existing_channel->second->clear();
scopedLock.lock();
}

channel_resources_[channel->locator()] = channel;

}
Expand Down Expand Up @@ -678,9 +693,6 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -917,10 +929,17 @@ void TCPTransportInterface::perform_listen_operation(
{
TransportReceiverInterface* receiver = it->second.first;
ReceiverInUseCV* receiver_in_use = it->second.second;
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
receiver_in_use->cv.wait(scopedLock, [&]()
{
return receiver_in_use->in_use == false;
});
if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
{
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
}
receiver_in_use->in_use = false;
receiver_in_use->cv.notify_one();
}
Expand Down
6 changes: 4 additions & 2 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,16 +615,18 @@ TEST_P(TransportTCP, TCPv6_copy)
EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
}

// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
// Test connection is successfully restablished after dropping and relaunching a TCP client (requester),
// when the server's listening thread for the old client hasn't processed all its messages.
// Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026
TEST(TransportTCP, Client_reconnection)
{
TCPReqRepHelloWorldReplier* replier;
TCPReqRepHelloWorldRequester* requester;
const uint16_t nmsgs = 5;

replier = new TCPReqRepHelloWorldReplier;
replier->init(1, 0, global_port);
replier->init(1, 0, global_port, 0, nullptr, true);

ASSERT_TRUE(replier->isInitialized());

Expand Down
13 changes: 11 additions & 2 deletions test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using namespace eprosima::fastrtps::rtps;
TCPReqRepHelloWorldReplier::TCPReqRepHelloWorldReplier()
: request_listener_(*this)
, reply_listener_(*this)
, reply_bussy_listener_(*this)
, participant_(nullptr)
, request_subscriber_(nullptr)
, reply_publisher_(nullptr)
Expand All @@ -65,7 +66,8 @@ void TCPReqRepHelloWorldReplier::init(
int domainId,
uint16_t listeningPort,
uint32_t maxInitialPeer,
const char* certs_path)
const char* certs_path,
bool use_bussy_listener)
{
ParticipantAttributes pattr;
pattr.domainId = domainId;
Expand Down Expand Up @@ -132,7 +134,14 @@ void TCPReqRepHelloWorldReplier::init(
puattr.topic.topicDataType = type_.getName();
puattr.topic.topicName = "HelloWorldTopicReply";
configPublisher("Reply");
reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_);
if(use_bussy_listener)
{
reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_bussy_listener_);
}
else
{
reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_);
}
ASSERT_NE(reply_publisher_, nullptr);

initialized_ = true;
Expand Down
42 changes: 41 additions & 1 deletion test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fastrtps/publisher/PublisherListener.h>
#include <fastrtps/attributes/PublisherAttributes.h>

#include <thread>
#include <list>
#include <condition_variable>
#include <asio.hpp>
Expand Down Expand Up @@ -118,14 +119,53 @@ class TCPReqRepHelloWorldReplier
}
reply_listener_;

class RequestBussyListener : public eprosima::fastrtps::PublisherListener
{
public:

RequestBussyListener(
TCPReqRepHelloWorldReplier& replier)
: replier_(replier)
{
}

~RequestBussyListener()
{
}

void onPublicationMatched(
eprosima::fastrtps::Publisher* /*pub*/,
eprosima::fastrtps::rtps::MatchingInfo& info)
{
if (info.status == eprosima::fastrtps::rtps::MATCHED_MATCHING)
{
replier_.matched();
}
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

private:

RequestBussyListener& operator =(
const RequestBussyListener&) = delete;

TCPReqRepHelloWorldReplier& replier_;

}
reply_bussy_listener_;

TCPReqRepHelloWorldReplier();
virtual ~TCPReqRepHelloWorldReplier();
void init(
int participantId,
int domainId,
uint16_t listeningPort,
uint32_t maxInitialPeer = 0,
const char* certs_path = nullptr);
const char* certs_path = nullptr,
bool use_bussy_listener = false);
bool isInitialized() const
{
return initialized_;
Expand Down

0 comments on commit 1264331

Please sign in to comment.