diff --git a/include/fastdds/dds/subscriber/DataReader.hpp b/include/fastdds/dds/subscriber/DataReader.hpp index ad7eaee01ae..61846b6a91b 100644 --- a/include/fastdds/dds/subscriber/DataReader.hpp +++ b/include/fastdds/dds/subscriber/DataReader.hpp @@ -785,7 +785,9 @@ class DataReader : public DomainEntity const void* instance) const; /** - * @brief Returns information about the first untaken sample. + * @brief Returns information about the first untaken sample. This method is meant to be called prior to + * a read() or take() operation as it does not modify the status condition of the entity. + * * * @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information. * diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 4ba6f323862..6294dd3598c 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -199,7 +199,8 @@ class DataReaderImpl SampleInfoSeq& sample_infos); /** - * @brief Returns information about the first untaken sample. + * @brief Returns information about the first untaken sample. This method is meant to be called prior to + * a read() or take() operation as it does not modify the status condition of the entity. * @param [out] info Pointer to a SampleInfo structure to store first untaken sample information. * @return true if sample info was returned. false if there is no sample to take. */ diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 392c86f4864..d8455e497ee 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -356,9 +356,21 @@ bool DataReaderHistory::get_first_untaken_info( for (auto& it : data_available_instances_) { auto& instance_changes = it.second->cache_changes; - if (!instance_changes.empty()) + for (auto& instance_change : instance_changes) { - ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front()); + WriterProxy* wp = nullptr; + bool is_future_change = false; + + if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change)) + { + mp_reader->end_sample_access_nts(instance_change, wp, false); + if (is_future_change) + { + continue; + } + } + + ReadTakeCommand::generate_info(info, *(it.second), instance_change); return true; } } diff --git a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp index d2bb301541b..c96fc2a8db6 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp @@ -16,6 +16,8 @@ #include +#include +#include #include #include "BlackboxTests.hpp" @@ -230,6 +232,110 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo) ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count(); } +//! Regression test for #20706 +//! get_first_untaken_info() returns the first valid change of an instance, not only the first +//! cache change. This implies searching in all the cache changes of the instance. +//! In the scenario of having multiple reliable writers and one reader with history size > 1 in the same topic, +//! it can happen that get_first_untaken_info() returns OK (as it is not currently checking whether the change is in the future) +//! but take() returns NO_DATA because it is waiting for a previous SequenceNumber from the writer. +TEST(DDSDataReader, GetFirstUntakenInfoReturnsTheFirstValidChange) +{ + PubSubWriter writer_1(TEST_TOPIC_NAME); + PubSubWriter writer_2(TEST_TOPIC_NAME); + // The reader should not take nor read any sample in this test + PubSubReader reader(TEST_TOPIC_NAME, false, false, false); + + auto testTransport_1 = std::make_shared(); + + EntityId_t writer1_id; + EntityId_t reader_id; + + testTransport_1->drop_data_messages_filter_ = + [&writer1_id, &reader_id](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.3 Data Submessage + EntityId_t readerID; + EntityId_t writerID; + SequenceNumber_t sn; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + + // restore buffer pos + msg.pos = old_pos; + + // Loose Seqnum 1 + if (writerID == writer1_id && + readerID == reader_id && + (sn == SequenceNumber_t{0, 1})) + { + return true; + } + + return false; + }; + + writer_1.disable_builtin_transport() + .add_user_transport_to_pparams(testTransport_1) + .history_depth(3) + .init(); + + writer_2.history_depth(3) + .init(); + + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .history_depth(3) + .init(); + + ASSERT_TRUE(writer_1.isInitialized()); + ASSERT_TRUE(writer_2.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer1_id = writer_1.datawriter_guid().entityId; + reader_id = reader.datareader_guid().entityId; + + // Wait for discovery. + writer_1.wait_discovery(); + writer_2.wait_discovery(); + reader.wait_discovery(std::chrono::seconds::zero(), 2); + + // Send writer_1 samples + auto data = default_helloworld_data_generator(3); + + reader.startReception(data); + writer_1.send(data); + + // The reader should have received samples 2,3 but not 1 + // get_first_untaken_info() should never return OK since the received changes are all in the future. + // We try it several times in case the reader has not received the samples yet. + eprosima::fastdds::dds::SampleInfo info; + for (size_t i = 0; i < 3; i++) + { + ASSERT_NE(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, reader.get_native_reader().get_first_untaken_info( + &info)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + + // Now we send data from writer_2 with no drops and all samples shall be received. + data = default_helloworld_data_generator(3); + writer_2.send(data); + reader.block_for_unread_count_of(3); + + // get_first_untaken_info() must return OK now + ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + reader.get_native_reader().get_first_untaken_info(&info)); + eprosima::fastdds::dds::StackAllocatedSequence data_values; + eprosima::fastdds::dds::SampleInfoSeq sample_infos{1}; + // As get_first_untaken_info() returns OK, take() must return OK too + ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + reader.get_native_reader().take(data_values, sample_infos)); +} + //! Regression test for Issues #3822 Github #3875 //! This test needs to late join a reader in the same process. //! Not setting this test as parametrized since it only makes sense in intraprocess.