Skip to content

Commit

Permalink
Fix Proxy block in TypeLookup Service (#4901)
Browse files Browse the repository at this point in the history
* Refs #20953: Fix Data Races

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20953: Make a proxy copy in TLUManager

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

---------

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>
  • Loading branch information
cferreiragonz authored Jun 10, 2024
1 parent 72642f5 commit 8c89074
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 26 deletions.
24 changes: 19 additions & 5 deletions src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
typename eprosima::ProxyPool<ProxyType>::smart_ptr& temp_proxy_data,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks)
{
xtypes::TypeIdentfierWithSize type_identifier_with_size =
Expand All @@ -341,7 +341,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
is_type_identifier_known(type_identifier_with_size))
{
// The type is already known, invoke the callback
callback(temp_proxy_data);
callback(temp_proxy_data.get());
return RETCODE_OK;
}

Expand All @@ -352,7 +352,9 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
if (it != async_get_type_callbacks.end())
{
// TypeIdentfierWithSize exists, add the callback
it->second.push_back(std::make_pair(std::move(temp_proxy_data), callback));
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
it->second.push_back(std::make_pair(temp_proxy_data_copy, callback));
// Return without sending new request
return RETCODE_NO_DATA;
}
Expand All @@ -365,8 +367,10 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
{
// Store the sent requests and callback
add_async_get_type_request(get_type_dependencies_request, type_identifier_with_size);
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr, AsyncCallback>> types;
types.push_back(std::make_pair(std::move(temp_proxy_data), callback));
std::vector<std::pair<ProxyType*, AsyncCallback>> types;
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
types.push_back(std::make_pair(temp_proxy_data_copy, callback));
async_get_type_callbacks.emplace(type_identifier_with_size, std::move(types));

return RETCODE_NO_DATA;
Expand Down Expand Up @@ -442,13 +446,23 @@ bool TypeLookupManager::remove_async_get_type_callback(
auto writer_it = async_get_type_writer_callbacks_.find(type_identifier_with_size);
if (writer_it != async_get_type_writer_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : writer_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_writer_callbacks_.erase(writer_it);
removed = true;
}
// Check if the key is in the reader map
auto reader_it = async_get_type_reader_callbacks_.find(type_identifier_with_size);
if (reader_it != async_get_type_reader_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : reader_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_reader_callbacks_.erase(reader_it);
removed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ namespace builtin {
const SampleIdentity INVALID_SAMPLE_IDENTITY;

using AsyncGetTypeWriterCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::WriterProxyData*)>;
using AsyncGetTypeReaderCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::ReaderProxyData*)>;

/**
* Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification.
Expand Down Expand Up @@ -211,7 +211,7 @@ class TypeLookupManager
typename eprosima::ProxyPool<ProxyType>::smart_ptr& temp_proxy_data,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks);

/**
Expand Down Expand Up @@ -427,12 +427,12 @@ class TypeLookupManager

//! Collection of all the WriterProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::WriterProxyData*,
AsyncGetTypeWriterCallback>>> async_get_type_writer_callbacks_;

//! Collection of all the ReaderProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::ReaderProxyData*,
AsyncGetTypeReaderCallback>>> async_get_type_reader_callbacks_;

//! Collection of all SampleIdentity and the TypeIdentfierWithSize it originated from, hashed by its SampleIdentity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ void TypeLookupReplyListener::onNewCacheChangeAdded(
return;
}

// Add reply to the processing queue
replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID});
{
// Notify processor
std::unique_lock<std::mutex> guard(replies_processor_cv_mutex_);
// Add reply to the processing queue
replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID});
// Notify processor
replies_processor_cv_.notify_all();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,11 @@ void TypeLookupRequestListener::onNewCacheChangeAdded(
TypeLookup_Request request;
if (typelookup_manager_->receive(*change, request))
{
// Add request to the processing queue
requests_queue_.push(request);
{
// Notify processor
std::unique_lock<std::mutex> guard(request_processor_cv_mutex_);
// Add request to the processing queue
requests_queue_.push(request);
// Notify processor
request_processor_cv_.notify_all();
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void EDPBasePUBListener::add_writer_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback =
[reader, change, edp, &network, writer_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr& temp_writer_data)
(eprosima::fastrtps::rtps::WriterProxyData* temp_writer_data)
{
//LOAD INFORMATION IN DESTINATION WRITER PROXY DATA
auto copy_data_fun = [&temp_writer_data, &network](
Expand Down Expand Up @@ -114,9 +114,6 @@ void EDPBasePUBListener::add_writer_from_change(
WriterProxyData* writer_data =
edp->mp_PDP->addWriterProxyData(temp_writer_data->guid(), participant_guid, copy_data_fun);

// release temporary proxy
temp_writer_data.reset();

if (writer_data != nullptr)
{
edp->pairing_writer_proxy_with_any_local_reader(participant_guid, writer_data);
Expand Down Expand Up @@ -148,8 +145,11 @@ void EDPBasePUBListener::add_writer_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_writer_data);
after_typelookup_callback(temp_writer_data.get());
}
// Release temporary proxy
temp_writer_data.reset();


// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down Expand Up @@ -226,8 +226,9 @@ void EDPBaseSUBListener::add_reader_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback =
[reader, change, edp, &network, reader_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr& temp_reader_data)
(eprosima::fastrtps::rtps::ReaderProxyData* temp_reader_data)
{
//LOAD INFORMATION IN DESTINATION READER PROXY DATA
auto copy_data_fun = [&temp_reader_data, &network](
ReaderProxyData* data,
bool updating,
Expand All @@ -254,9 +255,6 @@ void EDPBaseSUBListener::add_reader_from_change(
ReaderProxyData* reader_data =
edp->mp_PDP->addReaderProxyData(temp_reader_data->guid(), participant_guid, copy_data_fun);

// Release the temporary proxy
temp_reader_data.reset();

if (reader_data != nullptr) //ADDED NEW DATA
{
edp->pairing_reader_proxy_with_any_local_writer(participant_guid, reader_data);
Expand Down Expand Up @@ -288,8 +286,10 @@ void EDPBaseSUBListener::add_reader_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_reader_data);
after_typelookup_callback(temp_reader_data.get());
}
// Release the temporary proxy
temp_reader_data.reset();

// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down

0 comments on commit 8c89074

Please sign in to comment.