diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp index fae42314fea..a01fdf690d6 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp @@ -329,7 +329,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( typename eprosima::ProxyPool::smart_ptr& temp_proxy_data, const AsyncCallback& callback, std::unordered_map::smart_ptr, + std::vector>>& async_get_type_callbacks) { xtypes::TypeIdentfierWithSize type_identifier_with_size = @@ -341,7 +341,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( is_type_identifier_known(type_identifier_with_size)) { // The type is already known, invoke the callback - callback(temp_proxy_data); + callback(temp_proxy_data.get()); return RETCODE_OK; } @@ -352,7 +352,9 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( if (it != async_get_type_callbacks.end()) { // TypeIdentfierWithSize exists, add the callback - it->second.push_back(std::make_pair(std::move(temp_proxy_data), callback)); + // Make a copy of the proxy to free the EDP pool + ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data)); + it->second.push_back(std::make_pair(temp_proxy_data_copy, callback)); // Return without sending new request return RETCODE_NO_DATA; } @@ -365,8 +367,10 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received( { // Store the sent requests and callback add_async_get_type_request(get_type_dependencies_request, type_identifier_with_size); - std::vector::smart_ptr, AsyncCallback>> types; - types.push_back(std::make_pair(std::move(temp_proxy_data), callback)); + std::vector> types; + // Make a copy of the proxy to free the EDP pool + ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data)); + types.push_back(std::make_pair(temp_proxy_data_copy, callback)); async_get_type_callbacks.emplace(type_identifier_with_size, std::move(types)); return RETCODE_NO_DATA; @@ -442,6 +446,11 @@ bool TypeLookupManager::remove_async_get_type_callback( auto writer_it = async_get_type_writer_callbacks_.find(type_identifier_with_size); if (writer_it != async_get_type_writer_callbacks_.end()) { + // Delete the proxies and remove the entry + for (auto& proxy_callback_pair : writer_it->second) + { + delete proxy_callback_pair.first; + } async_get_type_writer_callbacks_.erase(writer_it); removed = true; } @@ -449,6 +458,11 @@ bool TypeLookupManager::remove_async_get_type_callback( auto reader_it = async_get_type_reader_callbacks_.find(type_identifier_with_size); if (reader_it != async_get_type_reader_callbacks_.end()) { + // Delete the proxies and remove the entry + for (auto& proxy_callback_pair : reader_it->second) + { + delete proxy_callback_pair.first; + } async_get_type_reader_callbacks_.erase(reader_it); removed = true; } diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp index e98d08f2003..fda0dd71f2a 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.hpp @@ -105,9 +105,9 @@ namespace builtin { const SampleIdentity INVALID_SAMPLE_IDENTITY; using AsyncGetTypeWriterCallback = std::function< - void (eprosima::ProxyPool::smart_ptr&)>; + void (eprosima::fastrtps::rtps::WriterProxyData*)>; using AsyncGetTypeReaderCallback = std::function< - void (eprosima::ProxyPool::smart_ptr&)>; + void (eprosima::fastrtps::rtps::ReaderProxyData*)>; /** * Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification. @@ -211,7 +211,7 @@ class TypeLookupManager typename eprosima::ProxyPool::smart_ptr& temp_proxy_data, const AsyncCallback& callback, std::unordered_map::smart_ptr, + std::vector>>& async_get_type_callbacks); /** @@ -427,12 +427,12 @@ class TypeLookupManager //! Collection of all the WriterProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize. std::unordered_map < xtypes::TypeIdentfierWithSize, - std::vector::smart_ptr, + std::vector>> async_get_type_writer_callbacks_; //! Collection of all the ReaderProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize. std::unordered_map < xtypes::TypeIdentfierWithSize, - std::vector::smart_ptr, + std::vector>> async_get_type_reader_callbacks_; //! Collection of all SampleIdentity and the TypeIdentfierWithSize it originated from, hashed by its SampleIdentity. diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp index 8823824099e..327fdc5cefa 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupReplyListener.cpp @@ -300,11 +300,11 @@ void TypeLookupReplyListener::onNewCacheChangeAdded( return; } - // Add reply to the processing queue - replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID}); { - // Notify processor std::unique_lock guard(replies_processor_cv_mutex_); + // Add reply to the processing queue + replies_queue_.push(ReplyWithServerGUID{reply, change->writerGUID}); + // Notify processor replies_processor_cv_.notify_all(); } } diff --git a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp index b185ed4b4bb..80cf9001e64 100644 --- a/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp +++ b/src/cpp/fastdds/builtin/type_lookup_service/TypeLookupRequestListener.cpp @@ -435,11 +435,11 @@ void TypeLookupRequestListener::onNewCacheChangeAdded( TypeLookup_Request request; if (typelookup_manager_->receive(*change, request)) { - // Add request to the processing queue - requests_queue_.push(request); { - // Notify processor std::unique_lock guard(request_processor_cv_mutex_); + // Add request to the processing queue + requests_queue_.push(request); + // Notify processor request_processor_cv_.notify_all(); } } diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp index 66feb870f72..4e7e2953492 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp @@ -86,7 +86,7 @@ void EDPBasePUBListener::add_writer_from_change( // Callback function to continue after typelookup is complete fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback = [reader, change, edp, &network, writer_added_callback] - (eprosima::ProxyPool::smart_ptr& temp_writer_data) + (eprosima::fastrtps::rtps::WriterProxyData* temp_writer_data) { //LOAD INFORMATION IN DESTINATION WRITER PROXY DATA auto copy_data_fun = [&temp_writer_data, &network]( @@ -114,9 +114,6 @@ void EDPBasePUBListener::add_writer_from_change( WriterProxyData* writer_data = edp->mp_PDP->addWriterProxyData(temp_writer_data->guid(), participant_guid, copy_data_fun); - // release temporary proxy - temp_writer_data.reset(); - if (writer_data != nullptr) { edp->pairing_writer_proxy_with_any_local_reader(participant_guid, writer_data); @@ -148,8 +145,11 @@ void EDPBasePUBListener::add_writer_from_change( else { EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism"); - after_typelookup_callback(temp_writer_data); + after_typelookup_callback(temp_writer_data.get()); } + // Release temporary proxy + temp_writer_data.reset(); + // Take the reader lock again if needed. reader->getMutex().lock(); @@ -226,8 +226,9 @@ void EDPBaseSUBListener::add_reader_from_change( // Callback function to continue after typelookup is complete fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback = [reader, change, edp, &network, reader_added_callback] - (eprosima::ProxyPool::smart_ptr& temp_reader_data) + (eprosima::fastrtps::rtps::ReaderProxyData* temp_reader_data) { + //LOAD INFORMATION IN DESTINATION READER PROXY DATA auto copy_data_fun = [&temp_reader_data, &network]( ReaderProxyData* data, bool updating, @@ -254,9 +255,6 @@ void EDPBaseSUBListener::add_reader_from_change( ReaderProxyData* reader_data = edp->mp_PDP->addReaderProxyData(temp_reader_data->guid(), participant_guid, copy_data_fun); - // Release the temporary proxy - temp_reader_data.reset(); - if (reader_data != nullptr) //ADDED NEW DATA { edp->pairing_reader_proxy_with_any_local_writer(participant_guid, reader_data); @@ -288,8 +286,10 @@ void EDPBaseSUBListener::add_reader_from_change( else { EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism"); - after_typelookup_callback(temp_reader_data); + after_typelookup_callback(temp_reader_data.get()); } + // Release the temporary proxy + temp_reader_data.reset(); // Take the reader lock again if needed. reader->getMutex().lock();