Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21313] Type propagation policy #5081

Merged
merged 22 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1f27150
Refs #21313: Substitute auto_fill_type_information with fastdds.type_…
EduPonz Jul 12, 2024
b4dd71f
Refs #21313: Rework DynamicPubSubType.hpp doxydoc
EduPonz Jul 16, 2024
3637a36
Refs #21313: Add minimal bandwidth type propagation
EduPonz Jul 17, 2024
9d8ec86
Refs #21313: Add registration only bandwidth type propagation
EduPonz Jul 17, 2024
de2a56d
Refs #21313: Only init TypeLookupManager when required
EduPonz Jul 17, 2024
85a350e
Refs #21313: Add entry to versions.md
EduPonz Jul 17, 2024
28f0b6d
Refs #21313: Remove unrecheable code
EduPonz Jul 19, 2024
de9b015
Refs #21313: Improve tests
EduPonz Jul 19, 2024
7a282da
Refs #21313: Remove unnecessary includes
EduPonz Jul 19, 2024
94b3b62
Refs #21313: Add RTPSParticipantImplt::type_propagation API to group …
EduPonz Jul 19, 2024
b1256ed
Refs #21313: Rename RTPSParticipant attr getter to get_attributes
EduPonz Jul 19, 2024
64cf33a
Refs #21313. Fix doxygen reference
EduPonz Jul 20, 2024
8c82a55
Refs #21313: Fix tests
richiware Jul 23, 2024
ce7336b
Refs #21313: Update doxygen documentation
richiware Jul 23, 2024
122bc63
Refs #21313. Fix after rebase
richiware Jul 24, 2024
18d6bbd
Refs #21313: Apply Miguel's suggestions
EduPonz Jul 24, 2024
6bab448
Refs #21313: Fix build with BUILD_DOCUMENTATION=ON
EduPonz Jul 24, 2024
3c4ca8c
Refs #21313: Fix build with BUILD_DOCUMENTATION=ON and Fast CDR as th…
EduPonz Jul 24, 2024
b21873e
Refs #21313: Uncrustify
EduPonz Jul 24, 2024
1aa76e4
Refs #21313: Fix uncrustify
EduPonz Jul 24, 2024
6bc41e5
Refs #21313: Fix failing tests
EduPonz Jul 24, 2024
78dee76
Refs #21313: Fix potential ABBA between PDP and RTPSParticipantImpl m…
EduPonz Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ if(BUILD_DOCUMENTATION)
endif()

# Target to create documentation directories
add_custom_target(docdirs
add_custom_target(fastdds_docdirs
COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/doc
COMMENT "Creating documentation directory" VERBATIM)

Expand All @@ -497,12 +497,12 @@ if(BUILD_DOCUMENTATION)
# Configure the template doxyfile for or specific project
configure_file(utils/doxygen/doxyfile.in ${PROJECT_BINARY_DIR}/doxyfile @ONLY IMMEDIATE)
# Add custom target to run doxygen when ever the project is build
add_custom_target(doxygen
add_custom_target(fastdds_doxygen
COMMAND "${DOXYGEN_EXECUTABLE}" "${PROJECT_BINARY_DIR}/doxyfile"
SOURCES "${PROJECT_BINARY_DIR}/doxyfile"
COMMENT "Generating API documentation with doxygen" VERBATIM)

add_dependencies(doxygen docdirs)
add_dependencies(fastdds_doxygen fastdds_docdirs)

### README html ########################

Expand Down Expand Up @@ -552,15 +552,15 @@ if(BUILD_DOCUMENTATION)
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/readthedocs_custom.cmake
)

add_dependencies(readthedocs docdirs)
add_dependencies(readthedocs fastdds_docdirs)
endif()

add_custom_target(doc ALL
add_custom_target(fastdds_doc ALL
COMMENT "Generated project documentation" VERBATIM)

add_dependencies(doc doxygen)
add_dependencies(fastdds_doc fastdds_doxygen)
if(NOT CHECK_DOCUMENTATION)
add_dependencies(doc readthedocs)
add_dependencies(fastdds_doc readthedocs)
endif()
endif()

Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/xtypes/CLIParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class CLIParser
}
else if (config.entity == CLIParser::EntityKind::SUBSCRIBER)
{
EPROSIMA_LOG_ERROR(CLI_PARSER, "--xml-type flag available only for subscriber entity");
EPROSIMA_LOG_ERROR(CLI_PARSER, "--xml-type flag available only for publisher entity");
print_help(EXIT_FAILURE);
}
else
Expand Down
3 changes: 3 additions & 0 deletions examples/cpp/xtypes/PublisherApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ void PublisherApp::run()
return is_stopped();
});
}

// Wait for acknowledgments with 500 ms timeout
writer_->wait_for_acknowledgments({0, 500000000});
}

bool PublisherApp::publish()
Expand Down
7 changes: 4 additions & 3 deletions examples/cpp/xtypes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ This is accomplished by setting the environment variable ``FASTDDS_DEFAULT_PROFI
* Ubuntu ( / MacOS )

