diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 119aa629fc4..a434e6afe85 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -190,7 +190,15 @@ ReturnCode_t DataWriterImpl::enable() property.value(topic_->get_name().c_str()); w_att.endpoint.properties.properties().push_back(std::move(property)); - if (publisher_->get_qos().partition().names().size() > 0) + std::string* endpoint_partitions = PropertyPolicyHelper::find_property(qos_.properties(), "partitions"); + + if (endpoint_partitions) + { + property.name("partitions"); + property.value(*endpoint_partitions); + w_att.endpoint.properties.properties().push_back(std::move(property)); + } + else if (publisher_->get_qos().partition().names().size() > 0) { property.name("partitions"); std::string partitions; @@ -336,6 +344,17 @@ ReturnCode_t DataWriterImpl::enable() { wqos.data_sharing.off(); } + if (endpoint_partitions) + { + std::istringstream partition_string(*endpoint_partitions); + std::string partition_name; + wqos.m_partition.clear(); + + while (std::getline(partition_string, partition_name, ';')) + { + wqos.m_partition.push_back(partition_name.c_str()); + } + } publisher_->rtps_participant()->registerWriter(writer_, get_topic_attributes(qos_, *topic_, type_), wqos); return ReturnCode_t::RETCODE_OK; diff --git a/src/cpp/fastdds/publisher/PublisherImpl.cpp b/src/cpp/fastdds/publisher/PublisherImpl.cpp index d32ceb02abc..1f6692be0f5 100644 --- a/src/cpp/fastdds/publisher/PublisherImpl.cpp +++ b/src/cpp/fastdds/publisher/PublisherImpl.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -45,6 +46,7 @@ namespace dds { using fastrtps::xmlparser::XMLProfileManager; using fastrtps::xmlparser::XMLP_ret; using fastrtps::rtps::InstanceHandle_t; +using fastrtps::rtps::Property; using fastrtps::Duration_t; using fastrtps::PublisherAttributes; @@ -79,6 +81,23 @@ static void set_qos_from_attributes( qos.history() = attr.topic.historyQos; qos.resource_limits() = attr.topic.resourceLimitsQos; qos.data_sharing() = attr.qos.data_sharing; + + if (attr.qos.m_partition.size() > 0 ) + { + Property property; + property.name("partitions"); + std::string partitions; + bool is_first_partition = true; + + for (auto partition : attr.qos.m_partition.names()) + { + partitions += (is_first_partition ? "" : ";") + partition; + is_first_partition = false; + } + + property.value(std::move(partitions)); + qos.properties().properties().push_back(std::move(property)); + } } PublisherImpl::PublisherImpl( diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 8f829673a00..c260f8b5cc9 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -177,7 +177,16 @@ ReturnCode_t DataReaderImpl::enable() property.name("topic_name"); property.value(topic_->get_name().c_str()); att.endpoint.properties.properties().push_back(std::move(property)); - if (subscriber_->get_qos().partition().names().size() > 0) + + std::string* endpoint_partitions = PropertyPolicyHelper::find_property(qos_.properties(), "partitions"); + + if (endpoint_partitions) + { + property.name("partitions"); + property.value(*endpoint_partitions); + att.endpoint.properties.properties().push_back(std::move(property)); + } + else if (subscriber_->get_qos().partition().names().size() > 0) { property.name("partitions"); std::string partitions; @@ -249,6 +258,17 @@ ReturnCode_t DataReaderImpl::enable() { rqos.data_sharing.off(); } + if (endpoint_partitions) + { + std::istringstream partition_string(*endpoint_partitions); + std::string partition_name; + rqos.m_partition.clear(); + + while (std::getline(partition_string, partition_name, ';')) + { + rqos.m_partition.push_back(partition_name.c_str()); + } + } subscriber_->rtps_participant()->registerReader(reader_, topic_attributes(), rqos); return ReturnCode_t::RETCODE_OK; diff --git a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp index 0f0d7ee275e..5a1063342eb 100644 --- a/src/cpp/fastdds/subscriber/SubscriberImpl.cpp +++ b/src/cpp/fastdds/subscriber/SubscriberImpl.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -44,6 +45,7 @@ namespace dds { using fastrtps::xmlparser::XMLProfileManager; using fastrtps::xmlparser::XMLP_ret; using fastrtps::rtps::InstanceHandle_t; +using fastrtps::rtps::Property; using fastrtps::Duration_t; using fastrtps::SubscriberAttributes; @@ -78,6 +80,23 @@ static void set_qos_from_attributes( qos.history() = attr.topic.historyQos; qos.resource_limits() = attr.topic.resourceLimitsQos; qos.data_sharing() = attr.qos.data_sharing; + + if (attr.qos.m_partition.size() > 0 ) + { + Property property; + property.name("partitions"); + std::string partitions; + bool is_first_partition = true; + + for (auto partition : attr.qos.m_partition.names()) + { + partitions += (is_first_partition ? "" : ";") + partition; + is_first_partition = false; + } + + property.value(std::move(partitions)); + qos.properties().properties().push_back(std::move(property)); + } } SubscriberImpl::SubscriberImpl( diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index c6e85d654e2..f05b6a467c9 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -252,6 +252,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ${CMAKE_CURRENT_BINARY_DIR}/StatisticsDomainParticipant.xml) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/utils/check_guid.py ${CMAKE_CURRENT_BINARY_DIR}/check_guid.py) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/partitions.xml + ${CMAKE_CURRENT_BINARY_DIR}/partitions.xml) if(FASTRTPS_API_TESTS) set(BLACKBOXTESTS_FASTRTPS_SOURCE diff --git a/test/blackbox/common/DDSBlackboxTestsEndpointPartitioning.cpp b/test/blackbox/common/DDSBlackboxTestsEndpointPartitioning.cpp new file mode 100644 index 00000000000..dae061d889f --- /dev/null +++ b/test/blackbox/common/DDSBlackboxTestsEndpointPartitioning.cpp @@ -0,0 +1,252 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "BlackboxTests.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" + +using namespace eprosima::fastrtps; + +/** + * This test checks that DataWriter/DataReaders match with each other one to one when the partition + * configuration set on their profile matches. It also tests that this configuration is compatible with + * Publisher/Subscriber partition configuration. + */ +TEST(EndpointPartitioning, SinglePartition) +{ + PubSubWriter writer_a(TEST_TOPIC_NAME); + PubSubReader reader_a(TEST_TOPIC_NAME); + + // Positive test. Same partition. Match + + writer_a.set_xml_filename("partitions.xml"); + writer_a.set_datawriter_profile("partition_a_writer"); + writer_a.init(); + EXPECT_TRUE(writer_a.isInitialized()); + + reader_a.set_xml_filename("partitions.xml"); + reader_a.set_datareader_profile("partition_a_reader"); + reader_a.init(); + EXPECT_TRUE(reader_a.isInitialized()); + + writer_a.wait_discovery(std::chrono::seconds(3)); + reader_a.wait_discovery(std::chrono::seconds(3)); + + ASSERT_TRUE(writer_a.is_matched()); + ASSERT_TRUE(reader_a.is_matched()); + + // Negative test. Partition differs. No match. + + PubSubReader reader_b(TEST_TOPIC_NAME); + reader_b.set_xml_filename("partitions.xml"); + reader_b.set_datareader_profile("partition_b_reader"); + reader_b.init(); + EXPECT_TRUE(reader_b.isInitialized()); + + reader_b.wait_discovery(std::chrono::seconds(3)); + ASSERT_FALSE(reader_b.is_matched()); + + // Partition interaction between DataWriter and Subscriber Qos. Match + + PubSubReader reader_a_from_subscriber(TEST_TOPIC_NAME); + reader_a_from_subscriber.partition("partition_a"); + reader_a_from_subscriber.init(); + + EXPECT_TRUE(reader_a_from_subscriber.isInitialized()); + + reader_a_from_subscriber.wait_discovery(std::chrono::seconds(3)); + ASSERT_TRUE(reader_a_from_subscriber.is_matched()); + + // Partition interaction between Publisher and DataReader Qos. Match + + PubSubWriter writer_b_from_publisher(TEST_TOPIC_NAME); + writer_b_from_publisher.partition("partition_b"); + writer_b_from_publisher.init(); + + reader_b.wait_discovery(std::chrono::seconds(3)); + writer_b_from_publisher.wait_discovery(std::chrono::seconds(3)); + + ASSERT_TRUE(writer_b_from_publisher.is_matched()); + ASSERT_TRUE(reader_b.is_matched()); + + writer_a.destroy(); + writer_b_from_publisher.destroy(); + + reader_a.destroy(); + reader_b.destroy(); + reader_a_from_subscriber.destroy(); +} + +/** + * This test checks that DataWriter/DataReader partition configuration takes precedence over the same configuration + * done via the Publisher/Subscriber corresponding Qos parameter. + */ +TEST(EndpointPartitioning, QosOverride) +{ + // Partition configuration overriding. Endpoint configuration must take precedence. + PubSubWriter writer_a_qos_override(TEST_TOPIC_NAME); + PubSubReader reader_b_qos_override(TEST_TOPIC_NAME); + + writer_a_qos_override.set_xml_filename("partitions.xml"); + writer_a_qos_override.set_datawriter_profile("partition_a_writer"); + + // We change the PublisherQos so the partition matches. DataWriter partition stays the same (partition_a) + writer_a_qos_override.partition("partition_b"); + writer_a_qos_override.init(); + EXPECT_TRUE(writer_a_qos_override.isInitialized()); + + reader_b_qos_override.set_xml_filename("partitions.xml"); + reader_b_qos_override.set_datareader_profile("partition_b_reader"); + reader_b_qos_override.init(); + EXPECT_TRUE(reader_b_qos_override.isInitialized()); + + writer_a_qos_override.wait_discovery(std::chrono::seconds(3)); + reader_b_qos_override.wait_discovery(std::chrono::seconds(3)); + + ASSERT_FALSE(writer_a_qos_override.is_matched()); + ASSERT_FALSE(reader_b_qos_override.is_matched()); + + writer_a_qos_override.destroy(); + reader_b_qos_override.destroy(); + + // Same test but trying to override DataReader configuration + PubSubWriter writer_b_qos_override(TEST_TOPIC_NAME); + PubSubReader reader_a_qos_override(TEST_TOPIC_NAME); + + writer_b_qos_override.set_xml_filename("partitions.xml"); + writer_b_qos_override.set_datawriter_profile("partition_b_writer"); + writer_b_qos_override.partition("partition_a"); + writer_b_qos_override.init(); + EXPECT_TRUE(writer_b_qos_override.isInitialized()); + + reader_a_qos_override.set_xml_filename("partitions.xml"); + reader_a_qos_override.set_datareader_profile("partition_a_reader"); + reader_a_qos_override.init(); + EXPECT_TRUE(reader_a_qos_override.isInitialized()); + + writer_b_qos_override.wait_discovery(std::chrono::seconds(3)); + reader_a_qos_override.wait_discovery(std::chrono::seconds(3)); + + ASSERT_FALSE(writer_b_qos_override.is_matched()); + ASSERT_FALSE(reader_a_qos_override.is_matched()); + + writer_b_qos_override.destroy(); + reader_a_qos_override.destroy(); +} + +/** + * This test checks that multiple partitions can be defined properly on both DataReader and DataWriters + * and that it matches with other endpoints with the same partitions. + */ +TEST(EndpointPartitioning, MultiplePartitions) +{ + // Multiple partition test + PubSubReader reader_a(TEST_TOPIC_NAME); + PubSubReader reader_b(TEST_TOPIC_NAME); + PubSubWriter writer_a_b(TEST_TOPIC_NAME); + + writer_a_b.set_xml_filename("partitions.xml"); + writer_a_b.set_datawriter_profile("partition_a_b_writer"); + writer_a_b.init(); + EXPECT_TRUE(writer_a_b.isInitialized()); + + reader_a.set_xml_filename("partitions.xml"); + reader_a.set_datareader_profile("partition_a_reader"); + reader_a.init(); + EXPECT_TRUE(reader_a.isInitialized()); + + reader_b.set_xml_filename("partitions.xml"); + reader_b.set_datareader_profile("partition_b_reader"); + reader_b.init(); + EXPECT_TRUE(reader_b.isInitialized()); + + writer_a_b.wait_discovery(std::chrono::seconds(3)); + reader_a.wait_discovery(std::chrono::seconds(3)); + reader_b.wait_discovery(std::chrono::seconds(3)); + + ASSERT_TRUE(writer_a_b.is_matched()); + ASSERT_TRUE(reader_a.is_matched()); + ASSERT_TRUE(reader_b.is_matched()); + + writer_a_b.destroy(); + reader_a.destroy(); + reader_b.destroy(); + + PubSubWriter writer_a(TEST_TOPIC_NAME); + PubSubWriter writer_b(TEST_TOPIC_NAME); + PubSubReader reader_a_b(TEST_TOPIC_NAME); + + writer_a.set_xml_filename("partitions.xml"); + writer_a.set_datawriter_profile("partition_a_writer"); + writer_a.init(); + EXPECT_TRUE(writer_a.isInitialized()); + + writer_b.set_xml_filename("partitions.xml"); + writer_b.set_datawriter_profile("partition_b_writer"); + writer_b.init(); + EXPECT_TRUE(writer_b.isInitialized()); + + reader_a_b.set_xml_filename("partitions.xml"); + reader_a_b.set_datareader_profile("partition_a_b_reader"); + reader_a_b.init(); + EXPECT_TRUE(reader_a_b.isInitialized()); + + writer_a.wait_discovery(std::chrono::seconds(3)); + writer_b.wait_discovery(std::chrono::seconds(3)); + reader_a_b.wait_discovery(std::chrono::seconds(3)); + + ASSERT_TRUE(writer_a.is_matched()); + ASSERT_TRUE(writer_b.is_matched()); + ASSERT_TRUE(reader_a_b.is_matched()); +} + +/** + * This test checks that partition configuration can be modified via the PropertyPolicyQos API + */ +TEST(EndpointPartitioning, PropertyQos) +{ + PubSubWriter writer_a(TEST_TOPIC_NAME); + PubSubReader reader_a(TEST_TOPIC_NAME); + PubSubReader reader_b(TEST_TOPIC_NAME); + + eprosima::fastdds::dds::PropertyPolicyQos writer_a_policy; + writer_a_policy.properties().emplace_back("partitions", "partition_b"); + writer_a.entity_property_policy(writer_a_policy); + writer_a.init(); + EXPECT_TRUE(writer_a.isInitialized()); + + reader_a.set_xml_filename("partitions.xml"); + reader_a.set_datareader_profile("partition_a_reader"); + reader_a.init(); + EXPECT_TRUE(reader_a.isInitialized()); + + reader_b.set_xml_filename("partitions.xml"); + reader_b.set_datareader_profile("partition_b_reader"); + reader_b.init(); + EXPECT_TRUE(reader_b.isInitialized()); + + writer_a.wait_discovery(std::chrono::seconds(2)); + reader_a.wait_discovery(std::chrono::seconds(2)); + reader_b.wait_discovery(std::chrono::seconds(2)); + + ASSERT_TRUE(writer_a.is_matched()); + ASSERT_FALSE(reader_a.is_matched()); + ASSERT_TRUE(reader_b.is_matched()); + + writer_a.destroy(); + reader_a.destroy(); + reader_b.destroy(); +} diff --git a/test/blackbox/partitions.xml b/test/blackbox/partitions.xml new file mode 100644 index 00000000000..aa418cea056 --- /dev/null +++ b/test/blackbox/partitions.xml @@ -0,0 +1,64 @@ + + + + + + + + partition_a + + + + + + + + + partition_b + + + + + + + + + partition_a + partition_b + + + + + + + + + + partition_a + + + + + + + + + + partition_b + + + + + + + + + + partition_a + partition_b + + + + + + diff --git a/test/unittest/dds/publisher/PublisherTests.cpp b/test/unittest/dds/publisher/PublisherTests.cpp index 30d42fb5c17..62a704a539c 100644 --- a/test/unittest/dds/publisher/PublisherTests.cpp +++ b/test/unittest/dds/publisher/PublisherTests.cpp @@ -31,6 +31,8 @@ #include #include +#include + #include #include #include @@ -41,6 +43,7 @@ namespace fastdds { namespace dds { using fastrtps::PublisherAttributes; +using fastrtps::rtps::PropertyPolicyHelper; using fastrtps::xmlparser::XMLProfileManager; using fastrtps::xmlparser::XMLP_ret; @@ -242,7 +245,19 @@ void check_datawriter_with_profile ( ASSERT_TRUE( qos.writer_resource_limits().matched_subscriber_allocation == publisher_atts.matched_subscriber_allocation); - ASSERT_TRUE(qos.properties() == publisher_atts.properties); + if (publisher_atts.qos.m_partition.names().empty()) + { + ASSERT_TRUE(qos.properties() == publisher_atts.properties); + } + else + { + ASSERT_NE(PropertyPolicyHelper::find_property(qos.properties(), "partitions"), nullptr); + for (auto partition: publisher_atts.qos.m_partition.names()) + { + ASSERT_NE(PropertyPolicyHelper::find_property(qos.properties(), "partitions")->find( + partition), std::string::npos); + } + } ASSERT_TRUE(qos.throughput_controller() == publisher_atts.throughputController); ASSERT_TRUE(qos.endpoint().unicast_locator_list == publisher_atts.unicastLocatorList); ASSERT_TRUE(qos.endpoint().multicast_locator_list == publisher_atts.multicastLocatorList); diff --git a/test/unittest/dds/subscriber/SubscriberTests.cpp b/test/unittest/dds/subscriber/SubscriberTests.cpp index c5427c12f66..106dee07226 100644 --- a/test/unittest/dds/subscriber/SubscriberTests.cpp +++ b/test/unittest/dds/subscriber/SubscriberTests.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,7 @@ namespace dds { using fastrtps::PublisherAttributes; using fastrtps::SubscriberAttributes; +using fastrtps::rtps::PropertyPolicyHelper; using fastrtps::xmlparser::XMLProfileManager; using fastrtps::xmlparser::XMLP_ret; @@ -276,7 +278,19 @@ void check_datareader_with_profile ( ASSERT_TRUE( qos.reader_resource_limits().matched_publisher_allocation == subscriber_atts.matched_publisher_allocation); - ASSERT_TRUE(qos.properties() == subscriber_atts.properties); + if (subscriber_atts.qos.m_partition.names().empty()) + { + ASSERT_TRUE(qos.properties() == subscriber_atts.properties); + } + else + { + ASSERT_NE(PropertyPolicyHelper::find_property(qos.properties(), "partitions"), nullptr); + for (auto partition: subscriber_atts.qos.m_partition.names()) + { + ASSERT_NE(PropertyPolicyHelper::find_property(qos.properties(), "partitions")->find( + partition), std::string::npos); + } + } ASSERT_TRUE(qos.expects_inline_qos() == subscriber_atts.expectsInlineQos); ASSERT_TRUE(qos.endpoint().unicast_locator_list == subscriber_atts.unicastLocatorList); ASSERT_TRUE(qos.endpoint().multicast_locator_list == subscriber_atts.multicastLocatorList);