diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index d2d5531b6f3..8bd7340b161 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -126,9 +126,6 @@ class ReaderHistory : public History RTPS_DllAPI void do_release_cache( CacheChange_t* ch) override; - iterator get_first_change_with_minimum_ts( - const Time_t timestamp); - //!Pointer to the reader RTPSReader* mp_reader; diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 85c70468782..fa78b40814d 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -18,16 +18,18 @@ */ #include -#include -#include -#include +#include +#include #include #include -#include -#include +#include + +#include +#include +#include namespace eprosima { namespace fastrtps { @@ -281,10 +283,11 @@ bool SubscriberHistory::add_received_change_with_key( } //ADD TO KEY VECTOR - - // As the instance should be ordered following the presentation QoS, and - // we only support ordering by reception timestamp, we can always add at the end. - instance_changes.push_back(a_change); + eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change, + [](const CacheChange_t* lhs, const CacheChange_t* rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); logInfo(SUBSCRIBER, mp_reader->getGuid().entityId << ": Change " << a_change->sequenceNumber << " added from: " diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 2621437ad4f..50552de2602 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -24,6 +24,8 @@ #include #include +#include + #include namespace eprosima { @@ -71,9 +73,11 @@ bool ReaderHistory::add_change( logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined"); } - auto it = get_first_change_with_minimum_ts(a_change->sourceTimestamp); - m_changes.insert(it, a_change); - + eprosima::utilities::collections::sorted_vector_insert(m_changes, a_change, + [](const CacheChange_t* lhs, const CacheChange_t* rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); logInfo(RTPS_READER_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); @@ -231,22 +235,6 @@ void ReaderHistory::do_release_cache( mp_reader->releaseCache(ch); } -History::iterator ReaderHistory::get_first_change_with_minimum_ts( - const Time_t timestamp) -{ - if (!m_changes.empty() && timestamp < (*m_changes.rbegin())->sourceTimestamp) - { - iterator it = std::lower_bound(m_changes.begin(), m_changes.end(), timestamp, - [](const CacheChange_t* c1, const Time_t& ts) -> bool - { - return c1->sourceTimestamp < ts; - }); - return it; - } - - return m_changes.end(); -} - } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/utils/collections/sorted_vector_insert.hpp b/src/cpp/utils/collections/sorted_vector_insert.hpp new file mode 100644 index 00000000000..6d45cf216c1 --- /dev/null +++ b/src/cpp/utils/collections/sorted_vector_insert.hpp @@ -0,0 +1,64 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file sorted_vector_insert.hpp + */ + +#ifndef SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ +#define SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ + +#include +#include + +namespace eprosima { +namespace utilities { +namespace collections { + +/** + * @brief Insert item into sorted vector-like collection + * + * @tparam CollectionType Type of the collection to be modified. + * @tparam ValueType Type of the item to insert. The collection should support to insert a value of this type. + * @tparam LessThanPredicate Predicate that performs ValueType < CollectionType::value_type comparison. + * + * @param[in,out] collection The collection to be modified. + * @param[in] item The item to be inserted. + * @param[in] pred The predicate to use for comparisons. + */ +template< + typename CollectionType, + typename ValueType, + typename LessThanPredicate = std::less> +void sorted_vector_insert( + CollectionType& collection, + const ValueType& item, + const LessThanPredicate& pred = LessThanPredicate()) +{ + // Insert at the end by default + auto it = collection.end(); + + // Find insertion position when item is less than last element in collection + if (!collection.empty() && pred(item, *collection.rbegin())) + { + it = std::lower_bound(collection.begin(), collection.end(), item, pred); + } + collection.insert(it, item); +} + +} // namespace collections +} // namespace utilities +} // namespace eprosima + +#endif // SRC_CPP_UTILS_COLLECTIONS_SORTED_VECTOR_INSERT_HPP_ diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index 4a18848ab46..da6ce0579be 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -935,6 +935,50 @@ TEST_P(PubSubHistory, PubSubAsReliableKeepLastReaderSmallDepthWithKey) } } +// Regression test for redmine bug #12419 +// It uses a test transport to drop some DATA messages, in order to force unordered reception. +TEST_P(PubSubHistory, PubSubAsReliableKeepLastWithKeyUnorderedReception) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + uint32_t keys = 2; + uint32_t depth = 10; + + reader.resource_limits_max_instances(keys). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). + history_depth(depth).mem_policy(mem_policy_).init(); + + ASSERT_TRUE(reader.isInitialized()); + + auto testTransport = std::make_shared(); + testTransport->dropDataMessagesPercentage = 25; + + writer.resource_limits_max_instances(keys). + reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS). + history_depth(depth).mem_policy(mem_policy_). + disable_builtin_transport().add_user_transport_to_pparams(testTransport). + init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(keys * depth); + reader.startReception(data); + + // Send data + writer.send(data); + ASSERT_TRUE(data.empty()); + + reader.block_for_all(); + reader.stopReception(); +} + bool comparator( HelloWorld first, HelloWorld second) diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index 6e49b3c1706..2ef0a15eb8c 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -127,13 +127,6 @@ class ReaderHistory std::mutex samples_number_mutex_; unsigned int samples_number_; SequenceNumber_t last_sequence_number_; - - iterator get_first_change_with_minimum_ts( - const Time_t& /* timestamp */) - { - return m_changes.end(); - } - }; } // namespace rtps