diff --git a/include/fastdds/rtps/writer/ChangeForReader.h b/include/fastdds/rtps/writer/ChangeForReader.h index da81b00c512..0100883170d 100644 --- a/include/fastdds/rtps/writer/ChangeForReader.h +++ b/include/fastdds/rtps/writer/ChangeForReader.h @@ -54,20 +54,7 @@ class ChangeForReader_t public: - ChangeForReader_t() = delete; - - ChangeForReader_t( - const ChangeForReader_t& ch) - : status_(ch.status_) - , seq_num_(ch.seq_num_) - , change_(ch.change_) - , unsent_fragments_(ch.unsent_fragments_) - { - } - - //TODO(Ricardo) Temporal - //ChangeForReader_t(const CacheChange_t* change) : status_(UNSENT), - ChangeForReader_t( + explicit ChangeForReader_t( CacheChange_t* change) : status_(UNSENT) , seq_num_(change->sequenceNumber) @@ -80,20 +67,6 @@ class ChangeForReader_t } } - ~ChangeForReader_t() - { - } - - ChangeForReader_t& operator =( - const ChangeForReader_t& ch) - { - status_ = ch.status_; - seq_num_ = ch.seq_num_; - change_ = ch.change_; - unsent_fragments_ = ch.unsent_fragments_; - return *this; - } - /** * Get the cache change * @return Cache change @@ -150,7 +123,9 @@ class ChangeForReader_t { unsent_fragments_.remove(sentFragment); - if (!unsent_fragments_.empty() && unsent_fragments_.max() < change_->getFragmentCount()) + // We only use the running window mechanism during the first stage, until all fragments have been delivered + // once, and we consider the whole change as delivered. + if (!delivered_ && !unsent_fragments_.empty() && (unsent_fragments_.max() < change_->getFragmentCount())) { FragmentNumber_t base = unsent_fragments_.base(); FragmentNumber_t max = unsent_fragments_.max(); @@ -168,17 +143,31 @@ class ChangeForReader_t void markFragmentsAsUnsent( const FragmentNumberSet_t& unsentFragments) { - FragmentNumber_t other_base = unsentFragments.base(); - if (other_base < unsent_fragments_.base()) + // Ignore NACK_FRAG messages during the first stage, until all fragments have been delivered once, and we + // consider the whole change as delivered. + if (delivered_) { - unsent_fragments_.base_update(other_base); - } - unsentFragments.for_each( - [this]( - FragmentNumber_t element) + if (unsent_fragments_.empty()) + { + // Current window is empty, so we can set it to the received one. + unsent_fragments_ = unsentFragments; + } + else { - unsent_fragments_.add(element); - }); + // Update window to send the lowest possible requested fragments first. + FragmentNumber_t other_base = unsentFragments.base(); + if (other_base < unsent_fragments_.base()) + { + unsent_fragments_.base_update(other_base); + } + unsentFragments.for_each( + [this]( + FragmentNumber_t element) + { + unsent_fragments_.add(element); + }); + } + } } bool has_been_delivered() const @@ -203,7 +192,7 @@ class ChangeForReader_t FragmentNumberSet_t unsent_fragments_; - //! Indicates if was delivered at least one time. + //! Indicates if was delivered at least once. bool delivered_ = false; }; diff --git a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp index 6789ee51143..122fd61ff82 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp @@ -86,10 +86,12 @@ class PubSubFragments : public testing::TestWithParam bool reliable, bool volatile_reader, bool volatile_writer, - bool small_fragments) + bool small_fragments, + uint32_t loss_rate = 0) { PubSubReader reader(topic_name); PubSubWriter writer(topic_name); + uint32_t fragment_count = 0; reader .socket_buffer_size(1048576) // accomodate large and fast fragments @@ -104,12 +106,33 @@ class PubSubFragments : public testing::TestWithParam ASSERT_TRUE(reader.isInitialized()); - if (small_fragments) + if (small_fragments || 0 < loss_rate) { - auto testTransport = std::make_shared(); - testTransport->sendBufferSize = 1024; - testTransport->maxMessageSize = 1024; + auto testTransport = std::make_shared(); + testTransport->receiveBufferSize = 65536; + if (small_fragments) + { + testTransport->sendBufferSize = 1024; + testTransport->maxMessageSize = 1024; + } + if (0 < loss_rate) + { + testTransport->drop_data_frag_messages_filter_ = + [&fragment_count, loss_rate](eprosima::fastrtps::rtps::CDRMessage_t& msg)->bool + { + static_cast(msg); + + ++fragment_count; + if (fragment_count >= loss_rate) + { + fragment_count = 0; + } + + return 1ul == fragment_count; + }; + } + writer.disable_builtin_transport(); writer.add_user_transport_to_pparams(testTransport); } @@ -228,6 +251,12 @@ TEST_P(PubSubFragments, PubSubAsReliableTransientLocalData300kbSmallFragments) do_fragment_test(TEST_TOPIC_NAME, data, false, true, false, false, true); } +TEST_P(PubSubFragments, PubSubAsReliableTransientLocalData300kbSmallFragmentsLossy) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, true, false, false, true, 260); +} + TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) { auto data = default_data300kb_data_generator(); @@ -300,6 +329,12 @@ TEST_P(PubSubFragments, AsyncPubSubAsReliableTransientLocalData300kbSmallFragmen do_fragment_test(TEST_TOPIC_NAME, data, true, true, false, false, true); } +TEST_P(PubSubFragments, AsyncPubSubAsReliableTransientLocalData300kbSmallFragmentsLossy) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, true, false, false, true, 260); +} + class PubSubFragmentsLimited : public testing::TestWithParam { public: diff --git a/test/unittest/rtps/writer/ReaderProxyTests.cpp b/test/unittest/rtps/writer/ReaderProxyTests.cpp index a5f4e1782d0..9fc7ce1f69a 100644 --- a/test/unittest/rtps/writer/ReaderProxyTests.cpp +++ b/test/unittest/rtps/writer/ReaderProxyTests.cpp @@ -148,6 +148,188 @@ TEST(ReaderProxyTests, requested_changes_set_test) rproxy.requested_changes_set(set, gap_builder, {0, 1}); } +FragmentNumber_t mark_next_fragment_sent( + ReaderProxy& rproxy, + SequenceNumber_t sequence_number, + FragmentNumber_t expected_fragment) +{ + FragmentNumber_t next_fragment{expected_fragment}; + SequenceNumber_t gap_seq; + bool need_reactivate_periodic_heartbeat; + rproxy.change_is_unsent(sequence_number, next_fragment, gap_seq, need_reactivate_periodic_heartbeat); + if (next_fragment != expected_fragment) + { + return next_fragment; + } + + bool was_last_fragment; + rproxy.mark_fragment_as_sent_for_change(sequence_number, next_fragment, was_last_fragment); + return next_fragment; +} + +TEST(ReaderProxyTests, process_nack_frag_single_fragment_different_windows_test) +{ + constexpr FragmentNumber_t TOTAL_NUMBER_OF_FRAGMENTS = 400; + constexpr uint16_t FRAGMENT_SIZE = 100; + + StatefulWriter writerMock; + WriterTimes wTimes; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(wTimes, alloc, &writerMock); + CacheChange_t seq; + seq.sequenceNumber = {0, 1}; + seq.serializedPayload.length = TOTAL_NUMBER_OF_FRAGMENTS * FRAGMENT_SIZE; + seq.setFragmentSize(FRAGMENT_SIZE); + + RTPSMessageGroup message_group(nullptr, false); + RTPSGapBuilder gap_builder(message_group); + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + ChangeForReader_t change(&seq); + rproxy.add_change(change, true, false); + + SequenceNumberSet_t sequence_number_set({0, 1}); + sequence_number_set.add({0, 1}); + rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, false); + rproxy.requested_changes_set(sequence_number_set, gap_builder, seq.sequenceNumber); + + // The number of sent fragments should be higher than the FragmentNumberSet_t size. + constexpr FragmentNumber_t NUMBER_OF_SENT_FRAGMENTS = 259; + + for (auto i = 1u; i <= NUMBER_OF_SENT_FRAGMENTS; ++i) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i); + } + + // Set the change status to UNSENT. + rproxy.perform_acknack_response(nullptr); + + // The difference between the latest sent fragment and an undelivered fragment should also be higher than + // the FragmentNumberSet_t size. + constexpr FragmentNumber_t UNDELIVERED_FRAGMENT = 3; + FragmentNumberSet_t undelivered_fragment_set(UNDELIVERED_FRAGMENT); + undelivered_fragment_set.add(UNDELIVERED_FRAGMENT); + + rproxy.process_nack_frag({}, 1, seq.sequenceNumber, undelivered_fragment_set); + + // Nack data should be ignored: first, complete the sequential delivering of the remaining fragments. + for (auto i = NUMBER_OF_SENT_FRAGMENTS + 1u; i <= TOTAL_NUMBER_OF_FRAGMENTS; ++i) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i); + } + + // Mark the change as sent, i.e. all fragments were sent once. + rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, true); + + // After the change is marked as delivered, nack data can be processed. + rproxy.process_nack_frag({}, 2, seq.sequenceNumber, undelivered_fragment_set); + + // Now, send the fragments that were reported as undelivered. + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, + UNDELIVERED_FRAGMENT), UNDELIVERED_FRAGMENT); + + // All fragments are marked as sent. + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, + TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u); +} + +TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_test) +{ + constexpr FragmentNumber_t TOTAL_NUMBER_OF_FRAGMENTS = 400; + constexpr uint16_t FRAGMENT_SIZE = 100; + + StatefulWriter writerMock; + WriterTimes wTimes; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(wTimes, alloc, &writerMock); + CacheChange_t seq; + seq.sequenceNumber = {0, 1}; + seq.serializedPayload.length = TOTAL_NUMBER_OF_FRAGMENTS * FRAGMENT_SIZE; + seq.setFragmentSize(FRAGMENT_SIZE); + + RTPSMessageGroup message_group(nullptr, false); + RTPSGapBuilder gap_builder(message_group); + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + ChangeForReader_t change(&seq); + rproxy.add_change(change, true, false); + + SequenceNumberSet_t sequence_number_set({0, 1}); + sequence_number_set.add({0, 1}); + rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, false); + rproxy.requested_changes_set(sequence_number_set, gap_builder, seq.sequenceNumber); + + // The number of sent fragments should be higher than the FragmentNumberSet_t size. + constexpr FragmentNumber_t NUMBER_OF_SENT_FRAGMENTS = 259; + + for (auto i = 1u; i <= NUMBER_OF_SENT_FRAGMENTS; ++i) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i); + } + + // Set the change status to UNSENT. + rproxy.perform_acknack_response(nullptr); + + // Handle the first portion of undelivered fragments. + { + std::vector undelivered_fragments = {3, 6, 8}; + FragmentNumberSet_t undelivered_fragment_set(undelivered_fragments.front()); + for (auto fragment: undelivered_fragments) + { + undelivered_fragment_set.add(fragment); + } + rproxy.process_nack_frag({}, 1, seq.sequenceNumber, undelivered_fragment_set); + + // Nack data should be ignored: first, complete the sequential delivering of the remaining fragments. + for (auto i = NUMBER_OF_SENT_FRAGMENTS + 1u; i <= TOTAL_NUMBER_OF_FRAGMENTS; ++i) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i); + } + + // Mark the change as sent, i.e. all fragments were sent once. + rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, true); + + rproxy.process_nack_frag({}, 2, seq.sequenceNumber, undelivered_fragment_set); + + // After the change is marked as delivered, nack data can be processed. + for (auto fragment: undelivered_fragments) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, fragment), fragment); + } + } + + // All fragments are marked as sent. + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, + TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u); + + // Handle undelivered fragments that are from a different window. + { + std::vector undelivered_fragments = {301, 399}; + FragmentNumberSet_t undelivered_fragment_set(undelivered_fragments.front()); + for (auto fragment: undelivered_fragments) + { + undelivered_fragment_set.add(fragment); + } + rproxy.process_nack_frag({}, 3, seq.sequenceNumber, undelivered_fragment_set); + + // After the change is marked as delivered, nack data can be processed. + for (auto fragment: undelivered_fragments) + { + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, fragment), fragment); + } + } + + // All fragments are marked as sent. + ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, + TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u); +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima