Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-13335: Fix examples that assume invalid data cannot be ALIVE #645

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pipeline {
sh 'tar zxvf connextdds-staging-${CONNEXTDDS_ARCH}.tgz unlicensed/'

sh '''
cp ${RTI_INSTALLATION_PATH}/rti_connext_dds-*/lib/${CONNEXTDDS_ARCH}/openssl-1.*/* \
cp ${RTI_INSTALLATION_PATH}/rti_connext_dds-*/lib/${CONNEXTDDS_ARCH}/openssl-3.*/* \
${RTI_INSTALLATION_PATH}/rti_connext_dds-*/lib/${CONNEXTDDS_ARCH}/
'''
}
Expand Down
30 changes: 30 additions & 0 deletions examples/connext_dds/dynamic_data_skip_serialization/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Example Code: Skip Data Serialization for Data Recording

## Concept

There are scenarios where the need to deserialize or inspect data is not necessary.
A common example is recording data. In this case, the application can simply
record the binary data as it is received, and then replay it later. This provides
a significant performance improvement.

The DynamicData API provides a mode that allows sending or receiving data in its
CDR format, without serializing or deserializing it.

Note that while Connext provides a Recording Service, there may be situations
where a custom Recording application may be useful.

## Example Description

This example implements a simple recording application that uses the DynamicData
API to receive data in CDR format and directly record it in a file. The example
also provides a replay option that reads the data buffers from the file
and publishes them back. For convenience, an option to publish a few samples
to test the record and replay functionality is also provided.

The key parts of the example are implemented in the ``record()`` and
``replay()`` functions in the example source code for each language.

The example is very simple and uses a single type and topic, but it could be
extended to use discovered types (see the
[built-in topics example](../builtin_topics/)) to implement a more
general-purpose recording application.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved.
#
# RTI grants Licensee a license to use, modify, compile, and create derivative
# works of the Software. Licensee has the right to distribute object form
# only for use with RTI products. The Software is provided "as is", with no
# warranty of any type, including any warranty for fitness for any purpose.
# RTI is under no obligation to maintain or support the Software. RTI shall
# not be liable for any incidental or consequential damages arising out of the
# use or inability to use the software.
#
cmake_minimum_required(VERSION 3.11)
project(rtiexamples-dynamic-data-skip-serialization)
list(APPEND CMAKE_MODULE_PATH
"${CMAKE_CURRENT_SOURCE_DIR}/../../../../resources/cmake/Modules"
)
include(ConnextDdsConfigureCmakeUtils)
connextdds_configure_cmake_utils()

# Find the RTI Connext DDS libraries
if(NOT RTIConnextDDS_FOUND)
find_package(RTIConnextDDS
"7.3.0"
REQUIRED
COMPONENTS
core
)
endif()

set(CMAKE_CXX_STANDARD 11)
set(PLATFORM_MODERN_CXX_STANDARD 11)

add_executable(recorder_cxx2
"${CMAKE_CURRENT_SOURCE_DIR}/util.cxx"
"${CMAKE_CURRENT_SOURCE_DIR}/recorder.cxx"
)

target_link_libraries(recorder_cxx2
PUBLIC
RTIConnextDDS::cpp2_api
)

target_include_directories(recorder_cxx2
PRIVATE
"${CMAKE_CURRENT_BINARY_DIR}/src"
)

set_target_properties(recorder_cxx2
PROPERTIES
OUTPUT_NAME "recorder")


if(CMAKE_SYSTEM_NAME MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID MATCHES "GNU")
set_target_properties(recorder_cxx2
PROPERTIES
LINK_FLAGS -Wl,--no-as-needed)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Example Code: Skip Data Serialization for Data Recording

## Building the Example :wrench:

To build this example, first run CMake to generate the corresponding build
files. We recommend you use a separate directory to store all the generated
files (e.g., ./build).

```sh
mkdir build
cd build
cmake ..
```

Once you have run CMake, you will find a number of new files in your build
directory (the list of generated files will depend on the specific CMake
Generator). To build the example, run CMake as follows:

```sh
cmake --build .
```

**Note**: if you are using a multi-configuration generator, such as Visual
Studio solutions, you can specify the configuration mode to build as follows:

```sh
cmake --build . --config Release|Debug
```

Alternatively, you can use directly the generated infrastructure (e.g.,
Makefiles or Visual Studio Solutions) to build the example. If you generated
Makefiles in the configuration process, run make to build the example. Likewise,
if you generated a Visual Studio solution, open the solution and follow the
regular build process.

## Running the Example

From the build directory, in one command prompt run the recorder:

```sh
./recorder --record data.bin
```

In a second command prompt run the publisher:

```sh
./recorder --publish
```

The recorder application will print a message each time a sample is recorded.

Now kill the recorder and run the replay application. A file called `data.bin`
will have been created in the current directory.

Now we will run a subscriber and a reply application.

To subscribe to the data we will simply use **rtiddsspy**:

```sh
<Connext installation>/bin/rtiddsspy -printSample
```

Now run the application that replays the data recorded in `data.bin`:

```sh
./recorder --replay data.bin
```

The subscriber will print the data that is being replayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* (c) Copyright, Real-Time Innovations, 2023. All rights reserved.
* RTI grants Licensee a license to use, modify, compile, and create derivative
* works of the software solely for use with RTI Connext DDS. Licensee may
* redistribute copies of the software provided that all such copies are subject
* to this license. The software is provided "as is", with no warranty of any
* type, including any warranty for fitness for any purpose. RTI is under no
* obligation to maintain or support the software. RTI shall not be liable for
* any incidental or consequential damages arising out of the use or inability
* to use the software.
*/

