Skip to content

Commit bf04c04

Browse files
author
Anas Abou Allaban
authored
[Compression - 8] Enable reader to read from compressed files/messages (#246)
* Enable reader to read from decompressed files & messages * Fix compression CMakeLists * Add utility functions for reader, add tests for compression options, write docs, fix styles * Add more unit tests, fix copyright years, nits * Constness and counters * Simplify and enhance tests * Fix windows build
1 parent 57a0b9a commit bf04c04

File tree

9 files changed

+397
-20
lines changed

9 files changed

+397
-20
lines changed

rosbag2_compression/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ install(
6060

6161
ament_export_include_directories(include)
6262
ament_export_libraries(${PROJECT_NAME}_zstd)
63-
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor zstd)
63+
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor)
6464

6565
if(BUILD_TESTING)
6666
find_package(ament_cmake_gmock REQUIRED)

rosbag2_cpp/CMakeLists.txt

+12-1
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ find_package(poco_vendor)
3434
find_package(Poco COMPONENTS Foundation)
3535
find_package(rcpputils REQUIRED)
3636
find_package(rcutils REQUIRED)
37+
find_package(rosbag2_compression REQUIRED)
3738
find_package(rosbag2_storage REQUIRED)
3839
find_package(rosidl_generator_cpp REQUIRED)
3940
find_package(rosidl_typesupport_cpp REQUIRED)
4041
find_package(rosidl_typesupport_introspection_cpp REQUIRED)
4142

4243
add_library(${PROJECT_NAME} SHARED
44+
src/rosbag2_cpp/compression_options.cpp
4345
src/rosbag2_cpp/converter.cpp
4446
src/rosbag2_cpp/info.cpp
4547
src/rosbag2_cpp/reader.cpp
@@ -57,6 +59,7 @@ ament_target_dependencies(${PROJECT_NAME}
5759
Poco
5860
rcpputils
5961
rcutils
62+
rosbag2_compression
6063
rosbag2_storage
6164
rosidl_generator_cpp
6265
rosidl_typesupport_introspection_cpp
@@ -88,7 +91,7 @@ install(
8891

8992
ament_export_include_directories(include)
9093
ament_export_libraries(${PROJECT_NAME})
91-
ament_export_dependencies(pluginlib rosbag2_storage rosidl_typesupport_introspection_cpp)
94+
ament_export_dependencies(pluginlib rosbag2_compression rosbag2_storage rosidl_typesupport_introspection_cpp)
9295

9396
if(BUILD_TESTING)
9497
find_package(ament_cmake_gmock REQUIRED)
@@ -109,6 +112,13 @@ if(BUILD_TESTING)
109112
RUNTIME DESTINATION bin)
110113
pluginlib_export_plugin_description_file(rosbag2_cpp test/rosbag2_cpp/converter_test_plugin.xml)
111114

115+
ament_add_gmock(test_compression_options
116+
test/rosbag2_cpp/test_compression_options.cpp)
117+
if(TARGET test_compression_options)
118+
target_include_directories(test_compression_options PRIVATE include)
119+
target_link_libraries(test_compression_options ${PROJECT_NAME})
120+
endif()
121+
112122
ament_add_gmock(test_converter_factory
113123
test/rosbag2_cpp/test_converter_factory.cpp)
114124
if(TARGET test_converter_factory)
@@ -156,6 +166,7 @@ if(BUILD_TESTING)
156166
ament_index_cpp
157167
Poco
158168
rcutils
169+
rosbag2_compression
159170
rosbag2_storage
160171
rosidl_generator_cpp
161172
rosidl_typesupport_introspection_cpp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
16+
#define ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
17+
18+
#include <string>
19+
20+
#include "visibility_control.hpp"
21+
22+
namespace rosbag2_cpp
23+
{
24+
25+
/**
26+
* Modes are used to specify whether to compress by individual serialized bag messages or by file.
27+
* rosbag2_cpp defaults to NONE.
28+
*/
29+
enum class ROSBAG2_CPP_PUBLIC CompressionMode : uint32_t
30+
{
31+
NONE = 0,
32+
FILE,
33+
MESSAGE,
34+
LAST_MODE = MESSAGE
35+
};
36+
37+
/**
38+
* Converts a string into a rosbag2_cpp::CompressionMode enum.
39+
*
40+
* \param compression_mode A case insensitive string that is either "FILE" or "MESSAGE".
41+
* \return CompressionMode NONE if compression_mode is invalid. FILE or MESSAGE otherwise.
42+
*/
43+
ROSBAG2_CPP_PUBLIC CompressionMode compression_mode_from_string(const std::string & compression_mode);
44+
45+
/**
46+
* Converts a rosbag2_cpp::CompressionMode enum into a string.
47+
*
48+
* \param compression_mode A CompressionMode enum.
49+
* \return The corresponding mode as a string.
50+
*/
51+
ROSBAG2_CPP_PUBLIC std::string compression_mode_to_string(CompressionMode compression_mode);
52+
53+
} // namespace rosbag2_cpp
54+
#endif // ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_

rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp

+33
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
#include <string>
2020
#include <vector>
2121

22+
#include "rosbag2_cpp/compression_options.hpp"
2223
#include "rosbag2_cpp/converter.hpp"
2324
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
2425
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
2526
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
2627
#include "rosbag2_cpp/visibility_control.hpp"
2728

29+
#include "rosbag2_compression/base_decompressor_interface.hpp"
2830
#include "rosbag2_storage/metadata_io.hpp"
2931
#include "rosbag2_storage/storage_factory.hpp"
3032
#include "rosbag2_storage/storage_factory_interface.hpp"
@@ -121,14 +123,45 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
121123
const std::string & converter_serialization_format,
122124
const std::string & storage_serialization_format);
123125

126+
/**
127+
* Opens a storage plugin for read only.
128+
*
129+
* \throws std::runtime_error If no storage could be initialized.
130+
*/
131+
virtual void open_storage();
132+
133+
/**
134+
* Initializes the decompressor if a compression mode is specified in the metadata.
135+
*
136+
* \throws std::runtime_error If compression format doesn't exist.
137+
*/
138+
virtual void setup_compression();
139+
124140
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
125141
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};
126142
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
127143
std::unique_ptr<Converter> converter_{};
144+
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
128145
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
129146
rosbag2_storage::BagMetadata metadata_{};
130147
std::vector<std::string> file_paths_{}; // List of database files.
131148
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from
149+
rosbag2_cpp::CompressionMode compression_mode_{rosbag2_cpp::CompressionMode::NONE};
150+
151+
protected:
152+
/**
153+
* Checks if the compression mode is of type MESSAGE and if so, decompresses the message.
154+
*
155+
* \param message A serialized bag message
156+
*/
157+
virtual void decompress_message(rosbag2_storage::SerializedBagMessage * message);
158+
159+
/**
160+
* Checks if the compression mode is of type FILE and if so, decompresses the file.
161+
*
162+
* \param uri Relative path as a string
163+
*/
164+
virtual void decompress_file(const std::string & uri);
132165
};
133166

