From 54f82cb919c4b43afc6b0e8ef768e06f3e41f27e Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 27 Oct 2023 13:47:27 +0800 Subject: [PATCH] Maintain the future queue of the request by timeout Signed-off-by: Barry Xu --- .../player_service_client.hpp | 31 +++++++- .../src/rosbag2_transport/player.cpp | 8 +- .../player_service_client.cpp | 76 ++++++++++++++++++- 3 files changed, 109 insertions(+), 6 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp b/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp index f25fea6e5d..e6a20d2bd4 100644 --- a/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp @@ -15,14 +15,20 @@ #ifndef ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_ #define ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_ +#include +#include #include +#include #include +#include "rclcpp/generic_client.hpp" #include "rclcpp/rclcpp.hpp" namespace rosbag2_transport { +class PlayerServiceClientManager; + class PlayerServiceClient final { public: @@ -30,7 +36,8 @@ class PlayerServiceClient final std::shared_ptr cli, std::string service_name, const std::string & service_event_type, - const rclcpp::Logger logger); + const rclcpp::Logger logger, + std::shared_ptr player_service_client_manager); // Can call this function if check_include_request_message() return true void async_send_request(const rclcpp::SerializedMessage & message); @@ -47,6 +54,7 @@ class PlayerServiceClient final std::shared_ptr client_; std::string service_name_; const rclcpp::Logger logger_; + std::shared_ptr player_service_client_manager_; enum class introspection_type { UNKNOW = 0, @@ -65,6 +73,27 @@ class PlayerServiceClient final uint8_t get_msg_event_type(const rclcpp::SerializedMessage & message); }; +class PlayerServiceClientManager final +{ +public: + PlayerServiceClientManager( + size_t request_future_timeout, size_t maximum_request_future_queue = 30); + + // Timeout future will be discarded and check queue. + bool request_future_queue_is_full(); + + bool register_request_future(rclcpp::GenericClient::FutureAndRequestId & request_future); + +private: + using time_point = std::chrono::steady_clock::time_point; + using ptr_future_and_request_id = std::unique_ptr; + std::map request_futures_list_; + std::mutex request_futures_list_lock_; + + std::chrono::seconds request_future_timeout_; + size_t maximum_request_future_queue_; +}; + } // namespace rosbag2_transport #endif // ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index b920c25e5a..0ebddf8021 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -327,6 +327,8 @@ class PlayerImpl // defaults std::shared_ptr keyboard_handler_; std::vector keyboard_callbacks_; + + std::shared_ptr player_service_client_manager_; }; PlayerImpl::PlayerImpl( @@ -338,7 +340,8 @@ PlayerImpl::PlayerImpl( : owner_(owner), storage_options_(storage_options), play_options_(play_options), - keyboard_handler_(std::move(keyboard_handler)) + keyboard_handler_(std::move(keyboard_handler)), + player_service_client_manager_(std::make_shared(5 * 60)) { { std::lock_guard lk(reader_mutex_); @@ -931,7 +934,8 @@ void PlayerImpl::prepare_publishers() try { auto cli = owner_->create_generic_client(service_name, service_type); auto player_cli = std::make_shared( - std::move(cli), service_name, topic.type, owner_->get_logger()); + std::move(cli), service_name, topic.type, owner_->get_logger(), + player_service_client_manager_); senders_.insert(std::make_pair(topic.name, player_cli)); } catch (const std::runtime_error & e) { RCLCPP_WARN( diff --git a/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp b/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp index 230bf6b84e..ac1d4564d3 100644 --- a/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rosbag2_transport/player_service_client.hpp" #include "rclcpp/logger.hpp" @@ -30,10 +32,12 @@ PlayerServiceClient::PlayerServiceClient( std::shared_ptr cli, std::string service_name, const std::string & service_event_type, - const rclcpp::Logger logger) + const rclcpp::Logger logger, + std::shared_ptr player_service_client_manager) : client_(std::move(cli)), service_name_(std::move(service_name)), - logger_(logger) + logger_(logger), + player_service_client_manager_(player_service_client_manager) { ts_lib_ = rclcpp::get_typesupport_library( service_event_type, "rosidl_typesupport_cpp"); @@ -128,8 +132,9 @@ void PlayerServiceClient::async_send_request(const rclcpp::SerializedMessage & m // members_[0]: info, members_[1]: request, members_[2]: response auto request_offset = message_members_->members_[1].offset_; auto request_addr = reinterpret_cast(ros_message.get()) + request_offset; - client_->async_send_request( + auto future_and_request_id = client_->async_send_request( reinterpret_cast(*reinterpret_cast(request_addr))); + player_service_client_manager_->register_request_future(future_and_request_id); } else { RCLCPP_ERROR( logger_, "Service request hasn't been sent. The '%s' service isn't ready !", @@ -170,4 +175,69 @@ uint8_t PlayerServiceClient::get_msg_event_type( return msg.event_type; } +PlayerServiceClientManager::PlayerServiceClientManager( + size_t requst_future_timeout, + size_t maximum_request_future_queue) +: request_future_timeout_(std::chrono::seconds(requst_future_timeout)), + maximum_request_future_queue_(maximum_request_future_queue) +{ +} + +bool PlayerServiceClientManager::request_future_queue_is_full() +{ + std::lock_guard lock(request_futures_list_lock_); + + // Remove complete request future + std::vector remove_keys; + for (auto & request_future : request_futures_list_) { + if (request_future.second->wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + remove_keys.emplace_back(request_future.first); + } + } + for (auto & key : remove_keys) { + request_futures_list_.erase(key); + } + + // Remove all timeout request future + auto current_time = std::chrono::steady_clock::now(); + time_point time_threshold; + bool remove_action = false; + for (auto rit = request_futures_list_.rbegin(); rit != request_futures_list_.rend(); ++rit) { + if ((current_time - rit->first) >= request_future_timeout_) { + time_threshold = rit->first; + remove_action = true; + } + } + + if (remove_action) { + while (true) { + auto begin = request_futures_list_.begin(); + request_futures_list_.erase(begin); + if (begin->first == time_threshold) { + break; + } + } + } + + if (request_futures_list_.size() == maximum_request_future_queue_) { + return true; + } + + return false; +} + +bool PlayerServiceClientManager::register_request_future( + rclcpp::GenericClient::FutureAndRequestId & request_future) +{ + auto future_and_request_id = + std::make_unique(std::move(request_future)); + + if (!request_future_queue_is_full()) { + request_futures_list_[std::chrono::steady_clock::now()] = std::move(future_and_request_id); + return true; + } + + return false; +} + } // namespace rosbag2_transport