```shell
user@machine:example_path$ export FASTDDS_DEFAULT_PROFILES_FILE=xtypes_profile.xml
user@machine:example_path$ export FASTDDS_DEFAULT_PROFILES_FILE=xtypes_complete_profile.xml
```

* Windows

```powershell
example_path> set FASTDDS_DEFAULT_PROFILES_FILE=xtypes_profile.xml
example_path> set FASTDDS_DEFAULT_PROFILES_FILE=xtypes_complete_profile.xml
```

The example provides with an XML profiles files with certain QoS:
The example provides with two XML profiles files with certain QoS:

- Reliable reliability: avoid sample loss.
- Transient local durability: enable late-join subscriber applications to receive previous samples.
- Keep-last history with high depth: ensure certain amount of previous samples for late-joiners.
- Type propagation: Set to either complete of minimal depending on the used XML profiles file.

Applying different configurations to the entities will change to a greater or lesser extent how the application behaves in relation to sample management.
Even when these settings affect the behavior of the sample management, the applications' output will be the similar.
80 changes: 58 additions & 22 deletions examples/cpp/xtypes/SubscriberApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
#include "SubscriberApp.hpp"

#include <condition_variable>
#include <mutex>
#include <stdexcept>

#include <fastcdr/exceptions/BadParamException.h>

