Skip to content

Commit

Permalink
Revert "TCP deadlock on channel reuse (#4099)" (#4181)
Browse files Browse the repository at this point in the history
* Revert "TCP deadlock on channel reuse (#4099)"

This reverts commit dd4c434.

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Separate builtin transports tests into individual cases

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Mark large_data tests as flaky due to TCP

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: EduPonz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz committed Jan 9, 2024
1 parent 0b1b377 commit 306c0e7
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 210 deletions.
33 changes: 7 additions & 26 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,6 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);

unbound_lock.unlock();

// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(channel->locator());
// If the channel exists, check if the channel reference wait until it finishes its tasks
if (existing_channel != channel_resources_.end())
{
// Disconnect the old channel
existing_channel->second->disconnect();
scopedLock.unlock();
existing_channel->second->clear();
scopedLock.lock();
}

channel_resources_[channel->locator()] = channel;

}
Expand Down Expand Up @@ -645,6 +630,9 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -864,17 +852,10 @@ void TCPTransportInterface::perform_listen_operation(
{
TransportReceiverInterface* receiver = it->second.first;
ReceiverInUseCV* receiver_in_use = it->second.second;
receiver_in_use->cv.wait(scopedLock, [&]()
{
return receiver_in_use->in_use == false;
});
if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
{
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
}
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
receiver_in_use->in_use = false;
receiver_in_use->cv.notify_one();
}
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/XFAIL_DDS_PIM.list
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ BlackboxTests_DDS_PIM.LivelinessQos.ThreeWriters_ThreeReaders.Intraprocess
BlackboxTests_DDS_PIM.LivelinessQos.ThreeWriters_ThreeReaders.Transport
BlackboxTests_DDS_PIM.LivelinessQos.TwoWriters_OneReader_ManualByParticipant.Intraprocess
BlackboxTests_DDS_PIM.PersistenceLargeData.PubSubAsReliablePubPersistentWithFrag.Transport
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_api_large_data
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_api_large_datav6
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_env_large_data
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_env_large_datav6
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_xml_large_data
BlackboxTests_DDS_PIM.ChainingTransportTests.builtin_transports_xml_large_datav6
Loading

0 comments on commit 306c0e7

Please sign in to comment.