diff --git a/rosbag2_storage/include/rosbag2_storage/qos.hpp b/rosbag2_storage/include/rosbag2_storage/qos.hpp index baba001ce0..e24e6d7bbf 100644 --- a/rosbag2_storage/include/rosbag2_storage/qos.hpp +++ b/rosbag2_storage/include/rosbag2_storage/qos.hpp @@ -15,9 +15,9 @@ #ifndef ROSBAG2_STORAGE__QOS_HPP_ #define ROSBAG2_STORAGE__QOS_HPP_ -#include #include #include +#include #include "rclcpp/node_interfaces/node_graph_interface.hpp" #include "rclcpp/qos.hpp" @@ -179,11 +179,11 @@ struct ROSBAG2_STORAGE_PUBLIC convert> }; template<> -struct ROSBAG2_STORAGE_PUBLIC convert> +struct ROSBAG2_STORAGE_PUBLIC convert> { - static Node encode(const std::map & rhs); + static Node encode(const std::unordered_map & rhs); static bool decode( - const Node & node, std::map & rhs, int version = 9); + const Node & node, std::unordered_map & rhs, int version = 9); }; } // namespace YAML diff --git a/rosbag2_storage/include/rosbag2_storage/yaml.hpp b/rosbag2_storage/include/rosbag2_storage/yaml.hpp index a71a830cc4..a1f400f977 100644 --- a/rosbag2_storage/include/rosbag2_storage/yaml.hpp +++ b/rosbag2_storage/include/rosbag2_storage/yaml.hpp @@ -30,6 +30,7 @@ # pragma warning(pop) #endif +#include "rclcpp/duration.hpp" #include "rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/qos.hpp" @@ -41,7 +42,7 @@ template void optional_assign(const Node & node, std::string field, T & assign_to) { if (node[field]) { - assign_to = node[field].as(); + YAML::convert::decode(node[field], assign_to); } } @@ -66,6 +67,41 @@ struct convert> } }; +template<> +struct convert +{ + static Node encode(const rclcpp::Duration & duration) + { + Node node; + node["sec"] = duration.nanoseconds() / 1000000000; + node["nsec"] = duration.nanoseconds() % 1000000000; + return node; + } + + static bool decode(const Node & node, rclcpp::Duration & duration) + { + duration = rclcpp::Duration(node["sec"].as(), node["nsec"].as()); + return true; + } +}; + +template<> +struct convert +{ + static Node encode(const std::chrono::milliseconds & duration) + { + Node node; + node["milliseconds"] = duration.count(); + return node; + } + + static bool decode(const Node & node, std::chrono::milliseconds & duration) + { + duration = std::chrono::milliseconds(node["milliseconds"].as()); + return true; + } +}; + template<> struct convert { diff --git a/rosbag2_storage/src/rosbag2_storage/qos.cpp b/rosbag2_storage/src/rosbag2_storage/qos.cpp index 8c3e165d68..68c3fb2830 100644 --- a/rosbag2_storage/src/rosbag2_storage/qos.cpp +++ b/rosbag2_storage/src/rosbag2_storage/qos.cpp @@ -15,7 +15,6 @@ #include #include - #include "rosbag2_storage/qos.hpp" #include "rosbag2_storage/logging.hpp" #include "rmw/qos_string_conversions.h" @@ -275,8 +274,8 @@ bool convert>::decode( return true; } -Node convert>::encode( - const std::map & rhs) +Node convert>::encode( + const std::unordered_map & rhs) { Node node{NodeType::Sequence}; for (const auto & [key, value] : rhs) { @@ -285,8 +284,8 @@ Node convert>::encode( return node; } -bool convert>::decode( - const Node & node, std::map & rhs, int version) +bool convert>::decode( + const Node & node, std::unordered_map & rhs, int version) { if (!node.IsMap()) { return false; @@ -294,8 +293,12 @@ bool convert>::decode( rhs.clear(); for (const auto & element : node) { - rhs[element.first.as()] = decode_for_version( - element.second, version); + // Using rosbag2_storage::Rosbag2QoS for decoding because rclcpp::QoS is not default + // constructable. Note: It is safe to use upcast when inserting into the unordered_map + rhs.insert( + {element.first.as(), + decode_for_version(element.second, version) + }); } return true; } diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index ae068dbcef..d5b1bc2599 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -31,6 +31,7 @@ find_package(ament_cmake_ros REQUIRED) find_package(keyboard_handler REQUIRED) find_package(rcl REQUIRED) find_package(rclcpp REQUIRED) +find_package(rclcpp_components REQUIRED) find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosbag2_compression REQUIRED) @@ -44,10 +45,12 @@ find_package(yaml_cpp_vendor REQUIRED) add_library(${PROJECT_NAME} SHARED src/rosbag2_transport/bag_rewrite.cpp src/rosbag2_transport/player.cpp + src/rosbag2_transport/play_options.cpp src/rosbag2_transport/reader_writer_factory.cpp src/rosbag2_transport/recorder.cpp src/rosbag2_transport/record_options.cpp - src/rosbag2_transport/topic_filter.cpp) + src/rosbag2_transport/topic_filter.cpp + src/rosbag2_transport/config_options_from_node_params.cpp) target_include_directories(${PROJECT_NAME} PUBLIC $ $) @@ -55,6 +58,7 @@ target_link_libraries(${PROJECT_NAME} keyboard_handler::keyboard_handler rcl::rcl rclcpp::rclcpp + ${rclcpp_components_TARGETS} rcutils::rcutils rmw::rmw rosbag2_compression::rosbag2_compression @@ -65,6 +69,12 @@ target_link_libraries(${PROJECT_NAME} yaml-cpp ) +rclcpp_components_register_node( + ${PROJECT_NAME} PLUGIN "rosbag2_transport::Player" EXECUTABLE player) + +rclcpp_components_register_node( + ${PROJECT_NAME} PLUGIN "rosbag2_transport::Recorder" EXECUTABLE recorder) + # Causes the visibility macros to use dllexport rather than dllimport, # which is appropriate when building the dll but not consuming it. target_compile_definitions(${PROJECT_NAME} PRIVATE "ROSBAG2_TRANSPORT_BUILDING_LIBRARY") @@ -89,6 +99,7 @@ ament_export_libraries(${PROJECT_NAME}) ament_export_targets(export_${PROJECT_NAME}) ament_export_dependencies( + rclcpp_components keyboard_handler rosbag2_cpp rosbag2_compression @@ -109,6 +120,10 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_composable_player + test/rosbag2_transport/test_composable_player.cpp + LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common) + rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common @@ -203,10 +218,15 @@ function(create_tests_for_rmw_implementation) test/rosbag2_transport/test_record_services.cpp LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common) - rosbag2_transport_add_gmock(test_component_parameters - test/rosbag2_transport/test_composable_recorder.cpp + rosbag2_transport_add_gmock(test_composable_recorder + test/rosbag2_transport/test_composable_recorder.cpp LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common) + rosbag2_transport_add_gmock(test_load_composable_components + test/rosbag2_transport/test_load_composable_components.cpp + LINK_LIBS rosbag2_transport ${test_msgs_TARGETS} rosbag2_test_common::rosbag2_test_common + ${rclcpp_components_TARGETS}) + if(${rmw_implementation} MATCHES "rmw_cyclonedds(.*)") ament_add_test_label(test_play_services__rmw_cyclonedds_cpp xfail) endif() diff --git a/rosbag2_transport/include/rosbag2_transport/config_options_from_node_params.hpp b/rosbag2_transport/include/rosbag2_transport/config_options_from_node_params.hpp new file mode 100644 index 0000000000..d8d87cf645 --- /dev/null +++ b/rosbag2_transport/include/rosbag2_transport/config_options_from_node_params.hpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__CONFIG_OPTIONS_FROM_NODE_PARAMS_HPP_ +#define ROSBAG2_TRANSPORT__CONFIG_OPTIONS_FROM_NODE_PARAMS_HPP_ + +#include + +#include "rclcpp/node.hpp" +#include "rosbag2_transport/play_options.hpp" +#include "rosbag2_transport/record_options.hpp" +#include "rosbag2_storage/storage_options.hpp" + +namespace rosbag2_transport +{ +rosbag2_transport::PlayOptions +get_play_options_from_node_params(rclcpp::Node & node); + +rosbag2_transport::RecordOptions +get_record_options_from_node_params(rclcpp::Node & node); + +rosbag2_storage::StorageOptions +get_storage_options_from_node_params(rclcpp::Node & node); +} // namespace rosbag2_transport + +#endif // ROSBAG2_TRANSPORT__CONFIG_OPTIONS_FROM_NODE_PARAMS_HPP_ diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 4f880d0614..432bd2d15d 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -15,15 +15,16 @@ #ifndef ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_ #define ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_ +#include #include #include #include #include #include "keyboard_handler/keyboard_handler.hpp" -#include "rclcpp/duration.hpp" -#include "rclcpp/time.hpp" -#include "rclcpp/qos.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rosbag2_storage/yaml.hpp" +#include "rosbag2_transport/visibility_control.hpp" namespace rosbag2_transport { @@ -103,4 +104,15 @@ struct PlayOptions } // namespace rosbag2_transport +namespace YAML +{ +template<> +struct ROSBAG2_TRANSPORT_PUBLIC convert +{ + static Node encode(const rosbag2_transport::PlayOptions & play_options); + static bool decode(const Node & node, rosbag2_transport::PlayOptions & play_options); +}; + +} // namespace YAML + #endif // ROSBAG2_TRANSPORT__PLAY_OPTIONS_HPP_ diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 20945571da..8ca438b09f 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -80,11 +80,35 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC static constexpr callback_handle_t invalid_callback_handle = 0; + /// \brief Constructor and entry point for the composable player. + /// Will call Player(node_name, node_options) constructor with node_name = "rosbag2_player". + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + explicit Player(const rclcpp::NodeOptions & node_options); + + /// \brief Default constructor and entry point for the composable player. + /// Will construct Player class and initialize play_options, storage_options from node + /// parameters. At the end will call Player::play() to automatically start playback in a + /// separate thread. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC explicit Player( const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Player class with provided parameters and default + /// rosbag2_cpp::reader and KeyboardHandler classes. + /// \note The KeyboardHandler class will be initialized with parameter which is disabling + /// signal handlers in it. + /// \param storage_options Storage options which will be applied to the rosbag2_cpp::reader + /// after construction. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC Player( const rosbag2_storage::StorageOptions & storage_options, @@ -92,6 +116,16 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Player class with provided parameters and default + /// KeyboardHandler class initialized with parameter which is disabling signal handlers in it. + /// \param reader Unique pointer to the rosbag2_cpp::Reader class which will be moved to the + /// internal instance of the Player class during construction. + /// \param storage_options Storage options which will be applied to the rosbag2_cpp::reader + /// after construction. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC Player( std::unique_ptr reader, @@ -100,6 +134,16 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Player class with provided parameters. + /// \param keyboard_handler Keyboard handler class uses to handle user input from keyboard. + /// \param reader Unique pointer to the rosbag2_cpp::Reader class which will be moved to the + /// internal instance of the Player class during construction. + /// \param storage_options Storage options which will be applied to the rosbag2_cpp::reader + /// after construction. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC Player( std::unique_ptr reader, @@ -109,6 +153,7 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Default destructor. ROSBAG2_TRANSPORT_PUBLIC virtual ~Player(); @@ -237,6 +282,16 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC size_t get_number_of_registered_on_play_msg_post_callbacks(); + ROSBAG2_TRANSPORT_PUBLIC + /// \brief Getter for the currently stored storage options + /// \return Copy of the currently stored storage options + const rosbag2_storage::StorageOptions & get_storage_options(); + + ROSBAG2_TRANSPORT_PUBLIC + /// \brief Getter for the currently stored play options + /// \return Copy of the currently stored play options + const rosbag2_transport::PlayOptions & get_play_options(); + private: std::unique_ptr pimpl_; }; diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index e96e1f375f..a81037bb99 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -57,11 +57,35 @@ class RecorderImpl; class Recorder : public rclcpp::Node { public: + /// \brief Constructor and entry point for the composable recorder. + /// Will call Recorder(node_name, node_options) constructor with node_name = "rosbag2_recorder". + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + explicit Recorder(const rclcpp::NodeOptions & node_options); + + /// \brief Default constructor and entry point for the composable recorder. + /// Will construct Recorder class and initialize record_options, storage_options from node + /// parameters. At the end will call Recorder::record() to automatically start recording in a + /// separate thread. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC explicit Recorder( const std::string & node_name = "rosbag2_recorder", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Recorder class with provided parameters and default + /// KeyboardHandler class initialized with parameter which is disabling signal handlers in it. + /// \param writer Shared pointer to the instance of the rosbag2_cpp::Writer class. Shall not be + /// null_ptr. + /// \param storage_options Storage options which will be applied to the rosbag2_cpp::writer class + /// when recording will be started. + /// \param record_options Settings for Recorder class + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC Recorder( std::shared_ptr writer, @@ -70,6 +94,16 @@ class Recorder : public rclcpp::Node const std::string & node_name = "rosbag2_recorder", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Recorder class with provided parameters + /// \param writer Shared pointer to the instance of the rosbag2_cpp::Writer class. Shall not be + /// null_ptr. + /// \param keyboard_handler Keyboard handler class uses to handle user input from keyboard. + /// \param storage_options Storage options which will be applied to the rosbag2_cpp::writer class + /// when recording will be started. + /// \param record_options Settings for Recorder class + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. ROSBAG2_TRANSPORT_PUBLIC Recorder( std::shared_ptr writer, @@ -79,6 +113,7 @@ class Recorder : public rclcpp::Node const std::string & node_name = "rosbag2_recorder", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Default destructor. ROSBAG2_TRANSPORT_PUBLIC virtual ~Recorder(); diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index edf7bb3ded..9e0afed1d5 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -14,6 +14,7 @@ ament_cmake_ros rclcpp + rclcpp_components rosbag2_compression rosbag2_cpp rosbag2_interfaces diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp new file mode 100644 index 0000000000..49dc587ac5 --- /dev/null +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -0,0 +1,333 @@ +// Copyright 2023 Patrick Roncagliolo and Michael Orlov. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/logging.hpp" +#include "rosbag2_storage/qos.hpp" +#include "rosbag2_transport/play_options.hpp" +#include "rosbag2_transport/config_options_from_node_params.hpp" + +namespace rosbag2_transport +{ + +namespace param_utils +{ +rcl_interfaces::msg::ParameterDescriptor int_param_description( + std::string description, int64_t min, + int64_t max) +{ + rcl_interfaces::msg::ParameterDescriptor d{}; + rcl_interfaces::msg::IntegerRange r{}; + d.description = std::move(description); + r.from_value = min; + r.to_value = max; + d.integer_range.push_back(r); + return d; +} + +rcl_interfaces::msg::ParameterDescriptor float_param_description( + std::string description, float min, + float max) +{ + rcl_interfaces::msg::ParameterDescriptor d{}; + rcl_interfaces::msg::FloatingPointRange r{}; + d.description = std::move(description); + r.from_value = min; + r.to_value = max; + d.floating_point_range.push_back(r); + return d; +} + +template +typename std::enable_if::is_integer, T>::type +declare_integer_node_params( + rclcpp::Node & node, + const std::string & description, + int64_t min = std::numeric_limits::min(), + int64_t max = std::numeric_limits::max(), + T default_value = 0) +{ + T output_value{default_value}; + try { + auto param_desc = param_utils::int_param_description(description, min, max); + output_value = + static_cast(node.declare_parameter(description, default_value, param_desc)); + } catch (const rclcpp::exceptions::InvalidParameterValueException & e) { + std::ostringstream oss; + oss << e.what(); + oss << " Parameter shall be in range [" << min << " , " << max << "]"; + throw rclcpp::exceptions::InvalidParameterValueException(oss.str()); + } + return output_value; +} + +rclcpp::Duration get_duration_from_node_param( + rclcpp::Node & node, + const std::string & node_name, + int64_t default_value_sec = 0, + int64_t default_value_nsec = 0) +{ + constexpr int64_t NANOSECONDS_PER_SECOND = 1000LL * 1000LL * 1000LL; + constexpr int64_t MAX_NANOSEC = NANOSECONDS_PER_SECOND - 1; + constexpr int64_t MIN_NANOSEC = -1 * MAX_NANOSEC; + constexpr int64_t MAX_SEC = std::numeric_limits::max() / NANOSECONDS_PER_SECOND - 1; + constexpr int64_t MIN_SEC = std::numeric_limits::min() / NANOSECONDS_PER_SECOND + 1; + + if (default_value_sec > MAX_SEC || default_value_sec < MIN_SEC) { + std::ostringstream oss; + oss << "default_value_sec = " << default_value_sec << " for " << node_name; + oss << " is out of range. Shall be in range [" << MIN_SEC << " , " << MAX_SEC << "]"; + throw std::out_of_range(oss.str()); + } + if (default_value_nsec > MAX_NANOSEC || default_value_nsec < MIN_NANOSEC) { + std::ostringstream oss; + oss << "default_value_nsec = " << default_value_nsec << " for " << node_name; + oss << " is out of range. Shall be in range [" << MIN_NANOSEC << " , " << MAX_NANOSEC << "]"; + throw std::out_of_range(oss.str()); + } + const auto sec = declare_integer_node_params( + node, node_name + ".sec", MIN_SEC, MAX_SEC, default_value_sec); + const auto nsec = declare_integer_node_params( + node, node_name + ".nsec", MIN_NANOSEC, MAX_NANOSEC, default_value_nsec); + + auto total_nanoseconds = + static_cast(std::abs(sec)) * NANOSECONDS_PER_SECOND + std::abs(nsec); + if (sec < 0 || nsec < 0) { + total_nanoseconds *= -1; + } + return rclcpp::Duration::from_nanoseconds(total_nanoseconds); +} + +} // namespace param_utils + +PlayOptions get_play_options_from_node_params(rclcpp::Node & node) +{ + PlayOptions play_options{}; + play_options.read_ahead_queue_size = param_utils::declare_integer_node_params( + node, "play.read_ahead_queue_size", 1, std::numeric_limits::max(), 1000); + + play_options.node_prefix = node.declare_parameter("play.node_prefix", ""); + + auto desc_rate = param_utils::float_param_description( + "Playback rate (hz)", + 0.000001f, + std::numeric_limits::max()); + play_options.rate = + static_cast(node.declare_parameter("play.rate", 1.0f, desc_rate)); + + play_options.topics_to_filter = node.declare_parameter>( + "play.topics_to_filter", std::vector()); + + play_options.topics_regex_to_filter = + node.declare_parameter("play.topics_regex_to_filter", ""); + + play_options.topics_regex_to_exclude = + node.declare_parameter("play.topics_regex_to_exclude", ""); + + std::string qos_profile_overrides_path = + node.declare_parameter("play.qos_profile_overrides_path", ""); + + if (!qos_profile_overrides_path.empty()) { + try { + YAML::Node yaml_file = YAML::LoadFile(qos_profile_overrides_path); + for (auto topic_qos : yaml_file) { + play_options.topic_qos_profile_overrides + .emplace( + topic_qos.first.as(), + topic_qos.second.as()); + } + } catch (const YAML::Exception & ex) { + throw std::runtime_error( + std::string("Exception on parsing QoS ovverrides file: ") + ex.what()); + } + } + + play_options.loop = node.declare_parameter("play.loop", false); + + auto topic_remapping_options = node.declare_parameter>( + "play.topic_remapping_options", std::vector()); + + if (!topic_remapping_options.empty()) { + RCLCPP_WARN( + node.get_logger(), + "Remappings shall be applied through standard CLI/Launch options." + "'topic_remapping_options' content will be ignored."); + } + + play_options.clock_publish_frequency = + node.declare_parameter("play.clock_publish_frequency", 0.0); + + play_options.clock_publish_on_topic_publish = + node.declare_parameter("play.clock_publish_on_topic_publish", false); + + play_options.clock_trigger_topics = node.declare_parameter>( + "play.clock_trigger_topics", std::vector()); + + play_options.delay = param_utils::get_duration_from_node_param(node, "play.delay", 0, 0); + + play_options.playback_duration = param_utils::get_duration_from_node_param( + node, "play.playback_duration", -1, 0); + + play_options.playback_until_timestamp = param_utils::get_duration_from_node_param( + node, "play.playback_until_timestamp", 0, -1).nanoseconds(); + + play_options.start_paused = node.declare_parameter("play.start_paused", false); + + play_options.start_offset = param_utils::get_duration_from_node_param( + node, "play.start_offset", 0, 0).nanoseconds(); + + play_options.disable_keyboard_controls = + node.declare_parameter("play.disable_keyboard_controls", false); + + play_options.wait_acked_timeout = param_utils::get_duration_from_node_param( + node, "play.wait_acked_timeout", 0, -1).nanoseconds(); + + play_options.disable_loan_message = + node.declare_parameter("play.disable_loan_message", false); + + return play_options; +} + +RecordOptions get_record_options_from_node_params(rclcpp::Node & node) +{ + RecordOptions record_options{}; + record_options.all = node.declare_parameter("record.all", false); + + record_options.is_discovery_disabled = + node.declare_parameter("record.is_discovery_disabled", false); + + record_options.topics = node.declare_parameter>( + "record.topics", std::vector()); + + record_options.rmw_serialization_format = + node.declare_parameter("record.rmw_serialization_format", "cdr"); + + record_options.topic_polling_interval = param_utils::get_duration_from_node_param( + node, "record.topic_polling_interval", + 0, 1000000).to_chrono(); + + record_options.regex = node.declare_parameter("record.regex", ""); + record_options.exclude = node.declare_parameter("record.exclude", ""); + record_options.node_prefix = node.declare_parameter("record.node_prefix", ""); + record_options.compression_mode = node.declare_parameter( + "record.compression_mode", + ""); + record_options.compression_format = node.declare_parameter( + "record.compression_format", ""); + + record_options.compression_queue_size = param_utils::declare_integer_node_params( + node, "record.compression_queue_size", 0, std::numeric_limits::max(), 1); + + record_options.compression_threads = param_utils::declare_integer_node_params( + node, "record.compression_threads", 0, std::numeric_limits::max(), + record_options.compression_threads); + + std::string qos_profile_overrides_path = + node.declare_parameter("record.qos_profile_overrides_path", ""); + + if (!qos_profile_overrides_path.empty()) { + try { + YAML::Node yaml_file = YAML::LoadFile(qos_profile_overrides_path); + for (auto topic_qos : yaml_file) { + record_options.topic_qos_profile_overrides + .emplace( + topic_qos.first.as(), + topic_qos.second.as()); + } + } catch (const YAML::Exception & ex) { + throw std::runtime_error( + std::string("Exception on parsing QoS ovverrides file: ") + ex.what()); + } + } + + record_options.include_hidden_topics = + node.declare_parameter("record.include_hidden_topics", false); + + record_options.include_unpublished_topics = + node.declare_parameter("record.include_unpublished_topics", false); + + record_options.ignore_leaf_topics = + node.declare_parameter("record.ignore_leaf_topics", false); + + record_options.start_paused = node.declare_parameter("record.start_paused", false); + + record_options.use_sim_time = node.get_parameter("use_sim_time").get_value(); + + if (record_options.use_sim_time && record_options.is_discovery_disabled) { + throw std::invalid_argument( + "'use_sim_time' and 'is_discovery_disabled' both set, but are incompatible settings. " + "The `/clock` topic needs to be discovered to record with sim time."); + } + return record_options; +} + +rosbag2_storage::StorageOptions +get_storage_options_from_node_params(rclcpp::Node & node) +{ + rosbag2_storage::StorageOptions storage_options{}; + + storage_options.uri = node.declare_parameter("storage.uri", ""); + + storage_options.storage_id = node.declare_parameter("storage.storage_id", ""); + + storage_options.storage_config_uri = + node.declare_parameter("storage.storage_config_uri", ""); + + storage_options.max_bagfile_size = param_utils::declare_integer_node_params( + node, "storage.max_bagfile_size", 0, + std::numeric_limits::max(), storage_options.max_bagfile_size); + + storage_options.max_bagfile_duration = param_utils::declare_integer_node_params( + node, "storage.max_bagfile_duration", 0, + std::numeric_limits::max(), storage_options.max_bagfile_duration); + + storage_options.max_cache_size = param_utils::declare_integer_node_params( + node, "storage.max_cache_size", 0, + std::numeric_limits::max(), 100 * 1024 * 1024); + + storage_options.storage_preset_profile = + node.declare_parameter("storage.storage_preset_profile", ""); + + storage_options.snapshot_mode = node.declare_parameter("storage.snapshot_mode", false); + + auto list_of_key_value_strings = node.declare_parameter>( + "storage.custom_data", + std::vector()); + for (const auto & key_value_string : list_of_key_value_strings) { + auto delimiter_pos = key_value_string.find("=", 0); + if (delimiter_pos == std::string::npos) { + std::stringstream ss; + ss << "The storage.custom_data expected to be as list of the key=value strings. " + "The `=` not found in the " << key_value_string; + throw std::invalid_argument(ss.str()); + } + auto key_string = key_value_string.substr(0, delimiter_pos); + auto value_string = key_value_string.substr(delimiter_pos + 1); + storage_options.custom_data[key_string] = value_string; + } + + storage_options.start_time_ns = param_utils::declare_integer_node_params( + node, "storage.start_time_ns", std::numeric_limits::min(), + std::numeric_limits::max(), storage_options.start_time_ns); + + storage_options.end_time_ns = param_utils::declare_integer_node_params( + node, "storage.end_time_ns", std::numeric_limits::min(), + std::numeric_limits::max(), storage_options.end_time_ns); + + return storage_options; +} + +} // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/play_options.cpp b/rosbag2_transport/src/rosbag2_transport/play_options.cpp new file mode 100644 index 0000000000..1515ba37cd --- /dev/null +++ b/rosbag2_transport/src/rosbag2_transport/play_options.cpp @@ -0,0 +1,110 @@ +// Copyright 2023 Patrick Roncagliolo. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rclcpp/duration.hpp" +#include "rosbag2_storage/qos.hpp" +#include "rosbag2_transport/play_options.hpp" + +namespace YAML +{ + +Node convert::encode( + const rosbag2_transport::PlayOptions & play_options) +{ + Node node; + node["read_ahead_queue_size"] = play_options.read_ahead_queue_size; + node["node_prefix"] = play_options.node_prefix; + node["rate"] = play_options.rate; + node["topics_to_filter"] = play_options.topics_to_filter; + node["topics_regex_to_filter"] = play_options.topics_regex_to_filter; + node["topics_regex_to_exclude"] = play_options.topics_regex_to_exclude; + node["topic_qos_profile_overrides"] = + YAML::convert>::encode( + play_options.topic_qos_profile_overrides); + node["loop"] = play_options.loop; + node["topic_remapping_options"] = play_options.topic_remapping_options; + node["clock_publish_frequency"] = play_options.clock_publish_frequency; + node["clock_publish_on_topic_publish"] = play_options.clock_publish_on_topic_publish; + node["clock_trigger_topics"] = play_options.clock_trigger_topics; + node["delay"] = play_options.delay; + node["playback_duration"] = play_options.playback_duration; + node["playback_until_timestamp"] = YAML::convert::encode( + std::chrono::nanoseconds(play_options.playback_until_timestamp)); + + node["start_paused"] = play_options.start_paused; + node["start_offset"] = YAML::convert::encode( + std::chrono::nanoseconds(play_options.start_offset)); + + node["disable_keyboard_controls"] = play_options.disable_keyboard_controls; + node["wait_acked_timeout"] = YAML::convert::encode( + std::chrono::nanoseconds(play_options.wait_acked_timeout)); + + node["disable_loan_message"] = play_options.disable_loan_message; + + return node; +} + +bool convert::decode( + const Node & node, rosbag2_transport::PlayOptions & play_options) +{ + optional_assign(node, "read_ahead_queue_size", play_options.read_ahead_queue_size); + optional_assign(node, "node_prefix", play_options.node_prefix); + optional_assign(node, "rate", play_options.rate); + optional_assign>( + node, "topics_to_filter", play_options.topics_to_filter); + optional_assign(node, "topics_regex_to_filter", play_options.topics_regex_to_filter); + optional_assign( + node, "topics_regex_to_exclude", play_options.topics_regex_to_exclude); + + optional_assign>( + node, "topic_qos_profile_overrides", play_options.topic_qos_profile_overrides); + + optional_assign(node, "clock_publish_frequency", play_options.clock_publish_frequency); + + optional_assign( + node, "clock_publish_on_topic_publish", play_options.clock_publish_on_topic_publish); + + optional_assign>( + node, "clock_trigger_topics", play_options.clock_trigger_topics); + + optional_assign(node, "delay", play_options.delay); + optional_assign(node, "playback_duration", play_options.playback_duration); + + rclcpp::Duration playback_until_timestamp( + std::chrono::nanoseconds(play_options.playback_until_timestamp)); + optional_assign(node, "playback_until_timestamp", playback_until_timestamp); + play_options.playback_until_timestamp = playback_until_timestamp.nanoseconds(); + + optional_assign(node, "start_paused", play_options.start_paused); + + rclcpp::Duration start_offset(std::chrono::nanoseconds(play_options.start_offset)); + optional_assign(node, "start_offset", start_offset); + play_options.start_offset = start_offset.nanoseconds(); + + optional_assign(node, "disable_keyboard_controls", play_options.disable_keyboard_controls); + + rclcpp::Duration wait_acked_timeout(std::chrono::nanoseconds(play_options.wait_acked_timeout)); + optional_assign(node, "wait_acked_timeout", wait_acked_timeout); + play_options.wait_acked_timeout = wait_acked_timeout.nanoseconds(); + + optional_assign(node, "disable_loan_message", play_options.disable_loan_message); + + return true; +} + +} // namespace YAML diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c1392fbbd2..e6233bcd3b 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -33,6 +33,7 @@ #include "rosbag2_cpp/reader.hpp" #include "rosbag2_storage/storage_filter.hpp" #include "rosbag2_storage/qos.hpp" +#include "rosbag2_transport/config_options_from_node_params.hpp" namespace { @@ -189,6 +190,14 @@ class PlayerImpl /// \return Number of registered on_play_msg_post_callbacks size_t get_number_of_registered_on_play_msg_post_callbacks(); + /// \brief Getter for the currently stored storage options + /// \return Copy of the currently stored storage options + const rosbag2_storage::StorageOptions & get_storage_options(); + + /// \brief Getter for the currently stored play options + /// \return Copy of the currently stored play options + const rosbag2_transport::PlayOptions & get_play_options(); + protected: struct play_msg_callback_data { @@ -1207,16 +1216,40 @@ void PlayerImpl::publish_clock_update(const rclcpp::Time & time) } } +const rosbag2_storage::StorageOptions & PlayerImpl::get_storage_options() +{ + return storage_options_; +} + +const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options() +{ + return play_options_; +} + /////////////////////////////// // Player public interface +Player::Player(const rclcpp::NodeOptions & node_options) +: Player("rosbag2_player", node_options) {} + Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options) : rclcpp::Node(node_name, node_options) { - // TODO(karsten1987): Use this constructor later with parameter parsing. - // The reader, storage_options as well as play_options can be loaded via parameter. - // That way, the player can be used as a simple component in a component manager. - throw rclcpp::exceptions::UnimplementedError(); + rosbag2_storage::StorageOptions storage_options = get_storage_options_from_node_params(*this); + PlayOptions play_options = get_play_options_from_node_params(*this); + + #ifndef _WIN32 + auto keyboard_handler = std::make_shared(false); + #else + // We don't have signal handler option in constructor for windows version + auto keyboard_handler = std::shared_ptr(new KeyboardHandler()); + #endif + + auto reader = std::make_unique(); + + pimpl_ = std::make_unique( + this, std::move(reader), keyboard_handler, storage_options, play_options); + pimpl_->play(); } Player::Player( @@ -1365,4 +1398,21 @@ size_t Player::get_number_of_registered_on_play_msg_post_callbacks() return pimpl_->get_number_of_registered_on_play_msg_post_callbacks(); } +const rosbag2_storage::StorageOptions & Player::get_storage_options() +{ + return pimpl_->get_storage_options(); +} + +const rosbag2_transport::PlayOptions & Player::get_play_options() +{ + return pimpl_->get_play_options(); +} + } // namespace rosbag2_transport + +#include "rclcpp_components/register_node_macro.hpp" + +// Register the component with class_loader. +// This acts as a sort of entry point, allowing the component to be +// discoverable when its library is being loaded into a running process. +RCLCPP_COMPONENTS_REGISTER_NODE(rosbag2_transport::Player) diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index 383b97d568..ec6a0a9813 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -17,27 +17,12 @@ #include #include "rosbag2_storage/qos.hpp" +#include "rosbag2_transport/play_options.hpp" #include "rosbag2_transport/record_options.hpp" namespace YAML { -template<> -struct convert -{ - static Node encode(const std::chrono::milliseconds & millis) - { - Node node{millis.count()}; - return node; - } - - static bool decode(const Node & node, std::chrono::milliseconds & millis) - { - millis = std::chrono::milliseconds{node.as()}; - return true; - } -}; - Node convert::encode( const rosbag2_transport::RecordOptions & record_options) { @@ -54,11 +39,9 @@ Node convert::encode( node["compression_format"] = record_options.compression_format; node["compression_queue_size"] = record_options.compression_queue_size; node["compression_threads"] = record_options.compression_threads; - std::map qos_overrides( - record_options.topic_qos_profile_overrides.begin(), - record_options.topic_qos_profile_overrides.end()); - node["topic_qos_profile_overrides"] = convert>::encode(qos_overrides); + node["topic_qos_profile_overrides"] = + convert>::encode( + record_options.topic_qos_profile_overrides); node["include_hidden_topics"] = record_options.include_hidden_topics; node["include_unpublished_topics"] = record_options.include_unpublished_topics; return node; @@ -72,8 +55,10 @@ bool convert::decode( optional_assign>(node, "topics", record_options.topics); optional_assign( node, "rmw_serialization_format", record_options.rmw_serialization_format); + optional_assign( node, "topic_polling_interval", record_options.topic_polling_interval); + optional_assign(node, "regex", record_options.regex); optional_assign(node, "exclude", record_options.exclude); optional_assign(node, "node_prefix", record_options.node_prefix); @@ -82,13 +67,12 @@ bool convert::decode( optional_assign(node, "compression_queue_size", record_options.compression_queue_size); optional_assign(node, "compression_threads", record_options.compression_threads); - // yaml-cpp doesn't implement unordered_map - std::map qos_overrides; + std::unordered_map qos_overrides; if (node["topic_qos_profile_overrides"]) { - qos_overrides = YAML::decode_for_version>( + qos_overrides = YAML::decode_for_version>( node["topic_qos_profile_overrides"], version); } - record_options.topic_qos_profile_overrides.insert(qos_overrides.begin(), qos_overrides.end()); + record_options.topic_qos_profile_overrides = qos_overrides; optional_assign(node, "include_hidden_topics", record_options.include_hidden_topics); optional_assign( diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index c3baafeff3..86b4f7e43e 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -38,6 +38,7 @@ #include "rosbag2_storage/qos.hpp" #include "logging.hpp" +#include "rosbag2_transport/config_options_from_node_params.hpp" #include "rosbag2_transport/topic_filter.hpp" namespace rosbag2_transport @@ -626,15 +627,31 @@ void RecorderImpl::warn_if_new_qos_for_subscribed_topic(const std::string & topi /////////////////////////////// // Recorder public interface +Recorder::Recorder(const rclcpp::NodeOptions & node_options) +: Recorder("rosbag2_recorder", node_options) {} + Recorder::Recorder( const std::string & node_name, const rclcpp::NodeOptions & node_options) : rclcpp::Node(node_name, node_options) { - // TODO(karsten1987): Use this constructor later with parameter parsing. - // The reader, storage_options as well as record_options can be loaded via parameter. - // That way, the recorder can be used as a simple component in a component manager. - throw rclcpp::exceptions::UnimplementedError(); + rosbag2_storage::StorageOptions storage_options = get_storage_options_from_node_params(*this); + + RecordOptions record_options = get_record_options_from_node_params(*this); + + #ifndef _WIN32 + auto keyboard_handler = std::make_shared(false); + #else + // We don't have signal handler option in constructor for windows version + auto keyboard_handler = std::shared_ptr(new KeyboardHandler()); + #endif + + auto writer = std::make_unique(); + + pimpl_ = std::make_unique( + this, std::move(writer), keyboard_handler, + storage_options, record_options); + pimpl_->record(); } Recorder::Recorder( @@ -672,8 +689,7 @@ Recorder::Recorder( storage_options, record_options)) {} -Recorder::~Recorder() -{} +Recorder::~Recorder() = default; void Recorder::record() { @@ -758,3 +774,10 @@ Recorder::stop_discovery() } } // namespace rosbag2_transport + +#include "rclcpp_components/register_node_macro.hpp" + +// Register the component with class_loader. +// This acts as a sort of entry point, allowing the component to be +// discoverable when its library is being loaded into a running process. +RCLCPP_COMPONENTS_REGISTER_NODE(rosbag2_transport::Recorder) diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml new file mode 100644 index 0000000000..56b75ee5f0 --- /dev/null +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -0,0 +1,47 @@ +player_params_node: + ros__parameters: + play: + read_ahead_queue_size: 3 + node_prefix: "test" + rate: 13.0 + topics_to_filter: ["/foo", "/bar"] + topics_regex_to_filter: "[xyz]/topic" + topics_regex_to_exclude: "[abc]/topic" + loop: false + clock_publish_frequency: 19.0 + clock_publish_on_topic_publish: true + clock_trigger_topics: ["/triggers/clock"] + # Negative durations are invalid. + delay: + sec: 0 + nsec: 1 + # Negative timestamps will make the playback to not stop. + playback_duration: + sec: -1 + nsec: 00000000 + # Negative timestamps will make the playback to not stop. + playback_until_timestamp: + sec: -2 + nsec: -500000000 + start_paused: true + # Negative durations are invalid. + start_offset: + sec: 0 + nsec: 999999999 + disable_keyboard_controls: true + # Negative value means that published messages do not need to be acknowledged. + wait_acked_timeout: + sec: 0 + nsec: -999999999 + disable_loan_message: false + + storage: + uri: "path/to/some_bag" + storage_id: "sqlite3" + storage_config_uri: "" + max_bagfile_size: 12345 + max_bagfile_duration: 54321 + max_cache_size: 9898 + storage_preset_profile: "resilient" + snapshot_mode: false + custom_data: ["key1=value1", "key2=value2"] \ No newline at end of file diff --git a/rosbag2_transport/test/resources/qos_profile_overrides.yaml b/rosbag2_transport/test/resources/qos_profile_overrides.yaml new file mode 100644 index 0000000000..03731bb7fb --- /dev/null +++ b/rosbag2_transport/test/resources/qos_profile_overrides.yaml @@ -0,0 +1,19 @@ +/overrided_topic_qos: + history: keep_last + depth: 10 + reliability: reliable + durability: volatile + deadline: + # unspecified/infinity + sec: 0 + nsec: 0 + lifespan: + # unspecified/infinity + sec: 0 + nsec: 0 + liveliness: system_default + liveliness_lease_duration: + # unspecified/infinity + sec: 0 + nsec: 0 + avoid_ros_namespace_conventions: false \ No newline at end of file diff --git a/rosbag2_transport/test/resources/recorder_node_params.yaml b/rosbag2_transport/test/resources/recorder_node_params.yaml new file mode 100644 index 0000000000..b5ca93b7b1 --- /dev/null +++ b/rosbag2_transport/test/resources/recorder_node_params.yaml @@ -0,0 +1,38 @@ +recorder_params_node: + ros__parameters: + + use_sim_time: false + + record: + all: true + is_discovery_disabled: true + topics: ["topic", "other_topic"] + rmw_serialization_format: "cdr" + topic_polling_interval: + sec: 0 + nsec: 10000000 + regex: "[xyz]/topic" + exclude: "*" + node_prefix: "prefix" + compression_mode: "stream" + compression_format: "h264" + compression_queue_size: 10 + compression_threads: 2 + qos_profile_overrides_path: "" + include_hidden_topics: true + include_unpublished_topics: true + ignore_leaf_topics: false + start_paused: false + + storage: + uri: "path/to/some_bag" + storage_id: "sqlite3" + storage_config_uri: "" + max_bagfile_size: 2147483646 + max_bagfile_duration: 2147483646 + max_cache_size: 989888 + storage_preset_profile: "none" + snapshot_mode: false + custom_data: ["key1=value1", "key2=value2"] + start_time_ns: 0 + end_time_ns: 100000 \ No newline at end of file diff --git a/rosbag2_transport/test/rosbag2_transport/composition_manager_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/composition_manager_test_fixture.hpp new file mode 100644 index 0000000000..872d52b88f --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/composition_manager_test_fixture.hpp @@ -0,0 +1,108 @@ +// Copyright 2023, Patrick Roncagliolo and Michael Orlov +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__COMPOSITION_MANAGER_TEST_FIXTURE_HPP_ +#define ROSBAG2_TRANSPORT__COMPOSITION_MANAGER_TEST_FIXTURE_HPP_ + +#include + +#include +#include +#include + +#include "composition_interfaces/srv/load_node.hpp" +#include "rclcpp_components/component_manager.hpp" +#include "rclcpp_components/component_manager_isolated.hpp" +#include "rosbag2_test_common/temporary_directory_fixture.hpp" + +class CompositionManagerTestFixture + : public rosbag2_test_common::ParametrizedTemporaryDirectoryFixture +{ +public: + void SetUp() override + { + rclcpp::init(0, nullptr); + auto bag_name = get_test_name() + "_" + GetParam(); + root_bag_path_ = std::filesystem::path(temporary_dir_path_) / bag_name; + + // Clean up potentially leftover bag files. + // There may be leftovers if the system reallocates a temp directory + // used by a previous test execution and the test did not have a clean exit. + std::filesystem::remove_all(root_bag_path_); + + exec_ = std::make_shared(); + node_ = rclcpp::Node::make_shared("test_component_manager"); + using ComponentManagerIsolated = + rclcpp_components::ComponentManagerIsolated; + composition_manager_ = std::make_shared(exec_); + + exec_->add_node(composition_manager_); + exec_->add_node(node_); + + load_node_client_ = node_->create_client( + "/ComponentManager/_container/load_node"); + + if (!load_node_client_->wait_for_service(std::chrono::seconds(20))) { + ASSERT_TRUE(false) << "service not available after waiting"; + } + + unload_node_client_ = node_->create_client( + "/ComponentManager/_container/unload_node"); + + if (!unload_node_client_->wait_for_service(std::chrono::seconds(20))) { + ASSERT_TRUE(false) << "service not available after waiting"; + } + } + + void TearDown() override + { + rclcpp::shutdown(); + composition_manager_.reset(); // Need to force destruction to invoke composed recorder + // destructor before trying to delete files which is currently opened for writing. + std::filesystem::remove_all(root_bag_path_); + } + + void unload_node(uint64_t node_id) + { + auto unload_node_request = std::make_shared(); + unload_node_request->unique_id = node_id; + auto unload_node_future = unload_node_client_->async_send_request(unload_node_request); + // Wait for the response + auto unload_node_ret = + exec_->spin_until_future_complete(unload_node_future, std::chrono::seconds(10)); + auto unload_node_response = unload_node_future.get(); + EXPECT_EQ(unload_node_ret, rclcpp::FutureReturnCode::SUCCESS); + EXPECT_EQ(unload_node_response->success, true); + EXPECT_EQ(unload_node_response->error_message, ""); + } + + std::string get_test_name() const + { + const auto * test_info = UnitTest::GetInstance()->current_test_info(); + std::string test_name = test_info->name(); + // Replace any slashes in the test name, since it is used in paths + std::replace(test_name.begin(), test_name.end(), '/', '_'); + return test_name; + } + +protected: + std::filesystem::path root_bag_path_; + std::shared_ptr node_; + std::shared_ptr exec_; + std::shared_ptr> load_node_client_; + std::shared_ptr> unload_node_client_; + std::shared_ptr composition_manager_; +}; + +#endif // ROSBAG2_TRANSPORT__COMPOSITION_MANAGER_TEST_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index 7a0143a1c9..e08ac4c8ab 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -20,6 +20,8 @@ #include #include +#include "rosbag2_cpp/reader.hpp" +#include "rosbag2_storage/storage_options.hpp" #include "rosbag2_transport/player.hpp" class MockPlayer : public rosbag2_transport::Player @@ -33,6 +35,12 @@ class MockPlayer : public rosbag2_transport::Player : Player(std::move(reader), storage_options, play_options, node_name) {} + explicit MockPlayer( + const std::string & node_name = "rosbag2_mock_composable_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) + : Player(node_name, node_options) + {} + std::vector get_list_of_publishers() { std::vector pub_list; @@ -48,8 +56,6 @@ class MockPlayer : public rosbag2_transport::Player return pub_list; } - using rosbag2_transport::Player::wait_for_playback_to_start; - size_t get_number_of_registered_pre_callbacks() { return get_number_of_registered_on_play_msg_pre_callbacks(); @@ -59,6 +65,10 @@ class MockPlayer : public rosbag2_transport::Player { return get_number_of_registered_on_play_msg_post_callbacks(); } + + using rosbag2_transport::Player::wait_for_playback_to_start; + using rosbag2_transport::Player::get_storage_options; + using rosbag2_transport::Player::get_play_options; }; #endif // ROSBAG2_TRANSPORT__MOCK_PLAYER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp b/rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp index 644727da47..e433803b92 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp @@ -32,6 +32,12 @@ class MockRecorder : public rosbag2_transport::Recorder : Recorder(writer, storage_options, record_options) {} + explicit MockRecorder( + const std::string & node_name = "rosbag2_mock_recorder", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) + : Recorder(node_name, node_options) + {} + template bool wait_for_topic_to_be_discovered( const std::string & topic_name_to_wait_for, @@ -65,6 +71,9 @@ class MockRecorder : public rosbag2_transport::Recorder } return available_for_recording; } + + using rosbag2_transport::Recorder::get_storage_options; + using rosbag2_transport::Recorder::get_record_options; }; #endif // ROSBAG2_TRANSPORT__MOCK_RECORDER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp new file mode 100644 index 0000000000..9e9bedc841 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -0,0 +1,221 @@ +// Copyright 2023, Patrick Roncagliolo and Michael Orlov +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "composition_manager_test_fixture.hpp" +#include "mock_player.hpp" +#include "rosbag2_test_common/subscription_manager.hpp" +#include "rosbag2_test_common/tested_storage_ids.hpp" +#include "test_msgs/msg/basic_types.hpp" + +using namespace std::chrono_literals; // NOLINT +using namespace ::testing; // NOLINT + +class ComposablePlayerTests + : public ::testing::Test, public WithParamInterface +{ +public: + void SetUp() override + { + rclcpp::init(0, nullptr); + } + + void TearDown() override + { + rclcpp::shutdown(); + } +}; + +class ComposablePlayerIntegrationTests : public CompositionManagerTestFixture +{ +public: + using srvPlay = rosbag2_interfaces::srv::Play; + using srvIsPaused = rosbag2_interfaces::srv::IsPaused; + using srvResume = rosbag2_interfaces::srv::Resume; + + void SetUp() override + { + CompositionManagerTestFixture::SetUp(); + sub_ = std::make_shared(); + is_paused_client_ = node_->create_client(player_node_name_ + "/is_paused"); + resume_client_ = node_->create_client(player_node_name_ + "/resume"); + play_client_ = node_->create_client(player_node_name_ + "/play"); + } + + bool player_is_paused_service_request() + { + // Wait for service to be ready + EXPECT_TRUE(is_paused_client_->wait_for_service(std::chrono::seconds(10))); + auto is_paused_request = std::make_shared(); + auto is_paused_future = is_paused_client_->async_send_request(is_paused_request); + auto ret = exec_->spin_until_future_complete(is_paused_future, 10s); // Wait for the result. + EXPECT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = std::make_shared(); + EXPECT_NO_THROW({result = is_paused_future.get();}); + EXPECT_TRUE(result); + return result->paused; + } + + void player_resume_service_request() + { + // Wait for service to be ready + EXPECT_TRUE(resume_client_->wait_for_service(std::chrono::seconds(10))); + auto resume_request = std::make_shared(); + auto resume_future = resume_client_->async_send_request(resume_request); + auto ret = exec_->spin_until_future_complete(resume_future, 10s); // Wait for the result. + EXPECT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = std::make_shared(); + EXPECT_NO_THROW({result = resume_future.get();}); + EXPECT_TRUE(result); + } + + bool player_play_service_request() + { + // Wait for service to be ready + EXPECT_TRUE(play_client_->wait_for_service(std::chrono::seconds(10))); + auto play_request = std::make_shared(); + play_request->start_offset = rclcpp::Time(0, 0); + play_request->playback_until_timestamp = rclcpp::Time(0, -1); + play_request->playback_duration = rclcpp::Duration(-1, 0); + auto play_future = play_client_->async_send_request(play_request); + auto ret = exec_->spin_until_future_complete(play_future, 10s); // Wait for the result. + EXPECT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = std::make_shared(); + EXPECT_NO_THROW({result = play_future.get();}); + EXPECT_TRUE(result); + return result->success; + } + +protected: + const std::string player_node_name_ = "/rosbag2_player"; + std::shared_ptr> is_paused_client_; + std::shared_ptr> play_client_; + std::shared_ptr> resume_client_; + std::shared_ptr sub_; + const std::string topic_name_ = "/topic1"; + const size_t num_msgs_in_bag_ = 5; +}; + +TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { + // _SRC_RESOURCES_DIR_PATH defined in CMakeLists.txt + rclcpp::NodeOptions opts; + opts.arguments( + { + "--ros-args", + "--params-file", _SRC_RESOURCES_DIR_PATH "/player_node_params.yaml" + }); + opts.append_parameter_override( + "play.qos_profile_overrides_path", + _SRC_RESOURCES_DIR_PATH "/qos_profile_overrides.yaml"); + opts.append_parameter_override("storage.storage_id", GetParam()); + const std::string uri_str = (std::filesystem::path( + _SRC_RESOURCES_DIR_PATH) / GetParam() / "test_bag_for_seek").generic_string(); + opts.append_parameter_override("storage.uri", uri_str); + + auto player = std::make_shared("player_params_node", opts); + auto play_options = player->get_play_options(); + auto storage_options = player->get_storage_options(); + + EXPECT_EQ(play_options.read_ahead_queue_size, 3); + EXPECT_EQ(play_options.node_prefix, "test"); + EXPECT_EQ(play_options.rate, 13.0); + std::vector topics_to_filter {"/foo", "/bar"}; + EXPECT_EQ(play_options.topics_to_filter, topics_to_filter); + EXPECT_EQ(play_options.topics_regex_to_filter, "[xyz]/topic"); + EXPECT_EQ(play_options.topics_regex_to_exclude, "[abc]/topic"); + std::unordered_map topic_qos_profile_overrides{ + std::pair{ + "/overrided_topic_qos", + rclcpp::QoS{rclcpp::KeepLast(10)}.reliable().durability_volatile()} + }; + EXPECT_EQ(play_options.topic_qos_profile_overrides, topic_qos_profile_overrides); + EXPECT_EQ(play_options.loop, false); + EXPECT_EQ(play_options.clock_publish_frequency, 19.0); + std::vector clock_trigger_topics {"/triggers/clock"}; + EXPECT_EQ(play_options.clock_trigger_topics, clock_trigger_topics); + EXPECT_EQ(play_options.delay.nanoseconds(), 1); + EXPECT_DOUBLE_EQ(play_options.playback_duration.seconds(), -1.0); + EXPECT_EQ(play_options.playback_until_timestamp, -2500000000LL); + EXPECT_EQ(play_options.start_offset, 999999999); + EXPECT_EQ(play_options.wait_acked_timeout, -999999999); + EXPECT_EQ(play_options.disable_loan_message, false); + + EXPECT_EQ(storage_options.uri, uri_str); + EXPECT_EQ(storage_options.storage_id, GetParam()); + EXPECT_EQ(storage_options.storage_config_uri, ""); + EXPECT_EQ(storage_options.max_bagfile_size, 12345); + EXPECT_EQ(storage_options.max_bagfile_duration, 54321); + EXPECT_EQ(storage_options.max_cache_size, 9898); + EXPECT_EQ(storage_options.storage_preset_profile, "resilient"); + EXPECT_EQ(storage_options.snapshot_mode, false); + std::unordered_map custom_data{ + std::pair{"key1", "value1"}, + std::pair{"key2", "value2"} + }; + EXPECT_EQ(storage_options.custom_data, custom_data); +} + +TEST_P(ComposablePlayerIntegrationTests, player_can_automatically_play_file_after_composition) { + const size_t expected_number_of_messages = num_msgs_in_bag_; + auto load_node_request = std::make_shared(); + load_node_request->package_name = "rosbag2_transport"; + load_node_request->plugin_name = "rosbag2_transport::Player"; + + const std::string uri_str = (std::filesystem::path( + _SRC_RESOURCES_DIR_PATH) / GetParam() / "test_bag_for_seek").generic_string(); + rclcpp::Parameter uri("storage.uri", rclcpp::ParameterValue(uri_str)); + rclcpp::Parameter start_paused("play.start_paused", rclcpp::ParameterValue(true)); + + load_node_request->parameters.push_back(uri.to_parameter_msg()); + load_node_request->parameters.push_back(start_paused.to_parameter_msg()); + + auto load_node_future = load_node_client_->async_send_request(load_node_request); + auto ret = exec_->spin_until_future_complete(load_node_future, 10s); // Wait for the result. + ASSERT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = load_node_future.get(); + EXPECT_EQ(result->success, true); + EXPECT_EQ(result->error_message, ""); + EXPECT_EQ(result->full_node_name, player_node_name_); + + sub_->add_subscription(topic_name_, expected_number_of_messages); + + ASSERT_TRUE(player_is_paused_service_request()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched({topic_name_}, std::chrono::seconds(30))); + + player_resume_service_request(); + + sub_->spin_subscriptions_sync(); + auto replayed_topic = sub_->get_received_messages(topic_name_); + ASSERT_THAT(replayed_topic, SizeIs(expected_number_of_messages)); +} + +INSTANTIATE_TEST_SUITE_P( + ParametrizedComposablePlayerTests, + ComposablePlayerTests, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +); + +INSTANTIATE_TEST_SUITE_P( + ParametrizedComposablePlayerIntegrationTests, + ComposablePlayerIntegrationTests, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +); diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp index 34490c4c08..54dbbe055e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp @@ -13,40 +13,142 @@ // limitations under the License. #include + +#include #include -#include "rclcpp/rclcpp.hpp" -#include "rosbag2_transport_test_fixture.hpp" +#include "composition_manager_test_fixture.hpp" +#include "rosbag2_cpp/reader.hpp" +#include "rosbag2_test_common/publication_manager.hpp" +#include "rosbag2_test_common/memory_management.hpp" +#include "rosbag2_test_common/tested_storage_ids.hpp" +#include "rosbag2_test_common/temporary_directory_fixture.hpp" #include "rosbag2_transport/recorder.hpp" +#include "test_msgs/message_fixtures.hpp" + +using namespace std::chrono_literals; // NOLINT +using namespace ::testing; // NOLINT +using namespace rosbag2_test_common; // NOLINT -class ComposableRecorderTestFixture : public Rosbag2TransportTestFixture +class ComposableRecorderIntegrationTests : public CompositionManagerTestFixture { public: - ComposableRecorderTestFixture() - : Rosbag2TransportTestFixture() + std::string get_bag_file_name(int split_index) const + { + std::stringstream bag_file_name; + bag_file_name << get_test_name() << "_" << GetParam() << "_" << split_index; + + return bag_file_name.str(); + } + + std::filesystem::path get_bag_file_path(int split_index) + { + return root_bag_path_ / get_relative_bag_file_path(split_index); + } + + std::filesystem::path get_relative_bag_file_path(int split_index) const + { + const auto storage_id = GetParam(); + return std::filesystem::path( + rosbag2_test_common::bag_filename_for_storage_id(get_bag_file_name(split_index), storage_id)); + } + + void wait_for_metadata(std::chrono::duration timeout = std::chrono::seconds(10)) const + { + rosbag2_storage::MetadataIo metadata_io; + const auto start_time = std::chrono::steady_clock::now(); + const auto bag_path = root_bag_path_.generic_string(); + + while (std::chrono::steady_clock::now() - start_time < timeout && rclcpp::ok()) { + if (metadata_io.metadata_file_exists(bag_path)) { + return; + } + std::this_thread::sleep_for(50ms); + } + ASSERT_EQ(metadata_io.metadata_file_exists(bag_path), true) + << "Could not find metadata file: \"" << bag_path.c_str() << "\""; + } + + void wait_for_storage_file(std::chrono::duration timeout = std::chrono::seconds(10)) + { + const auto storage_path = get_bag_file_path(0); + const auto start_time = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start_time < timeout && rclcpp::ok()) { + if (std::filesystem::exists(storage_path)) { + return; + } + std::this_thread::sleep_for(50ms); // wait a bit to not query constantly + } + ASSERT_EQ(std::filesystem::exists(storage_path), true) + << "Could not find storage file: \"" << storage_path.generic_string() << "\""; + } + + template + std::vector> get_messages_for_topic(const std::string & topic) + { + auto filter = rosbag2_storage::StorageFilter{}; + filter.topics.push_back(topic); + + std::unique_ptr reader = std::make_unique(); + reader->open(root_bag_path_.generic_string()); + reader->set_filter(filter); + + auto messages = std::vector>{}; + while (reader->has_next()) { + auto msg = reader->read_next(); + messages.push_back(memory_management_.deserialize_message(msg->serialized_data)); + } + return messages; + } + +protected: + MemoryManagement memory_management_{}; +}; + +class ComposableRecorderTests + : public rosbag2_test_common::ParametrizedTemporaryDirectoryFixture +{ +public: + void SetUp() override { rclcpp::init(0, nullptr); + auto bag_name = get_test_name() + "_" + GetParam(); + root_bag_path_ = std::filesystem::path(temporary_dir_path_) / bag_name; + + // Clean up potentially leftover bag files. + // There may be leftovers if the system reallocates a temp directory + // used by a previous test execution and the test did not have a clean exit. + std::filesystem::remove_all(root_bag_path_); } - ~ComposableRecorderTestFixture() override + void TearDown() override { rclcpp::shutdown(); + std::filesystem::remove_all(root_bag_path_); } + + std::string get_test_name() const + { + const auto * test_info = UnitTest::GetInstance()->current_test_info(); + std::string test_name = test_info->name(); + // Replace any slashes in the test name, since it is used in paths + std::replace(test_name.begin(), test_name.end(), '/', '_'); + return test_name; + } + + std::filesystem::path root_bag_path_; }; -class ComposableRecorder : public rosbag2_transport::Recorder +class MockComposableRecorder : public rosbag2_transport::Recorder { public: static const char demo_attribute_name_[]; bool demo_attribute_value{false}; - explicit ComposableRecorder(const rclcpp::NodeOptions & options) - : rosbag2_transport::Recorder( - std::make_shared(), - rosbag2_storage::StorageOptions(), - rosbag2_transport::RecordOptions(), - "test_recorder_component", - options) + explicit MockComposableRecorder( + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions(), + const std::string & node_name = "rosbag2_mock_composable_recorder") + : Recorder(node_name, node_options) { // Declare demo attribute parameter for the underlying node with default value equal to false. // However, if node was created with option to override this parameter it will be settled up @@ -65,20 +167,149 @@ class ComposableRecorder : public rosbag2_transport::Recorder } return ret_value; } + + using rosbag2_transport::Recorder::get_storage_options; + using rosbag2_transport::Recorder::get_record_options; }; -const char ComposableRecorder::demo_attribute_name_[] = "demo_attribute"; +const char MockComposableRecorder::demo_attribute_name_[] = "demo_attribute"; -TEST_F(ComposableRecorderTestFixture, recorder_inner_params_passed_as_append_override) +TEST_P(ComposableRecorderTests, recorder_inner_params_passed_as_append_override) { std::vector parameters; - parameters.emplace_back(ComposableRecorder::demo_attribute_name_, true); + parameters.emplace_back(MockComposableRecorder::demo_attribute_name_, true); + parameters.emplace_back("storage.uri", rclcpp::ParameterValue(root_bag_path_.generic_string())); auto options = rclcpp::NodeOptions() .use_global_arguments(false) .parameter_overrides(parameters); - auto recorder = std::make_shared(options); + auto recorder = std::make_shared(options); // Check that rosbag2_transport::Recorder inner params will not erase our // parameter_overrides options ASSERT_TRUE(recorder->get_value_of_bool_parameter(recorder->demo_attribute_name_)); ASSERT_TRUE(recorder->demo_attribute_value); } + +TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) { + // _SRC_RESOURCES_DIR_PATH defined in CMakeLists.txt + rclcpp::NodeOptions opts; + opts.arguments( + { + "--ros-args", + "--params-file", _SRC_RESOURCES_DIR_PATH "/recorder_node_params.yaml" + }); + opts.append_parameter_override( + "record.qos_profile_overrides_path", + _SRC_RESOURCES_DIR_PATH "/qos_profile_overrides.yaml"); + opts.append_parameter_override("storage.uri", root_bag_path_.generic_string()); + opts.append_parameter_override("storage.storage_id", GetParam()); + + auto recorder = std::make_shared(opts, "recorder_params_node"); + auto record_options = recorder->get_record_options(); + auto storage_options = recorder->get_storage_options(); + + EXPECT_EQ(record_options.all, true); + EXPECT_EQ(record_options.is_discovery_disabled, true); + std::vector topics {"/topic", "/other_topic"}; + EXPECT_EQ(record_options.topics, topics); + EXPECT_EQ(record_options.rmw_serialization_format, "cdr"); + EXPECT_TRUE(record_options.topic_polling_interval == 0.01s); + EXPECT_EQ(record_options.regex, "[xyz]/topic"); + EXPECT_EQ(record_options.exclude, "*"); + EXPECT_EQ(record_options.node_prefix, "prefix"); + EXPECT_EQ(record_options.compression_mode, "stream"); + EXPECT_EQ(record_options.compression_format, "h264"); + EXPECT_EQ(record_options.compression_queue_size, 10); + EXPECT_EQ(record_options.compression_threads, 2); + std::unordered_map topic_qos_profile_overrides{ + std::pair{ + "/overrided_topic_qos", + rclcpp::QoS{rclcpp::KeepLast(10)}.reliable().durability_volatile()} + }; + EXPECT_EQ(record_options.topic_qos_profile_overrides, topic_qos_profile_overrides); + EXPECT_EQ(record_options.include_hidden_topics, true); + EXPECT_EQ(record_options.include_unpublished_topics, true); + EXPECT_EQ(record_options.ignore_leaf_topics, false); + EXPECT_EQ(record_options.start_paused, false); + EXPECT_EQ(record_options.use_sim_time, false); + + EXPECT_EQ(storage_options.uri, root_bag_path_.generic_string()); + EXPECT_EQ(storage_options.storage_id, GetParam()); + EXPECT_EQ(storage_options.storage_config_uri, ""); + EXPECT_EQ(storage_options.max_bagfile_size, 2147483646); + EXPECT_EQ(storage_options.max_bagfile_duration, 2147483646); + EXPECT_EQ(storage_options.max_cache_size, 989888); + EXPECT_EQ(storage_options.storage_preset_profile, "none"); + EXPECT_EQ(storage_options.snapshot_mode, false); + std::unordered_map custom_data{ + std::pair{"key1", "value1"}, + std::pair{"key2", "value2"} + }; + EXPECT_EQ(storage_options.custom_data, custom_data); + EXPECT_EQ(storage_options.start_time_ns, 0); + EXPECT_EQ(storage_options.end_time_ns, 100000); +} + +TEST_P( + ComposableRecorderIntegrationTests, + recorder_can_automatically_start_recording_after_composition) { + const size_t num_messages_to_publish = 5; + auto string_message = get_messages_strings()[0]; + string_message->string_value = "Hello World"; + const std::string test_topic_name = "/composable_recorder_test_string_topic"; + + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(test_topic_name, string_message, num_messages_to_publish); + + // Load composable recorder node + auto load_node_request = std::make_shared(); + load_node_request->package_name = "rosbag2_transport"; + load_node_request->plugin_name = "rosbag2_transport::Recorder"; + + rclcpp::Parameter uri("storage.uri", rclcpp::ParameterValue(root_bag_path_.generic_string())); + rclcpp::Parameter storage_id("storage.storage_id", GetParam()); + rclcpp::Parameter disable_all("record.all", false); + rclcpp::Parameter topics("record.topics", std::vector{test_topic_name}); + + load_node_request->parameters.push_back(uri.to_parameter_msg()); + load_node_request->parameters.push_back(storage_id.to_parameter_msg()); + load_node_request->parameters.push_back(disable_all.to_parameter_msg()); + load_node_request->parameters.push_back(topics.to_parameter_msg()); + + auto load_node_future = load_node_client_->async_send_request(load_node_request); + // Wait for the response + auto load_node_ret = exec_->spin_until_future_complete(load_node_future, 10s); + ASSERT_EQ(load_node_ret, rclcpp::FutureReturnCode::SUCCESS); + auto load_node_response = load_node_future.get(); + EXPECT_EQ(load_node_response->success, true); + EXPECT_EQ(load_node_response->error_message, ""); + EXPECT_EQ(load_node_response->full_node_name, "/rosbag2_recorder"); + + ASSERT_TRUE(pub_manager.wait_for_matched(test_topic_name.c_str())) << + "Expected to find " << test_topic_name.c_str() << " subscription"; + + wait_for_storage_file(); + + pub_manager.run_publishers(); + + // Unload composed recorder node + unload_node(load_node_response->unique_id); + + wait_for_metadata(); + auto test_topic_messages = get_messages_for_topic(test_topic_name); + // We shutdown node right after publishing messages. We can't guarantee that all published + // messages will be delivered and recorded. Some messages could be still in the DDS or pub/sub + // queues. Make sure that we recorded at least one. + EXPECT_THAT(test_topic_messages, SizeIs(Ge(1))); +} + +INSTANTIATE_TEST_SUITE_P( + ParametrizedComposableRecorderIntegrationTests, + ComposableRecorderIntegrationTests, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +); + +INSTANTIATE_TEST_SUITE_P( + ParametrizedComposableRecorderTests, + ComposableRecorderTests, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +); diff --git a/rosbag2_transport/test/rosbag2_transport/test_load_composable_components.cpp b/rosbag2_transport/test/rosbag2_transport/test_load_composable_components.cpp new file mode 100644 index 0000000000..10829e46b7 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_load_composable_components.cpp @@ -0,0 +1,97 @@ +// Copyright 2023, Patrick Roncagliolo +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "composition_manager_test_fixture.hpp" +#include "rosbag2_test_common/tested_storage_ids.hpp" + +using namespace std::chrono_literals; // NOLINT + +TEST_P(CompositionManagerTestFixture, test_load_play_component) +{ + std::string path{_SRC_RESOURCES_DIR_PATH "/player_node_params.yaml"}; + auto pm = rclcpp::parameter_map_from_yaml_file(path); + auto pl = rclcpp::parameters_from_map(pm, "/player_params_node"); + auto request = std::make_shared(); + request->package_name = "rosbag2_transport"; + request->plugin_name = "rosbag2_transport::Player"; + for (const auto & p : pl) { + request->parameters.push_back(p.to_parameter_msg()); + } + + rclcpp::Parameter qos_profile_overrides_path("play.qos_profile_overrides_path", + rclcpp::ParameterValue(_SRC_RESOURCES_DIR_PATH "/qos_profile_overrides.yaml")); + + const std::string uri_str = (std::filesystem::path( + _SRC_RESOURCES_DIR_PATH) / GetParam() / "test_bag_for_seek").generic_string(); + rclcpp::Parameter uri("storage.uri", rclcpp::ParameterValue(uri_str)); + rclcpp::Parameter storage_id("storage.storage_id", GetParam()); + + request->parameters.push_back(qos_profile_overrides_path.to_parameter_msg()); + request->parameters.push_back(uri.to_parameter_msg()); + request->parameters.push_back(storage_id.to_parameter_msg()); + + auto future = load_node_client_->async_send_request(request); + auto ret = exec_->spin_until_future_complete(future, 10s); // Wait for the result. + ASSERT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = future.get(); + EXPECT_EQ(result->success, true); + EXPECT_EQ(result->error_message, ""); + EXPECT_EQ(result->full_node_name, "/rosbag2_player"); + EXPECT_EQ(result->unique_id, 1u); + + // Unload composed player node + unload_node(result->unique_id); +} + +TEST_P(CompositionManagerTestFixture, test_load_record_component) +{ + std::string path{_SRC_RESOURCES_DIR_PATH "/recorder_node_params.yaml"}; + auto pm = rclcpp::parameter_map_from_yaml_file(path); + auto pl = rclcpp::parameters_from_map(pm, "/recorder_params_node"); + auto request = std::make_shared(); + request->package_name = "rosbag2_transport"; + request->plugin_name = "rosbag2_transport::Recorder"; + for (const auto & p : pl) { + request->parameters.push_back(p.to_parameter_msg()); + } + + rclcpp::Parameter qos_profile_overrides_path("record.qos_profile_overrides_path", + rclcpp::ParameterValue(_SRC_RESOURCES_DIR_PATH "/qos_profile_overrides.yaml")); + rclcpp::Parameter uri("storage.uri", rclcpp::ParameterValue(root_bag_path_.generic_string())); + rclcpp::Parameter storage_id("storage.storage_id", GetParam()); + + request->parameters.push_back(qos_profile_overrides_path.to_parameter_msg()); + request->parameters.push_back(uri.to_parameter_msg()); + request->parameters.push_back(storage_id.to_parameter_msg()); + + auto future = load_node_client_->async_send_request(request); + auto ret = exec_->spin_until_future_complete(future, 10s); // Wait for the result. + ASSERT_EQ(ret, rclcpp::FutureReturnCode::SUCCESS); + auto result = future.get(); + EXPECT_EQ(result->success, true); + EXPECT_EQ(result->error_message, ""); + EXPECT_EQ(result->full_node_name, "/rosbag2_recorder"); + EXPECT_EQ(result->unique_id, 1u); + + // Unload composed recorder node + unload_node(result->unique_id); +} + +INSTANTIATE_TEST_SUITE_P( + ParametrizedLoadComposableComponentsTests, + CompositionManagerTestFixture, + ValuesIn(rosbag2_test_common::kTestedStorageIDs) +); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp index e305229152..d95d21c644 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp @@ -49,5 +49,5 @@ TEST(record_options, test_yaml_serialization) CHECK(is_discovery_disabled); CHECK(topics); CHECK(rmw_serialization_format); - #undef CMP + #undef CHECK }