From 5cb4d2166d1768872216dd82b9706791006d33c1 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 11 Oct 2023 19:43:29 +0000 Subject: [PATCH 1/6] Support users holding onto shared pointers in the message memory pool Before this commit, the MessageMemoryPool would actually reuse messages in the pool, even if the user had taken additional shared_ptr copies. This commit fixes things so that we properly handle that situation. In particular, we only return shared_ptrs to the free pool once their use_count has dropped to 1 (meaning only this class still has a reference to it). We also add in locks, since in a multi-threaded scenario we need to protect against multiple threads accessing the pool at the same time. With this in place, things work as expected when users hold shared_ptr copies. We also add in a test for this situation. One note about performance: this update preserves the "no-allocations-at-runtime" aspect of the MessagePool. However, there are some tradeoffs with CPU time here, particularly with very large message pools. This could probably be optimized further to do less work when trying to add items back to the free_list, but I view that as a further enhancement. Signed-off-by: Chris Lalancette --- .../message_pool_memory_strategy.hpp | 98 +++++++++++++++---- .../test_message_pool_memory_strategy.cpp | 30 ++++-- 2 files changed, 97 insertions(+), 31 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 0e7d4366e5..82f9cbc6f0 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -15,7 +15,11 @@ #ifndef RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ +#include #include +#include +#include +#include #include "rosidl_runtime_cpp/traits.hpp" @@ -52,11 +56,10 @@ class MessagePoolMemoryStrategy /// Default constructor MessagePoolMemoryStrategy() - : next_array_index_(0) { for (size_t i = 0; i < Size; ++i) { - pool_[i].msg_ptr_ = std::make_shared(); - pool_[i].used = false; + pool_[i] = std::make_shared(); + free_list_.push_back(i); } } @@ -68,16 +71,25 @@ class MessagePoolMemoryStrategy */ std::shared_ptr borrow_message() { - size_t current_index = next_array_index_; - next_array_index_ = (next_array_index_ + 1) % Size; - if (pool_[current_index].used) { - throw std::runtime_error("Tried to access message that was still in use! Abort."); + std::lock_guard lock(pool_mutex_); + if (free_list_.size() == 0) { + for (size_t i = 0; i < Size; ++i) { + if (pool_[i].use_count() == 1) { + free_list_.push_back(i); + break; + } + } + if (free_list_.size() == 0) { + throw std::runtime_error("No more free slots in the pool!"); + } } - pool_[current_index].msg_ptr_->~MessageT(); - new (pool_[current_index].msg_ptr_.get())MessageT; - pool_[current_index].used = true; - return pool_[current_index].msg_ptr_; + size_t current_index = free_list_.pop_front(); + + pool_[current_index]->~MessageT(); + new (pool_[current_index].get())MessageT; + + return pool_[current_index]; } /// Return a message to the message pool. @@ -87,24 +99,68 @@ class MessagePoolMemoryStrategy */ void return_message(std::shared_ptr & msg) { - for (size_t i = 0; i < Size; ++i) { - if (pool_[i].msg_ptr_ == msg) { - pool_[i].used = false; - return; + (void)msg; + + // What we really want to do here is to figure out whether the user has taken an additional + // reference to the message, and only add it to the free list if that is *not* the case. + // However, we can't really do that for the currently passed-in msg; it can have an arbitrary + // reference count due to the mechanisms of rclcpp. Instead, we look at all the rest of the + // pointers, and add the ones that the user has released into the free pool. + // We do the same thing in borrow_message(), so if the user has a pool of size 1 + // (or only one free slot), we'll always find it. + + std::lock_guard lock(pool_mutex_); + if (free_list_.size() == 0) { + for (size_t i = 0; i < Size; ++i) { + if (pool_[i].use_count() == 1) { + free_list_.push_back(i); + } } } - throw std::runtime_error("Unrecognized message ptr in return_message."); } protected: - struct PoolMember + template + class CyclicSizeTArray { - std::shared_ptr msg_ptr_; - bool used; +public: + void push_back(const size_t v) + { + if (size_ + 1 > N) { + throw std::runtime_error("Tried to push too many items into the array"); + } + array_[(front_ + size_) % N] = v; + ++size_; + } + + size_t pop_front() + { + if (size_ < 1) { + throw std::runtime_error("Tried to pop item from empty array"); + } + + size_t val = array_[front_]; + + front_ = (front_ + 1) % N; + --size_; + + return val; + } + + size_t size() const + { + return size_; + } + +private: + size_t front_ = 0; + size_t size_ = 0; + std::array array_; }; - std::array pool_; - size_t next_array_index_; + std::mutex pool_mutex_; + std::array, Size> pool_; + CyclicSizeTArray free_list_; }; } // namespace message_pool_memory_strategy diff --git a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp index b68bc96ab6..e615052509 100644 --- a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp @@ -56,18 +56,28 @@ TEST_F(TestMessagePoolMemoryStrategy, borrow_too_many) { // Size is 1, borrowing second time should fail RCLCPP_EXPECT_THROW_EQ( message_memory_strategy_->borrow_message(), - std::runtime_error("Tried to access message that was still in use! Abort.")); + std::runtime_error("No more free slots in the pool!")); EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); } -TEST_F(TestMessagePoolMemoryStrategy, return_unrecognized) { - auto message = message_memory_strategy_->borrow_message(); - ASSERT_NE(nullptr, message); +TEST_F(TestMessagePoolMemoryStrategy, borrow_hold_reference) { + { + auto message = message_memory_strategy_->borrow_message(); + ASSERT_NE(nullptr, message); - auto unrecognized = std::make_shared(); - // Unrecognized does not belong to pool - RCLCPP_EXPECT_THROW_EQ( - message_memory_strategy_->return_message(unrecognized), - std::runtime_error("Unrecognized message ptr in return_message.")); - EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); + // Return it. + EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); + + // But we are still holding the reference, so we expect that there is still no room in the pool. + RCLCPP_EXPECT_THROW_EQ( + message_memory_strategy_->borrow_message(), + std::runtime_error("No more free slots in the pool!")); + } + + // Now that we've dropped the reference (left the scope), we expect to be able to borrow again. + + auto message2 = message_memory_strategy_->borrow_message(); + ASSERT_NE(nullptr, message2); + + EXPECT_NO_THROW(message_memory_strategy_->return_message(message2)); } From 2fc8b5c18ea3e7e4d5a67efef08265031a078e2a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 13 Oct 2023 12:32:42 +0000 Subject: [PATCH 2/6] Fixes from review. Signed-off-by: Chris Lalancette --- .../rclcpp/strategies/message_pool_memory_strategy.hpp | 2 +- .../rclcpp/strategies/test_message_pool_memory_strategy.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 82f9cbc6f0..e4cfbb2be0 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -80,7 +80,7 @@ class MessagePoolMemoryStrategy } } if (free_list_.size() == 0) { - throw std::runtime_error("No more free slots in the pool!"); + throw std::runtime_error("No more free slots in the pool"); } } diff --git a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp index e615052509..69198adf59 100644 --- a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp @@ -56,7 +56,7 @@ TEST_F(TestMessagePoolMemoryStrategy, borrow_too_many) { // Size is 1, borrowing second time should fail RCLCPP_EXPECT_THROW_EQ( message_memory_strategy_->borrow_message(), - std::runtime_error("No more free slots in the pool!")); + std::runtime_error("No more free slots in the pool")); EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); } @@ -71,7 +71,7 @@ TEST_F(TestMessagePoolMemoryStrategy, borrow_hold_reference) { // But we are still holding the reference, so we expect that there is still no room in the pool. RCLCPP_EXPECT_THROW_EQ( message_memory_strategy_->borrow_message(), - std::runtime_error("No more free slots in the pool!")); + std::runtime_error("No more free slots in the pool")); } // Now that we've dropped the reference (left the scope), we expect to be able to borrow again. From e1af7ac54f22df9dcd42af43d67fe3ef7e300a1f Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 17 Oct 2023 20:48:59 +0000 Subject: [PATCH 3/6] Utilize shared_ptr as we should. Signed-off-by: Chris Lalancette --- .../message_pool_memory_strategy.hpp | 57 ++++++++----------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index e4cfbb2be0..6761ae0e3d 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -16,6 +16,7 @@ #define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ #include +#include #include #include #include @@ -58,11 +59,18 @@ class MessagePoolMemoryStrategy MessagePoolMemoryStrategy() { for (size_t i = 0; i < Size; ++i) { - pool_[i] = std::make_shared(); + pool_[i] = new MessageT; free_list_.push_back(i); } } + ~MessagePoolMemoryStrategy() + { + for (size_t i = 0; i < Size; ++i) { + delete pool_[i]; + } + } + /// Borrow a message from the message pool. /** * Manage the message pool ring buffer. @@ -73,23 +81,21 @@ class MessagePoolMemoryStrategy { std::lock_guard lock(pool_mutex_); if (free_list_.size() == 0) { - for (size_t i = 0; i < Size; ++i) { - if (pool_[i].use_count() == 1) { - free_list_.push_back(i); - break; - } - } - if (free_list_.size() == 0) { - throw std::runtime_error("No more free slots in the pool"); - } + throw std::runtime_error("No more free slots in the pool"); } size_t current_index = free_list_.pop_front(); - pool_[current_index]->~MessageT(); - new (pool_[current_index].get())MessageT; - - return pool_[current_index]; + return std::shared_ptr( + pool_[current_index], [this](MessageT * p) { + for (size_t i = 0; i < Size; ++i) { + if (pool_[i] == p) { + *p = {}; + free_list_.push_back(i); + break; + } + } + }); } /// Return a message to the message pool. @@ -100,28 +106,11 @@ class MessagePoolMemoryStrategy void return_message(std::shared_ptr & msg) { (void)msg; - - // What we really want to do here is to figure out whether the user has taken an additional - // reference to the message, and only add it to the free list if that is *not* the case. - // However, we can't really do that for the currently passed-in msg; it can have an arbitrary - // reference count due to the mechanisms of rclcpp. Instead, we look at all the rest of the - // pointers, and add the ones that the user has released into the free pool. - // We do the same thing in borrow_message(), so if the user has a pool of size 1 - // (or only one free slot), we'll always find it. - - std::lock_guard lock(pool_mutex_); - if (free_list_.size() == 0) { - for (size_t i = 0; i < Size; ++i) { - if (pool_[i].use_count() == 1) { - free_list_.push_back(i); - } - } - } } protected: template - class CyclicSizeTArray + class CircularArray { public: void push_back(const size_t v) @@ -159,8 +148,8 @@ class MessagePoolMemoryStrategy }; std::mutex pool_mutex_; - std::array, Size> pool_; - CyclicSizeTArray free_list_; + std::array pool_; + CircularArray free_list_; }; } // namespace message_pool_memory_strategy From 331fa3cd9ab32074931c636ce4c08a8baef8b303 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Sat, 21 Oct 2023 21:20:31 +0000 Subject: [PATCH 4/6] Make sure to destruct and reconstruct the messages. Signed-off-by: Chris Lalancette --- .../include/rclcpp/strategies/message_pool_memory_strategy.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 6761ae0e3d..27a389417d 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -90,7 +90,8 @@ class MessagePoolMemoryStrategy pool_[current_index], [this](MessageT * p) { for (size_t i = 0; i < Size; ++i) { if (pool_[i] == p) { - *p = {}; + p->~MessageT(); + new(p) MessageT(); free_list_.push_back(i); break; } From 3234210a6feb7a70021a3e220ed6a08f5b8d0617 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 2 Nov 2023 14:17:29 +0000 Subject: [PATCH 5/6] Only do destruction during the custom destructor. We allocate memory during class initialization, and delete it during destruction. We then run the constructor when we hand the pointer out, and the destructor (only) when we return it to the pool. This keeps things consistent. Signed-off-by: Chris Lalancette --- .../message_pool_memory_strategy.hpp | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 27a389417d..474c0a8d6f 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -24,6 +24,8 @@ #include "rosidl_runtime_cpp/traits.hpp" +#include "rclcpp/logger.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/visibility_control.hpp" @@ -59,15 +61,26 @@ class MessagePoolMemoryStrategy MessagePoolMemoryStrategy() { for (size_t i = 0; i < Size; ++i) { - pool_[i] = new MessageT; + pool_[i] = static_cast(malloc(sizeof(MessageT))); free_list_.push_back(i); } } ~MessagePoolMemoryStrategy() { - for (size_t i = 0; i < Size; ++i) { - delete pool_[i]; + // The user may have held onto shared pointers after a borrow_message(). In that case, + // freeing the memory from the pool may lead to UB. If we detect the situation where this + // class is being destroyed before the shared pointers, warn the user. + + if (free_list_.size() != Size) { + RCLCPP_WARN( + rclcpp::get_logger("MessagePool"), + "User code is holding onto shared pointers from the message pool; this will leak memory"); + } + + while (free_list_.size() != 0) { + size_t index = free_list_.pop_front(); + free(pool_[index]); } } @@ -87,11 +100,12 @@ class MessagePoolMemoryStrategy size_t current_index = free_list_.pop_front(); return std::shared_ptr( - pool_[current_index], [this](MessageT * p) { + new(pool_[current_index]) MessageT(), + [this](MessageT * p) { + std::lock_guard lock(pool_mutex_); for (size_t i = 0; i < Size; ++i) { if (pool_[i] == p) { p->~MessageT(); - new(p) MessageT(); free_list_.push_back(i); break; } From ec86e41b11a1f37d8215b937f536470530de63af Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 10 Nov 2023 00:33:45 +0000 Subject: [PATCH 6/6] Use shared_ptrs for the class variables. This ensures that the custom destructor can run even if the class has been destroyed. Signed-off-by: Chris Lalancette --- .../message_pool_memory_strategy.hpp | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 474c0a8d6f..703066fa3f 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -57,30 +57,24 @@ class MessagePoolMemoryStrategy public: RCLCPP_SMART_PTR_DEFINITIONS(MessagePoolMemoryStrategy) - /// Default constructor MessagePoolMemoryStrategy() { - for (size_t i = 0; i < Size; ++i) { - pool_[i] = static_cast(malloc(sizeof(MessageT))); - free_list_.push_back(i); - } - } + pool_mutex_ = std::make_shared(); - ~MessagePoolMemoryStrategy() - { - // The user may have held onto shared pointers after a borrow_message(). In that case, - // freeing the memory from the pool may lead to UB. If we detect the situation where this - // class is being destroyed before the shared pointers, warn the user. - - if (free_list_.size() != Size) { - RCLCPP_WARN( - rclcpp::get_logger("MessagePool"), - "User code is holding onto shared pointers from the message pool; this will leak memory"); - } + pool_ = std::shared_ptr>( + new std::array, + [](std::array * arr) { + for (size_t i = 0; i < Size; ++i) { + free((*arr)[i]); + } + delete arr; + }); - while (free_list_.size() != 0) { - size_t index = free_list_.pop_front(); - free(pool_[index]); + free_list_ = std::make_shared>(); + + for (size_t i = 0; i < Size; ++i) { + (*pool_)[i] = static_cast(malloc(sizeof(MessageT))); + free_list_->push_back(i); } } @@ -92,21 +86,22 @@ class MessagePoolMemoryStrategy */ std::shared_ptr borrow_message() { - std::lock_guard lock(pool_mutex_); - if (free_list_.size() == 0) { + std::lock_guard lock(*pool_mutex_); + if (free_list_->size() == 0) { throw std::runtime_error("No more free slots in the pool"); } - size_t current_index = free_list_.pop_front(); + size_t current_index = free_list_->pop_front(); return std::shared_ptr( - new(pool_[current_index]) MessageT(), - [this](MessageT * p) { - std::lock_guard lock(pool_mutex_); + new((*pool_)[current_index]) MessageT(), + [pool = this->pool_, pool_mutex = this->pool_mutex_, + free_list = this->free_list_](MessageT * p) { + std::lock_guard lock(*pool_mutex); for (size_t i = 0; i < Size; ++i) { - if (pool_[i] == p) { + if ((*pool)[i] == p) { p->~MessageT(); - free_list_.push_back(i); + free_list->push_back(i); break; } } @@ -115,7 +110,8 @@ class MessagePoolMemoryStrategy /// Return a message to the message pool. /** - * Manage metadata in the message pool ring buffer to release the message. + * This does nothing since the message isn't returned to the pool until the user has dropped + * all references. * \param[in] msg Shared pointer to the message to return. */ void return_message(std::shared_ptr & msg) @@ -162,9 +158,13 @@ class MessagePoolMemoryStrategy std::array array_; }; - std::mutex pool_mutex_; - std::array pool_; - CircularArray free_list_; + // It's very important that these are shared_ptrs, since users of this class might hold a + // reference to a pool item longer than the lifetime of the class. In that scenario, the + // shared_ptr ensures that the lifetime of these variables outlives this class, and hence ensures + // the custom destructor for each pool item can successfully run. + std::shared_ptr pool_mutex_; + std::shared_ptr> pool_; + std::shared_ptr> free_list_; }; } // namespace message_pool_memory_strategy