134167
} // namespace readers

rosbag2_cpp/package.xml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<depend>pluginlib</depend>
1414
<depend>poco_vendor</depend>
1515
<depend>rcutils</depend>
16+
<depend>rosbag2_compression</depend>
1617
<depend>rosbag2_storage</depend>
1718
<depend>rosidl_generator_cpp</depend>
1819
<depend>rosidl_typesupport_cpp</depend>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <algorithm>
16+
#include <string>
17+
18+
#include "rosbag2_cpp/compression_options.hpp"
19+
#include "rosbag2_cpp/logging.hpp"
20+
21+
namespace rosbag2_cpp
22+
{
23+
24+
namespace
25+
{
26+
27+
constexpr const char kCompressionModeNoneStr[] = "NONE";
28+
constexpr const char kCompressionModeFileStr[] = "FILE";
29+
constexpr const char kCompressionModeMessageStr[] = "MESSAGE";
30+
31+
std::string to_upper(const std::string & text)
32+
{
33+
std::string uppercase_text = text;
34+
std::transform(uppercase_text.begin(), uppercase_text.end(), uppercase_text.begin(), ::toupper);
35+
return uppercase_text;
36+
}
37+
} // namespace
38+
39+
CompressionMode compression_mode_from_string(const std::string & compression_mode)
40+
{
41+
const auto compression_mode_upper = to_upper(compression_mode);
42+
if (compression_mode.empty() || compression_mode_upper == kCompressionModeNoneStr) {
43+
return CompressionMode::NONE;
44+
} else if (compression_mode_upper == kCompressionModeFileStr) {
45+
return CompressionMode::FILE;
46+
} else if (compression_mode_upper == kCompressionModeMessageStr) {
47+
return CompressionMode::MESSAGE;
48+
} else {
49+
ROSBAG2_CPP_LOG_ERROR_STREAM(
50+
"CompressionMode: \"" << compression_mode << "\" is not supported!");
51+
return CompressionMode::NONE;
52+
}
53+
}
54+
55+
std::string compression_mode_to_string(const CompressionMode compression_mode)
56+
{
57+
switch (compression_mode) {
58+
case CompressionMode::NONE:
59+
return kCompressionModeNoneStr;
60+
case CompressionMode::FILE:
61+
return kCompressionModeFileStr;
62+
case CompressionMode::MESSAGE:
63+
return kCompressionModeMessageStr;
64+
default:
65+
ROSBAG2_CPP_LOG_ERROR_STREAM("CompressionMode not supported!");
66+
return kCompressionModeNoneStr;
67+
}
68+
}
69+
} // namespace rosbag2_cpp

rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp

+46-12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <vector>
2020

2121
#include "rcpputils/filesystem_helper.hpp"
22+
#include "rosbag2_compression/zstd_decompressor.hpp"
2223

2324
#include "rosbag2_cpp/logging.hpp"
2425
#include "rosbag2_cpp/readers/sequential_reader.hpp"
@@ -48,6 +49,30 @@ void SequentialReader::reset()
4849
storage_.reset();
4950
}
5051

52+
void SequentialReader::open_storage()
53+
{
54+
storage_ = storage_factory_->open_read_only(
55+
*current_file_iterator_, metadata_.storage_identifier);
56+
if (!storage_) {
57+
throw std::runtime_error{"No storage could be initialized. Abort"};
58+
}
59+
}
60+
61+
void SequentialReader::setup_compression()
62+
{
63+
compression_mode_ = compression_mode_from_string(metadata_.compression_mode);
64+
if (compression_mode_ != rosbag2_cpp::CompressionMode::NONE) {
65+
// TODO(piraka9011): Replace this with a compressor_factory.
66+
if (metadata_.compression_format == "zstd") {
67+
decompressor_ = std::make_unique<rosbag2_compression::ZstdDecompressor>();
68+
} else {
69+
std::stringstream err;
70+
err << "Unsupported compression format " << metadata_.compression_format;
71+
throw std::runtime_error(err.str());
72+
}
73+
}
74+
}
75+
5176
void SequentialReader::open(
5277
const StorageOptions & storage_options, const ConverterOptions & converter_options)
5378
{
@@ -60,21 +85,12 @@ void SequentialReader::open(
6085
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
6186
return;
6287
}
63-
88+
open_storage();
89+
setup_compression();
6490
file_paths_ = metadata_.relative_file_paths;
6591
current_file_iterator_ = file_paths_.begin();
66-
67-
storage_ = storage_factory_->open_read_only(
68-
*current_file_iterator_, metadata_.storage_identifier);
69-
if (!storage_) {
70-
throw std::runtime_error("No storage could be initialized. Abort");
71-
}
7292
} else {
73-
storage_ = storage_factory_->open_read_only(
74-
storage_options.uri, storage_options.storage_id);
75-
if (!storage_) {
76-
throw std::runtime_error("No storage could be initialized. Abort");
77-
}
93+
open_storage();
7894
metadata_ = storage_->get_metadata();
7995
if (metadata_.relative_file_paths.empty()) {
8096
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
@@ -112,10 +128,19 @@ bool SequentialReader::has_next()
112128
throw std::runtime_error("Bag is not open. Call open() before reading.");
113129
}
114130

131+
void SequentialReader::decompress_message(
132+
rosbag2_storage::SerializedBagMessage * message)
133+
{
134+
if (compression_mode_ == CompressionMode::MESSAGE) {
135+
decompressor_->decompress_serialized_bag_message(message);
136+
}
137+
}
138+
115139
std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialReader::read_next()
116140
{
117141
if (storage_) {
118142
auto message = storage_->read_next();
143+
decompress_message(message.get());
119144
return converter_ ? converter_->convert(message) : message;
120145
}
121146
throw std::runtime_error("Bag is not open. Call open() before reading.");
@@ -134,10 +159,19 @@ bool SequentialReader::has_next_file() const
134159
return current_file_iterator_ + 1 != file_paths_.end();
135160
}
136161

162+
void SequentialReader::decompress_file(const std::string & uri)
163+
{
164+
if (compression_mode_ == CompressionMode::FILE) {
165+
ROSBAG2_CPP_LOG_DEBUG_STREAM("Decompressing " << uri);
166+
*current_file_iterator_ = decompressor_->decompress_uri(uri);
167+
}
168+
}
169+
137170
void SequentialReader::load_next_file()
138171
{
139172
assert(current_file_iterator_ != file_paths_.end());
140173
current_file_iterator_++;
174+
decompress_file(*current_file_iterator_);
141175
}
142176

143177
std::string SequentialReader::get_current_file() const

0 commit comments

Comments
 (0)