Skip to content

Commit

Permalink
Implement service play
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Nov 14, 2023
1 parent 6a39627 commit 230dfb5
Show file tree
Hide file tree
Showing 22 changed files with 1,081 additions and 101 deletions.
11 changes: 9 additions & 2 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.verb import VerbExtension
Expand All @@ -41,8 +42,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'delay of message playback.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
help='topics to replay, separated by space. At least one topic needs to be '
"specified. If this parameter isn\'t specified, all topics will be replayed.")
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='services to replay, separated by space. At least one service needs to be '
"specified. If this parameter isn\'t specified, all services will be replayed.")
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -90,6 +95,8 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = 1.0
play_options.topics_to_filter = args.topics
# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = False
play_options.topic_remapping_options = topic_remapping
Expand Down
40 changes: 36 additions & 4 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand Down Expand Up @@ -51,14 +53,25 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='Space-delimited list of topics to play.')
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='Space-delimited list of services to play.')
parser.add_argument(
'-e', '--regex', default='',
help='filter topics by regular expression to replay, separated by space. If none '
'specified, all topics will be replayed.')
help='Play only topics and services containing provided regular expression.')
parser.add_argument(
'-x', '--exclude', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed. This argument is deprecated and please '
'use --exclude-topics.')
parser.add_argument(
'--exclude-topics', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed.')
parser.add_argument(
'--exclude-services', default='',
help='regular expressions to exclude services from replay, separated by space. If '
'none specified, all services will be replayed.')
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -163,6 +176,10 @@ def main(self, *, args): # noqa: D102
except (InvalidQoSProfileException, ValueError) as e:
return print_error(str(e))

if args.exclude and args.exclude_topics:
return print_error(str('-x/--exclude and --exclude_topics cannot be used at the '
'same time.'))

storage_config_file = ''
if args.storage_config_file:
storage_config_file = args.storage_config_file.name
Expand All @@ -182,8 +199,23 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = args.rate
play_options.topics_to_filter = args.topics
play_options.topics_regex_to_filter = args.regex
play_options.topics_regex_to_exclude = args.exclude

# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)

play_options.regex_to_filter = args.regex

if args.exclude:
print(print_warn(str('-x/--exclude argument is deprecated. Please use '
'--exclude-topics.')))
play_options.topics_regex_to_exclude = args.exclude
else:
play_options.topics_regex_to_exclude = args.exclude_topics

if args.exclude_services:
play_options.services_regex_to_exclude = args.exclude_services + '/_service_event'
else:
play_options.services_regex_to_exclude = args.exclude_services
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
Expand Down
17 changes: 12 additions & 5 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,22 @@ PYBIND11_MODULE(_storage, m) {

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
pybind11::init<std::vector<std::string>, std::string, std::string>(),
pybind11::init<
std::vector<std::string>, std::vector<std::string>, std::string, std::string, std::string>(),
pybind11::arg("topics") = std::vector<std::string>(),
pybind11::arg("topics_regex") = "",
pybind11::arg("topics_regex_to_exclude") = "")
pybind11::arg("services") = std::vector<std::string>(),
pybind11::arg("regex") = "",
pybind11::arg("topics_regex_to_exclude") = "",
pybind11::arg("services_regex_to_exclude") = "")
.def_readwrite("topics", &rosbag2_storage::StorageFilter::topics)
.def_readwrite("topics_regex", &rosbag2_storage::StorageFilter::topics_regex)
.def_readwrite("services", &rosbag2_storage::StorageFilter::services)
.def_readwrite("regex", &rosbag2_storage::StorageFilter::regex)
.def_readwrite(
"topics_regex_to_exclude",
&rosbag2_storage::StorageFilter::topics_regex_to_exclude);
&rosbag2_storage::StorageFilter::topics_regex_to_exclude)
.def_readwrite(
"services_regex_to_exclude",
&rosbag2_storage::StorageFilter::services_regex_to_exclude);

