Usage example

This example demonstrates how to use the ROS 2 bag reader.

The doctests in this example can be run from the command line with a command of the following form:

> python3 -m doctest path/to/example.md
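
The same check can also be driven from Python with the standard library's doctest.testfile function. The sketch below is self-contained: it writes a small stand-in file (hypothetical content, not this example) to a temporary directory and runs the doctests in it. To check the real document, the path to example.md would be passed instead.

```python
import doctest
import tempfile
from pathlib import Path

# Stand-in document containing a single doctest (hypothetical content).
SAMPLE = """\
Sample document

>>> 1 + 1
2
"""

with tempfile.TemporaryDirectory() as tmp:
    doc_path = Path(tmp) / 'sample.md'
    doc_path.write_text(SAMPLE)
    # module_relative=False treats the argument as an ordinary file path.
    results = doctest.testfile(str(doc_path), module_relative=False)

print(f'{results.attempted} attempted, {results.failed} failed')
```

A non-zero failed count indicates that at least one example in the file did not produce its expected output.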

Write to a bag file

To illustrate the reader, a bag file must first exist. Create a temporary directory to hold one.

>>> import tempfile
>>> from os import sep
>>> tempdir = tempfile.TemporaryDirectory()
>>> bag_name = 'bag_test'
>>> bag_path = f'{tempdir.name}{sep}{bag_name}'
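
As a side note, the same paths can be assembled with pathlib rather than by joining strings with os.sep. The sketch below is an alternative and not part of the example itself; it only builds paths and writes no bag. The storage file name follows the pattern used when the reader is opened later in this example, and tempdir_name, bag_dir, and bag_file are illustrative names. A Path would need converting with str() before being passed to an API that expects a string.

```python
import tempfile
from pathlib import Path

bag_name = 'bag_test'
with tempfile.TemporaryDirectory() as tempdir_name:
    # Directory that would be passed as the bag URI (not created here).
    bag_dir = Path(tempdir_name) / bag_name
    # Storage file name, following the pattern used for the reader below.
    bag_file = bag_dir / f'{bag_name}_0.mcap'
    # Express the file location relative to the temporary directory.
    relative = bag_file.relative_to(tempdir_name)
    print(relative.as_posix())
```

The printed value is the location of the storage file relative to the temporary directory, written with forward slashes regardless of platform.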

Open a bag for writing.

>>> import rosbag2_py
>>> storage_options = rosbag2_py.StorageOptions(uri=bag_path,
...                                             max_cache_size=0,
...                                             storage_id='mcap')
>>> converter_options = rosbag2_py.ConverterOptions('', '')
>>> writer = rosbag2_py.SequentialWriter()
>>> writer.open(storage_options, converter_options)

Create a topic in the bag.

>>> topic = 'test'
>>> topic_metadata = rosbag2_py.TopicMetadata(
...     name=topic,
...     type='example_interfaces/msg/String',
...     serialization_format='cdr')
>>> writer.create_topic(topic_metadata)

Write a message to the topic in the bag file.

>>> import rclpy.clock
>>> from rclpy.serialization import serialize_message
>>> from example_interfaces import msg
>>> clock = rclpy.clock.Clock()
>>> message = msg.String(data='Hello World!')
>>> writer.write(topic, serialize_message(message), clock.now().nanoseconds)

Write a second message.

>>> message.data = 'Goodbye World!'
>>> writer.write(topic, serialize_message(message), clock.now().nanoseconds)

Close the bag file.

>>> writer.close()

Read from the bag file

Initialize a bag reader, pointing it at the MCAP file that the writer created inside the bag directory.

>>> from nml_bag import Reader
>>> reader = Reader(f'{bag_path}{sep}{bag_name}_0.mcap',
...                 storage_id='mcap',
...                 serialization_format='cdr')

Display the topics.

>>> from pprint import pprint
>>> pprint(reader.topics)
['test']

Display the mapping between topics and message types.

>>> pprint(reader.type_map)
{'test': 'example_interfaces/msg/String'}

Display the messages stored in the bag.

>>> pprint(reader.records)  # doctest: +ELLIPSIS
[{'data': 'Hello World!',
  'time_ns': ...,
  'topic': 'test',
  'type': 'example_interfaces/msg/String'},
 {'data': 'Goodbye World!',
  'time_ns': ...,
  'topic': 'test',
  'type': 'example_interfaces/msg/String'}]
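
Because the records are displayed as a list of plain dictionaries, they can be post-processed with ordinary Python. The sketch below assumes that shape and uses a hand-written list in place of reader.records; the timestamps, the 'other' topic, and the helper names select_topic and elapsed_seconds are all made up for illustration.

```python
# Hypothetical records in the same shape as the output above;
# the time_ns values and the 'other' topic are invented.
records = [
    {'data': 'Hello World!',
     'time_ns': 1_000_000_000,
     'topic': 'test',
     'type': 'example_interfaces/msg/String'},
    {'data': 'ignored',
     'time_ns': 1_250_000_000,
     'topic': 'other',
     'type': 'example_interfaces/msg/String'},
    {'data': 'Hello again!',
     'time_ns': 1_500_000_000,
     'topic': 'test',
     'type': 'example_interfaces/msg/String'},
]


def select_topic(records, topic):
    """Return the records for one topic, ordered by timestamp."""
    selected = [r for r in records if r['topic'] == topic]
    return sorted(selected, key=lambda r: r['time_ns'])


def elapsed_seconds(records):
    """Return each record's offset in seconds from the first record."""
    start = records[0]['time_ns']
    return [(r['time_ns'] - start) / 1e9 for r in records]


test_records = select_topic(records, 'test')
print([r['data'] for r in test_records])
print(elapsed_seconds(test_records))
```

Keeping the timestamps as integer nanoseconds until the final division avoids losing precision when subtracting large values.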

Cleanup

Clean up the temporary files and directory.

>>> tempdir.cleanup()
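
Outside of a doctest, where the steps need not be spread across separate prompts, a with block is one way to make sure the cleanup happens even if an intermediate step fails. The sketch below uses only the standard library; the placeholder file stands in for bag data and the failure is simulated.

```python
import tempfile
from pathlib import Path

try:
    with tempfile.TemporaryDirectory() as tempdir_name:
        scratch = Path(tempdir_name)
        # Stand-in for the bag files written during the example.
        (scratch / 'placeholder.txt').write_text('stand-in for bag data')
        existed_inside = scratch.exists()
        # Simulate a failure part-way through the example.
        raise RuntimeError('simulated failure')
except RuntimeError:
    pass

# The directory existed inside the block and is removed on exit.
print(existed_inside, scratch.exists())
```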