#include <fstream>
#include <iostream>
#include <string>
#include <rti/rti.hpp>

#include "util.hpp"

dds::core::xtypes::StructType create_type()
{
using namespace dds::core::xtypes;

StructType type(
"RecordExample",
{ Member("id", StringType(128)),
Member("payload",
SequenceType(primitive_type<int32_t>(), 1024)) });

return type;
}

void record(const std::string &file_name, int domain_id)
{
using dds::core::xtypes::DynamicData;

// To disable the deserialization on the DataReader and get direct access
// to the serialized CDR buffer, we need to register the DataReader type
// with the DynamicDataTypeSerializationProperty::skip_deserialization set
// to true, and then create the DataReader with the name used to register
// the type.
dds::domain::DomainParticipant participant(domain_id);
const std::string type_name = "RecordExample";
rti::core::xtypes::DynamicDataTypeSerializationProperty property;
property.skip_deserialization(true);
rti::domain::register_type(participant, type_name, create_type(), property);

dds::topic::Topic<DynamicData> topic(
participant,
"Example Record",
type_name); // specify the registered type name

auto qos = dds::core::QosProvider::Default().datareader_qos(
rti::core::builtin_profiles::qos_lib::generic_strict_reliable());
dds::sub::DataReader<DynamicData> reader(topic, qos);
if (!util::wait_for_writer(reader)) {
return;
}

// File setup
std::ofstream out_file(file_name, std::ios::binary);
if (!out_file) {
std::cerr << "Failed to open file for recording: " << file_name
<< std::endl;
return;
}

auto record_data = [&out_file, &reader]() {
auto samples = reader.take();
for (auto sample : samples) {
if (!sample.info().valid()) {
continue;
}

// Now the only way to access the data is to call get_cdr_buffer,
// any other field accessor will fail.
auto buffer_info = sample.data().get_cdr_buffer();
auto buffer = buffer_info.first;
auto buffer_length = buffer_info.second;
std::cout << "Recording data sample (" << buffer_length << " bytes)"
<< std::endl;
out_file.write(
reinterpret_cast<const char *>(&buffer_length),
sizeof(buffer_length));
out_file.write(
reinterpret_cast<const char *>(buffer),
buffer_length);
}
};

// Set up a ReadCondition to trigger the record_data function when
// data is available
dds::core::cond::WaitSet waitset;
dds::sub::cond::ReadCondition read_condition(
reader,
dds::sub::status::DataState::any(),
record_data);
waitset += read_condition;

while (!application::shutdown_requested) {
waitset.dispatch(dds::core::Duration(1));
}
}

void replay(const std::string &file_name, int domain_id)
{
using dds::core::xtypes::DynamicData;

dds::domain::DomainParticipant participant(domain_id);

// For the replay application we don't need to register the type with any
// particular property because DynamicData DataWriters are always prepared
// to write serialized buffers directly
dds::topic::Topic<DynamicData> topic(
participant,
"Example Record",
create_type());

auto qos = dds::core::QosProvider::Default().datawriter_qos(
rti::core::builtin_profiles::qos_lib::generic_strict_reliable());
dds::pub::DataWriter<DynamicData> writer(topic, qos);
if (!util::wait_for_reader(writer)) {
return;
}

std::ifstream in_file(file_name, std::ios::binary);
if (!in_file) {
std::cerr << "Failed to open file for replay: " << file_name
<< std::endl;
return;
}

uint32_t length;
std::vector<char> buffer;
DynamicData sample(create_type());
while (!in_file.eof()) {
// read length and data
if (!in_file.read(reinterpret_cast<char *>(&length), sizeof(length))) {
break;
}

std::cout << "Replaying data sample (" << length << " bytes)"
<< std::endl;
buffer.resize(length);
if (!in_file.read(buffer.data(), length)) {
break;
}

// By calling the set_cdr_buffer method we override the contents
// of the DynamicData object with the new serialized data. After
// setting a cdr buffer we can't use any field getters or setters.
sample.set_cdr_buffer(buffer.data(), length);
writer.write(sample);
}

in_file.close();
writer.wait_for_acknowledgments(dds::core::Duration(10));
}

int main(int argc, char *argv[])
{
using namespace application;

// Parse arguments and handle control-C
auto arguments = parse_arguments(argc, argv);
if (arguments.parse_result == ParseReturn::exit) {
return EXIT_SUCCESS;
} else if (arguments.parse_result == ParseReturn::failure) {
return EXIT_FAILURE;
}
setup_signal_handlers();

// Sets Connext verbosity to help debugging
rti::config::Logger::instance().verbosity(arguments.verbosity);

try {
if (arguments.application_type == ApplicationType::record) {
record(arguments.file_name, arguments.domain_id);
} else if (arguments.application_type == ApplicationType::replay) {
replay(arguments.file_name, arguments.domain_id);
} else if (arguments.application_type == ApplicationType::publish) {
util::publish_example_data(arguments.domain_id, create_type());
}

} catch (const std::exception &ex) {
std::cerr << "Exception in application: " << ex.what() << std::endl;
return EXIT_FAILURE;
}

// Releases the memory used by the participant factory. Optional at
// application exit
dds::domain::DomainParticipant::finalize_participant_factory();

return EXIT_SUCCESS;
}
Loading
Loading