pybind11::class_<rosbag2_storage::MessageDefinition>(m, "MessageDefinition")
.def(
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("node_prefix", &PlayOptions::node_prefix)
.def_readwrite("rate", &PlayOptions::rate)
.def_readwrite("topics_to_filter", &PlayOptions::topics_to_filter)
.def_readwrite("topics_regex_to_filter", &PlayOptions::topics_regex_to_filter)
.def_readwrite("services_to_filter", &PlayOptions::services_to_filter)
.def_readwrite("regex_to_filter", &PlayOptions::regex_to_filter)
.def_readwrite("topics_regex_to_exclude", &PlayOptions::topics_regex_to_exclude)
.def_readwrite("services_regex_to_exclude", &PlayOptions::services_regex_to_exclude)
.def_property(
"topic_qos_profile_overrides",
&PlayOptions::getTopicQoSProfileOverrides,
Expand Down
20 changes: 15 additions & 5 deletions rosbag2_storage/include/rosbag2_storage/storage_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,28 @@ struct StorageFilter
{
// Topic names to whitelist when reading a bag. Only messages matching these
// specified topics will be returned. If list is empty, the filter is ignored
// and all messages are returned.
// and all messages of topics are returned.
std::vector<std::string> topics;

// Regular expression of topic names to whitelist when playing a bag.
// Only messages matching these specified topics will be played.
// Service names to whitelist when reading a bag. Only messages matching these
// specified service will be returned. If list is empty, the filter is ignored
// and all messages of services are returned.
std::vector<std::string> services;

// Regular expression of topic names and service name to whitelist when playing a bag.
// Only messages matching these specified topics or services will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string topics_regex = "";
std::string regex = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
// If list is empty, the filter is ignored and all messages of topics are played.
std::string topics_regex_to_exclude = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified services will be played.
// If list is empty, the filter is ignored and all messages of services are played.
std::string services_regex_to_exclude = "";
};

} // namespace rosbag2_storage
Expand Down
61 changes: 49 additions & 12 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,26 +539,63 @@ void MCAPStorage::reset_iterator()
options.endTime = mcap::MaxTime;
}
options.readOrder = read_order_;
if (!storage_filter_.topics.empty()) {
options.topicFilter = [this](std::string_view topic) {

auto filter_topic = [this](std::string_view topic) {
if (!storage_filter_.topics.empty()) {
for (const auto & match_topic : storage_filter_.topics) {
if (match_topic == topic) {
return true;
}
}
return false;
};
}
}

if (!storage_filter_.services.empty()) {
for (const auto & match_service : storage_filter_.services) {
if (match_service == topic) {
return true;
}
}
}

bool topics_regex_to_exclude_match = false;
bool services_regex_to_exclude_match = false;
std::string topic_string(topic);

if (!storage_filter_.topics_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.topics_regex_to_exclude);
topics_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

if (!storage_filter_.services_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.services_regex_to_exclude);
services_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX
if (!storage_filter_.topics_regex.empty()) {
options.topicFilter = [this](std::string_view topic) {
if (!storage_filter_.regex.empty()) {
std::smatch m;
std::string topic_string(topic);
std::regex re(storage_filter_.topics_regex);
return std::regex_match(topic_string, m, re);
};
}
std::regex re(storage_filter_.regex);

if (std::regex_match(topic_string, m, re) && !topics_regex_to_exclude_match &&
!services_regex_to_exclude_match) {
return true;
} else {
return false;
}
}
#endif

if ((storage_filter_.topics.empty() && !topics_regex_to_exclude_match) &&
(storage_filter_.services.empty() && !services_regex_to_exclude_match)) {
return true;
}

return false;
};
options.topicFilter = filter_topic;

