This example demonstrates use of the ROS2 bag reader.
The doctests in this example can be run from the command line by invoking a command with the following form:
> python3 -m doctest path/to/example.md
In order to illustrate the reader, a bag file must be available. Create a temporary directory to contain such a file.
>>> import tempfile
>>> from os import sep
>>> tempdir = tempfile.TemporaryDirectory()
>>> bag_name = 'bag_test'
>>> bag_path = f'{tempdir.name}{sep}{bag_name}'
Open a bag for storage.
>>> import rosbag2_py
>>> storage_options = rosbag2_py.StorageOptions(uri=bag_path,
... max_cache_size=0,
... storage_id='mcap')
>>> converter_options = rosbag2_py.ConverterOptions('', '')
>>> writer = rosbag2_py.SequentialWriter()
>>> writer.open(storage_options, converter_options)
Initialize topics in the bag.
>>> topic = 'test'
>>> topic_metadata \
... = rosbag2_py.TopicMetadata(name=topic,
... type='example_interfaces/msg/String',
... serialization_format='cdr')
>>> writer.create_topic(topic_metadata)
Write a message to the topic in the bag file.
>>> import rclpy.clock
>>> from rclpy.serialization import serialize_message
>>> from example_interfaces import msg
>>> clock = rclpy.clock.Clock()
>>> message = msg.String(data='Hello World!')
>>> writer.write(topic, serialize_message(message), clock.now().nanoseconds)
Write a second message.
>>> message.data = 'Goodybye World!'
>>> writer.write(topic, serialize_message(message), clock.now().nanoseconds)
Close the bag file.
>>> writer.close()
Initialize a bag reader.
>>> from nml_bag import Reader
>>> reader = Reader(f'{bag_path}{sep}{bag_name}_0.mcap',
... storage_id='mcap',
... serialization_format='cdr')
Display the topics.
>>> from pprint import pprint
>>> pprint(reader.topics)
['test']
Display the mapping between topics and message types.
>>> pprint(reader.type_map)
{'test': 'example_interfaces/msg/String'}
Display the messages stored in the bag.
>>> pprint(reader.records) # doctest: +ELLIPSIS
[{'data': 'Hello World!',
'time_ns': ...,
'topic': 'test',
'type': 'example_interfaces/msg/String'},
{'data': 'Goodybye World!',
'time_ns': ...,
'topic': 'test',
'type': 'example_interfaces/msg/String'}]
Clean up the temporary files and directory.
>>> tempdir.cleanup()