#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
Expand Down Expand Up @@ -55,7 +58,7 @@ SubscriberApp::SubscriberApp(
, reader_(nullptr)
, samples_(config.samples)
, received_samples_(0)
, type_discovered_(false)
, type_discovered_("")
, stop_(false)
{
// Create the participant
Expand Down Expand Up @@ -114,11 +117,36 @@ void SubscriberApp::on_subscription_matched(
void SubscriberApp::on_data_available(
DataReader* reader)
{
using eprosima::fastcdr::exception::BadParamException;

SampleInfo info;
while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&hello_, &info)))
{
// Mininal type objects are registered with autogenerated member names that may containg non-utf-8 chars.
bool is_member_name_known = false;

// Extract type kind from remote type object
TypeKind type_kind = TK_NONE;

try
{
type_kind = remote_type_object_.complete()._d();
is_member_name_known = true;
}
catch (const BadParamException&)
{
try
{
type_kind = remote_type_object_.minimal()._d();
}
catch (const BadParamException&)
{
throw std::runtime_error("Cannot get type kind from remote type object");
}
}

if (ALIVE_INSTANCE_STATE == info.instance_state && info.valid_data &&
TK_STRUCTURE == remote_type_object_.complete()._d())
TK_STRUCTURE == type_kind)
{
// Increase received samples
received_samples_++;
Expand All @@ -136,7 +164,8 @@ void SubscriberApp::on_data_available(
// Print all the members
for (auto elem : members)
{
std::cout << " - " << elem.first << ": ";
std::string member_name = is_member_name_known ? elem.first.c_str() : "Unkown member name";
std::cout << " - " << member_name << ": ";

MemberDescriptor::_ref_type member_descriptor = {traits<MemberDescriptor>::make_shared()};

Expand Down Expand Up @@ -197,49 +226,56 @@ void SubscriberApp::on_data_writer_discovery(
// We don't want to ignore the writer
should_be_ignored = false;

std::lock_guard<std::mutex> lck(mtx_);

// Check if the discovered topic is the one we are interested in
if (topic_name_ == info.topic_name.to_string())
if ((type_discovered_ == "") && (topic_name_ == info.topic_name.to_string()))
{
// Get remote type information and use it to retrieve the type object
auto type_info = info.type_information.type_information;
auto type_id = type_info.complete().typeid_with_size().type_id();

if (RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id,
remote_type_object_))
remote_type_information_ = info.type_information.type_information;
auto type_id_complete = remote_type_information_.complete().typeid_with_size().type_id();
auto type_id_minimal = remote_type_information_.minimal().typeid_with_size().type_id();
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

if ((RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id_complete,
remote_type_object_)) &&
(RETCODE_OK != DomainParticipantFactory::get_instance()->type_object_registry().get_type_object(
type_id_minimal,
remote_type_object_)))
{
std::cout << "Cannot get discovered type from registry:" << std::endl;
std::cout << " - Topic name: " << info.topic_name << std::endl;
std::cout << " - Type name: " << info.type_name << std::endl;
throw std::runtime_error("Error getting type object from registry");
}

// Notify run thread that type has been discovered
type_discovered_.store(true);
terminate_cv_.notify_one();
type_discovered_ = info.type_name.to_string();
cv_.notify_one();
}
}

void SubscriberApp::run()
{
// Wait for type discovery
{
std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
terminate_cv_.wait(lck, [&]
std::unique_lock<std::mutex> lck(mtx_);
cv_.wait(lck, [&]
{
return is_stopped() || type_discovered_.load();
return is_stopped() || (type_discovered_ != "");
});
}

// Create entities unless we need to exit
if (type_discovered_)
if (type_discovered_ != "")
{
initialize_entities();
}

// Wait for shutdown command
{
std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
terminate_cv_.wait(lck, [&]
std::unique_lock<std::mutex> lck(mtx_);
cv_.wait(lck, [&]
{
return is_stopped();
});
Expand All @@ -254,7 +290,7 @@ bool SubscriberApp::is_stopped()
void SubscriberApp::stop()
{
stop_.store(true);
terminate_cv_.notify_all();
cv_.notify_all();
}

void SubscriberApp::initialize_entities()
Expand All @@ -276,9 +312,9 @@ void SubscriberApp::initialize_entities()
throw std::runtime_error("Error building type");
}

TypeSupport dyn_type_support(new DynamicPubSubType(remote_type_));
TypeSupport dyn_type_support(new DynamicPubSubType(remote_type_, remote_type_information_));

if (RETCODE_OK != dyn_type_support.register_type(participant_))
if (RETCODE_OK != dyn_type_support.register_type(participant_, type_discovered_))
{
throw std::runtime_error("Error registering type");
}
Expand All @@ -304,7 +340,7 @@ void SubscriberApp::initialize_entities()
// Create the topic
TopicQos topic_qos = TOPIC_QOS_DEFAULT;
participant_->get_default_topic_qos(topic_qos);
topic_ = participant_->create_topic(topic_name_, dyn_type_support.get_type_name(), topic_qos);
topic_ = participant_->create_topic(topic_name_, type_discovered_, topic_qos);

if (nullptr == topic_)
{
Expand Down
10 changes: 7 additions & 3 deletions examples/cpp/xtypes/SubscriberApp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define FASTDDS_EXAMPLES_CPP_XTYPES__SUBSCRIBERAPP_HPP

#include <condition_variable>
#include <mutex>
#include <string>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
Expand Down Expand Up @@ -82,6 +84,8 @@ class SubscriberApp : public Application, public DomainParticipantListener

::xtypes::TypeObject remote_type_object_;

::xtypes::TypeInformation remote_type_information_;

DynamicType::_ref_type remote_type_;

DomainParticipant* participant_;
Expand All @@ -98,13 +102,13 @@ class SubscriberApp : public Application, public DomainParticipantListener

uint16_t received_samples_;

std::atomic<bool> type_discovered_;
std::string type_discovered_;

std::atomic<bool> stop_;

mutable std::mutex terminate_cv_mtx_;
mutable std::mutex mtx_;

std::condition_variable terminate_cv_;
std::condition_variable cv_;

};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
<domainId>0</domainId>
<rtps>
<name>xtypes_participant</name>
<propertiesPolicy>
<properties>
<!-- Explicitly activate complete type propagation -->
<property>
<name>fastdds.type_propagation</name>
<value>enabled</value>
</property>
</properties>
</propertiesPolicy>
</rtps>
</participant>
<data_writer profile_name="xtypes_datawriter_profile" is_default_profile="true">
Expand Down
72 changes: 72 additions & 0 deletions examples/cpp/xtypes/xtypes_minimal_profile.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8" ?>
<dds xmlns="http://www.eprosima.com" >
<profiles>
<participant profile_name="xtypes_participant_profile" is_default_profile="true">
<domainId>0</domainId>
<rtps>
<name>xtypes_participant</name>
<propertiesPolicy>
<properties>
<!-- Activate minimal type propagation only -->
<property>
<name>fastdds.type_propagation</name>
<value>minimal_bandwidth</value>
</property>
</properties>
</propertiesPolicy>
</rtps>
</participant>
<data_writer profile_name="xtypes_datawriter_profile" is_default_profile="true">
<qos>
<durability>
<kind>TRANSIENT_LOCAL</kind>
</durability>
<reliability>
<kind>RELIABLE</kind>
</reliability>
</qos>
<topic>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>100</depth>
</historyQos>
<resourceLimitsQos>
<max_samples>100</max_samples>
<max_instances>1</max_instances>
<max_samples_per_instance>100</max_samples_per_instance>
</resourceLimitsQos>
</topic>
</data_writer>

<data_reader profile_name="xtypes_datareader_profile" is_default_profile="true">
<qos>
<durability>
<kind>TRANSIENT_LOCAL</kind>
</durability>
<reliability>
<kind>RELIABLE</kind>
</reliability>
</qos>
<topic>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>100</depth>
</historyQos>
<resourceLimitsQos>
<max_samples>100</max_samples>
<max_instances>1</max_instances>
<max_samples_per_instance>100</max_samples_per_instance>
</resourceLimitsQos>
</topic>
</data_reader>
</profiles>

<types>
<type>
<struct name="HelloWorld">
<member name="index" type="uint32"/>
<member name="message" type="string"/>
</struct>
</type>
</types>
</dds>
7 changes: 7 additions & 0 deletions include/fastdds/dds/core/policy/ParameterTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,13 @@ const char* const parameter_policy_physical_data_process = "fastdds.physical_dat
*/
const char* const parameter_enable_monitor_service = "fastdds.enable_monitor_service";

/**
* Parameter property value for configuring type propagation
*
* @ingroup PARAMETER_MODULE
*/
const char* const parameter_policy_type_propagation = "fastdds.type_propagation";

/**
* @ingroup PARAMETER_MODULE
*/
Expand Down
Loading
Loading