linear_view_ =
std::make_unique<mcap::LinearMessageView>(mcap_reader_->readMessages(OnProblem, options));
linear_iterator_ = std::make_unique<mcap::LinearMessageView::Iterator>(linear_view_->begin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ void SqliteStorage::prepare_for_reading()
"FROM messages JOIN topics ON messages.topic_id = topics.id WHERE ";
std::vector<std::string> where_conditions;

std::string topic_and_service_list;
// add topic filter
if (!storage_filter_.topics.empty()) {
// Construct string for selected topics
Expand All @@ -531,13 +532,38 @@ void SqliteStorage::prepare_for_reading()
topic_list += ",";
}
}
where_conditions.push_back("(topics.name IN (" + topic_list + "))");
topic_and_service_list = "(topics.name IN (" + topic_list + "))";
}
// add topic filter based on regular expression
if (!storage_filter_.topics_regex.empty()) {

// add service filter
if (!storage_filter_.services.empty()) {
// Construct string for selected topics
where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')");
std::string service_list{""};
for (auto & service : storage_filter_.services) {
service_list += "'" + service + "'";
if (&service != &storage_filter_.topics.back()) {
service_list += ",";
}
}

if (!topic_and_service_list.empty()) {
topic_and_service_list.append(" OR ");
}
topic_and_service_list.append("(topics.name IN (" + service_list + "))");
}

std::string list_and_regex = topic_and_service_list;
// add topic filter based on regular expression
if (!storage_filter_.regex.empty()) {
std::string regex = "(topics.name REGEXP '" + storage_filter_.regex + "')";
list_and_regex = list_and_regex + std::string(!list_and_regex.empty() ? " OR " : "") + regex;
}

if (!list_and_regex.empty()) {
where_conditions.push_back(list_and_regex);
}

std::string exclude_topics_services;
// exclude topics based on regular expressions
if (!storage_filter_.topics_regex_to_exclude.empty()) {
// Construct string for selected topics
Expand All @@ -546,6 +572,14 @@ void SqliteStorage::prepare_for_reading()
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.topics_regex_to_exclude + "'))");
}
// exclude service based on regular expressions
if (!storage_filter_.services_regex_to_exclude.empty()) {
// Construct string for selected topics
where_conditions.push_back(
"(topics.name NOT IN "
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.services_regex_to_exclude + "'))");
}

const std::string direction_op = read_order_.reverse ? "<" : ">";
const std::string order_direction = read_order_.reverse ? "DESC" : "ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ TEST_F(StorageTestFixture, read_next_returns_filtered_messages_regex) {
readable_storage->open({db_filename, kPluginID});

rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics_regex = "topic.*";
storage_filter.regex = "topic.*";
readable_storage->set_filter(storage_filter);

EXPECT_TRUE(readable_storage->has_next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
#ifndef ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_
#define ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_

#include <chrono>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "rcl/service_introspection.h"

#include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes.


namespace rosbag2_test_common
{
template<typename ServiceT>
Expand All @@ -32,14 +33,14 @@ class ClientManager : public rclcpp::Node
public:
explicit ClientManager(
std::string service_name,
size_t client_number = 1,
size_t number_of_clients = 1,
bool service_event_contents = false,
bool client_event_contents = true)
: Node("service_client_manager_" + std::to_string(rclcpp::Clock().now().nanoseconds()),
rclcpp::NodeOptions().start_parameter_services(false).start_parameter_event_publisher(
false).enable_rosout(false)),
service_name_(service_name),
client_number_(client_number),
service_name_(std::move(service_name)),
number_of_clients_(number_of_clients),
enable_service_event_contents_(service_event_contents),
enable_client_event_contents_(client_event_contents)
{
Expand Down Expand Up @@ -71,7 +72,7 @@ class ClientManager : public rclcpp::Node
introspection_state = RCL_SERVICE_INTROSPECTION_OFF;
}

for (size_t i = 0; i < client_number_; i++) {
for (size_t i = 0; i < number_of_clients_; i++) {
auto client = create_client<ServiceT>(service_name_);
client->configure_introspection(
get_clock(), rclcpp::SystemDefaultsQoS(), introspection_state);
Expand Down Expand Up @@ -113,7 +114,7 @@ class ClientManager : public rclcpp::Node
exec_, get_node_base_interface(), result, timeout) != rclcpp::FutureReturnCode::SUCCESS)
{
RCLCPP_INFO(
rclcpp::get_logger("service_client_manager"), "Failed to get response !");
this->get_logger(), "Failed to get response !");
return false;
}
}
Expand All @@ -127,7 +128,7 @@ class ClientManager : public rclcpp::Node
typename rclcpp::Service<ServiceT>::SharedPtr service_;
std::vector<client_shared_ptr> clients_;
const std::string service_name_;
size_t client_number_;
size_t number_of_clients_;
bool enable_service_event_contents_;
bool enable_client_event_contents_;
};
Expand Down
Loading

0 comments on commit 230dfb5

Please sign in to comment.