Skip to content

Commit

Permalink
Maintain the future queue of the request by timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Nov 14, 2023
1 parent 230dfb5 commit 6ee96bc
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,29 @@
#ifndef ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_
#define ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_

#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <string>

#include "rclcpp/generic_client.hpp"
#include "rclcpp/rclcpp.hpp"

namespace rosbag2_transport
{

class PlayerServiceClientManager;

class PlayerServiceClient final
{
public:
explicit PlayerServiceClient(
std::shared_ptr<rclcpp::GenericClient> cli,
std::string service_name,
const std::string & service_event_type,
const rclcpp::Logger logger);
const rclcpp::Logger logger,
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager);

// Can call this function if check_include_request_message() return true
void async_send_request(const rclcpp::SerializedMessage & message);
Expand All @@ -47,6 +54,7 @@ class PlayerServiceClient final
std::shared_ptr<rclcpp::GenericClient> client_;
std::string service_name_;
const rclcpp::Logger logger_;
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;
enum class introspection_type
{
UNKNOW = 0,
Expand All @@ -65,6 +73,27 @@ class PlayerServiceClient final
uint8_t get_msg_event_type(const rclcpp::SerializedMessage & message);
};

class PlayerServiceClientManager final
{
public:
PlayerServiceClientManager(
size_t request_future_timeout, size_t maximum_request_future_queue = 30);

// Timeout future will be discarded and check queue.
bool request_future_queue_is_full();

bool register_request_future(rclcpp::GenericClient::FutureAndRequestId & request_future);

private:
using time_point = std::chrono::steady_clock::time_point;
using ptr_future_and_request_id = std::unique_ptr<rclcpp::GenericClient::FutureAndRequestId>;
std::map<time_point, ptr_future_and_request_id> request_futures_list_;
std::mutex request_futures_list_lock_;

std::chrono::seconds request_future_timeout_;
size_t maximum_request_future_queue_;
};

} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__PLAYER_SERVICE_CLIENT_HPP_
8 changes: 6 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ class PlayerImpl
// defaults
std::shared_ptr<KeyboardHandler> keyboard_handler_;
std::vector<KeyboardHandler::callback_handle_t> keyboard_callbacks_;

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;
};

PlayerImpl::PlayerImpl(
Expand All @@ -338,7 +340,8 @@ PlayerImpl::PlayerImpl(
: owner_(owner),
storage_options_(storage_options),
play_options_(play_options),
keyboard_handler_(std::move(keyboard_handler))
keyboard_handler_(std::move(keyboard_handler)),
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>(5 * 60))
{
{
std::lock_guard<std::mutex> lk(reader_mutex_);
Expand Down Expand Up @@ -931,7 +934,8 @@ void PlayerImpl::prepare_publishers()
try {
auto cli = owner_->create_generic_client(service_name, service_type);
auto player_cli = std::make_shared<PlayerServiceClient>(
std::move(cli), service_name, topic.type, owner_->get_logger());
std::move(cli), service_name, topic.type, owner_->get_logger(),
player_service_client_manager_);
senders_.insert(std::make_pair(topic.name, player_cli));
} catch (const std::runtime_error & e) {
RCLCPP_WARN(
Expand Down
76 changes: 73 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>

#include "rosbag2_transport/player_service_client.hpp"

#include "rclcpp/logger.hpp"
Expand All @@ -30,10 +32,12 @@ PlayerServiceClient::PlayerServiceClient(
std::shared_ptr<rclcpp::GenericClient> cli,
std::string service_name,
const std::string & service_event_type,
const rclcpp::Logger logger)
const rclcpp::Logger logger,
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager)
: client_(std::move(cli)),
service_name_(std::move(service_name)),
logger_(logger)
logger_(logger),
player_service_client_manager_(player_service_client_manager)
{
ts_lib_ = rclcpp::get_typesupport_library(
service_event_type, "rosidl_typesupport_cpp");
Expand Down Expand Up @@ -128,8 +132,9 @@ void PlayerServiceClient::async_send_request(const rclcpp::SerializedMessage & m
// members_[0]: info, members_[1]: request, members_[2]: response
auto request_offset = message_members_->members_[1].offset_;
auto request_addr = reinterpret_cast<size_t>(ros_message.get()) + request_offset;
client_->async_send_request(
auto future_and_request_id = client_->async_send_request(
reinterpret_cast<void *>(*reinterpret_cast<size_t *>(request_addr)));
player_service_client_manager_->register_request_future(future_and_request_id);
} else {
RCLCPP_ERROR(
logger_, "Service request hasn't been sent. The '%s' service isn't ready !",
Expand Down Expand Up @@ -170,4 +175,69 @@ uint8_t PlayerServiceClient::get_msg_event_type(
return msg.event_type;
}

PlayerServiceClientManager::PlayerServiceClientManager(
size_t requst_future_timeout,
size_t maximum_request_future_queue)
: request_future_timeout_(std::chrono::seconds(requst_future_timeout)),
maximum_request_future_queue_(maximum_request_future_queue)
{
}

bool PlayerServiceClientManager::request_future_queue_is_full()
{
std::lock_guard<std::mutex> lock(request_futures_list_lock_);

// Remove complete request future
std::vector<time_point> remove_keys;
for (auto & request_future : request_futures_list_) {
if (request_future.second->wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
remove_keys.emplace_back(request_future.first);
}
}
for (auto & key : remove_keys) {
request_futures_list_.erase(key);
}

// Remove all timeout request future
auto current_time = std::chrono::steady_clock::now();
time_point time_threshold;
bool remove_action = false;
for (auto rit = request_futures_list_.rbegin(); rit != request_futures_list_.rend(); ++rit) {
if ((current_time - rit->first) >= request_future_timeout_) {
time_threshold = rit->first;
remove_action = true;
}
}

if (remove_action) {
while (true) {
auto begin = request_futures_list_.begin();
request_futures_list_.erase(begin);
if (begin->first == time_threshold) {
break;
}
}
}

if (request_futures_list_.size() == maximum_request_future_queue_) {
return true;
}

return false;
}

bool PlayerServiceClientManager::register_request_future(
rclcpp::GenericClient::FutureAndRequestId & request_future)
{
auto future_and_request_id =
std::make_unique<rclcpp::GenericClient::FutureAndRequestId>(std::move(request_future));

if (!request_future_queue_is_full()) {
request_futures_list_[std::chrono::steady_clock::now()] = std::move(future_and_request_id);
return true;
}

return false;
}

} // namespace rosbag2_transport

0 comments on commit 6ee96bc

Please sign in to comment.