Skip to content

Commit

Permalink
add export of Masayuki Masuda's rosbag code
Browse files Browse the repository at this point in the history
  • Loading branch information
EricCousineau-TRI committed Jun 13, 2024
1 parent f2df1c6 commit 9bae5d6
Show file tree
Hide file tree
Showing 7 changed files with 599 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ros/rosbag2_py_lib_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Rough export of internal code base for library-based rosbag2 Python recording mechanism.
It comes from internal codebase, using Bazel, and leveraging some components from `drake-ros`.

Relates <https://github.com/ros2/rosbag2/issues/1678>
44 changes: 44 additions & 0 deletions ros/rosbag2_py_lib_example/demonstration_rosbag_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
This script contains a class for managing rosbag record used in demonstration.
"""

import atexit
import signal
import subprocess

from example.rosbag_interface import Recorder


class RecorderWithPolling:
def __init__(self, recorder, proc=None):
self._recorder = recorder
self._proc = proc

def start(self, output):
self._recorder.start(output)

def stop(self):
self._recorder.stop()

def cancel(self):
self._recorder.cancel()

def poll(self):
assert self._recorder.is_recording()
if self._proc is not None:
ret_code = self._proc.poll()
assert ret_code is None, ret_code


class RosbagConfig:
def __init__(
self,
storage_id: str = "mcap",
regex: str = "",
):
self._storage_id = storage_id
self._regex = regex

def create(self):
recorder = Recorder(storage_id=self._storage_id, regex=self._regex)
return RecorderWithPolling(recorder)
72 changes: 72 additions & 0 deletions ros/rosbag2_py_lib_example/rosbag_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Custom rosbag recording interface.
This is done so that we minimize the time needed to begin recording when
launching from an existing process.
"""

import os
import shutil

from rosbag2_py import StorageOptions

# ros2bag_py.Recorder.start() will mess with signal handlers. For more
# details, see https://github.com/ros2/rosbag2/issues/1678
# So we use own binding recorder
from example import rosbag_recording


def _delete_path(path):
if os.path.exists(path):
if os.path.isfile(path):
os.remove(path)
print(f"File {path} deleted.")
elif os.path.isdir(path):
shutil.rmtree(path)
print(f"Directory {path} and its contents deleted.")


class Recorder:
# TODO(masayuki): allow other options
def __init__(
self, storage_id="mcap", rmw_serialization_format="cdr", regex=""
):
self._recorder = rosbag_recording.Recorder()
self._storage_id = storage_id
self._rmw_serialization_format = rmw_serialization_format
self._regex = regex

self._output = None
self.num = 0

def start(self, output="/tmp/test.bag"):
assert output is not None
print(f"start rosbag recording. save path: {output}")

if os.path.exists(output):
raise RuntimeError(f"rosbag path already exists! {output}")
self._output = output

storage_options = StorageOptions(
uri=output,
storage_id=self._storage_id,
)
record_options = rosbag_recording.RecordOptions()
record_options.all = True
record_options.exclude = self._regex
record_options.rmw_serialization_format = (
self._rmw_serialization_format
)
self._recorder.record(storage_options, record_options)

def stop(self):
print(f"stop rosbag recording. file will be saved at {self._output}")
self._recorder.cancel()

def cancel(self):
print("cancel rosbag recording.")
self._recorder.cancel()
_delete_path(self._output)

def is_recording(self):
return self._recorder.is_recording()
113 changes: 113 additions & 0 deletions ros/rosbag2_py_lib_example/rosbag_recording_py.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <stdexcept>
#include <string>

#include <pybind11/pybind11.h>
#include <rclcpp/rclcpp.hpp>
#include <rosbag2_storage/storage_options.hpp>
#include <rosbag2_storage/yaml.hpp>
#include <rosbag2_transport/reader_writer_factory.hpp>
#include <rosbag2_transport/record_options.hpp>
#include <rosbag2_transport/recorder.hpp>

#include "drake/common/drake_assert.h"

namespace py = pybind11;

namespace example {

// from https://github.com/ros2/rosbag2/blob/8c94497/rosbag2_py/src/rosbag2_py/_transport.cpp#L174-L262
class Recorder {
public:
Recorder() : is_recording_(false), record_count_(0) {
DRAKE_DEMAND(rclcpp::ok());
}

~Recorder() {
cancel();
}

void spin_node() {
{
std::lock_guard<std::mutex> lock(wait_for_start_th_mutex_);
is_recording_ = true;
wait_for_start_th_cv_.notify_one();
}
executor_.spin();
}

void record(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options) {
if (is_recording_) {
throw std::runtime_error("already recording!!");
}
record_count_++;

DRAKE_DEMAND(!record_options.rmw_serialization_format.empty());

auto writer =
rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
auto node_name = "rosbag2_recorder_" + std::to_string(record_count_);
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options, node_name);
recorder_->record();
executor_.add_node(recorder_);

recording_thread_ = std::thread(&Recorder::spin_node, this);
// ensure spin start
std::unique_lock<std::mutex> lock(wait_for_start_th_mutex_);
wait_for_start_th_cv_.wait(lock, [&]{ return is_recording_; });
}

void cancel() {
if (!is_recording_) {
return;
}
is_recording_ = false;
recorder_->stop();
executor_.cancel();
if (recording_thread_.joinable()) {
recording_thread_.join();
}
executor_.remove_node(recorder_);
}

bool is_recording() const {
return is_recording_;
}

private:
bool is_recording_;
int record_count_;
std::thread recording_thread_;
std::condition_variable wait_for_start_th_cv_;
std::mutex wait_for_start_th_mutex_;
std::shared_ptr<rosbag2_transport::Recorder> recorder_;
rclcpp::executors::SingleThreadedExecutor executor_;
};


PYBIND11_MODULE(rosbag_recording, m) {
py::class_<Recorder>(m, "Recorder")
.def(py::init<>())
.def("record", &Recorder::record,
py::arg("storage_options"), py::arg("record_options"))
.def("cancel", &Recorder::cancel)
.def("is_recording", &Recorder::is_recording);

{
// we need to bind this because Recorder.record() couldn't accept
// ros2bag_py.RecordOptions() and we don't not bind
// rosbag2_storage::StorageOptions() because import register issue
// occur when import rosbag_recording(this) and ros2bag_py both.
using RecordOptions = rosbag2_transport::RecordOptions;
py::class_<RecordOptions>(m, "RecordOptions")
.def(py::init<>())
.def_readwrite("all", &RecordOptions::all)
.def_readwrite("exclude", &RecordOptions::exclude)
.def_readwrite(
"rmw_serialization_format", &RecordOptions::rmw_serialization_format);
}
}

} // namespace example
Loading

0 comments on commit 9bae5d6

Please sign in to comment.