Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14430] Fix freezes in the RTPS level when transferring large files (backport #2654) #2711

Merged
merged 1 commit into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 28 additions & 39 deletions include/fastdds/rtps/writer/ChangeForReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,7 @@ class ChangeForReader_t

public:

ChangeForReader_t() = delete;

ChangeForReader_t(
const ChangeForReader_t& ch)
: status_(ch.status_)
, seq_num_(ch.seq_num_)
, change_(ch.change_)
, unsent_fragments_(ch.unsent_fragments_)
{
}

//TODO(Ricardo) Temporal
//ChangeForReader_t(const CacheChange_t* change) : status_(UNSENT),
ChangeForReader_t(
explicit ChangeForReader_t(
CacheChange_t* change)
: status_(UNSENT)
, seq_num_(change->sequenceNumber)
Expand All @@ -80,20 +67,6 @@ class ChangeForReader_t
}
}

~ChangeForReader_t()
{
}

ChangeForReader_t& operator =(
const ChangeForReader_t& ch)
{
status_ = ch.status_;
seq_num_ = ch.seq_num_;
change_ = ch.change_;
unsent_fragments_ = ch.unsent_fragments_;
return *this;
}

/**
* Get the cache change
* @return Cache change
Expand Down Expand Up @@ -150,7 +123,9 @@ class ChangeForReader_t
{
unsent_fragments_.remove(sentFragment);

if (!unsent_fragments_.empty() && unsent_fragments_.max() < change_->getFragmentCount())
// We only use the running window mechanism during the first stage, until all fragments have been delivered
// once, and we consider the whole change as delivered.
if (!delivered_ && !unsent_fragments_.empty() && (unsent_fragments_.max() < change_->getFragmentCount()))
{
FragmentNumber_t base = unsent_fragments_.base();
FragmentNumber_t max = unsent_fragments_.max();
Expand All @@ -168,17 +143,31 @@ class ChangeForReader_t
void markFragmentsAsUnsent(
const FragmentNumberSet_t& unsentFragments)
{
FragmentNumber_t other_base = unsentFragments.base();
if (other_base < unsent_fragments_.base())
// Ignore NACK_FRAG messages during the first stage, until all fragments have been delivered once, and we
// consider the whole change as delivered.
if (delivered_)
{
unsent_fragments_.base_update(other_base);
}
unsentFragments.for_each(
[this](
FragmentNumber_t element)
if (unsent_fragments_.empty())
{
// Current window is empty, so we can set it to the received one.
unsent_fragments_ = unsentFragments;
}
else
{
unsent_fragments_.add(element);
});
// Update window to send the lowest possible requested fragments first.
FragmentNumber_t other_base = unsentFragments.base();
if (other_base < unsent_fragments_.base())
{
unsent_fragments_.base_update(other_base);
}
unsentFragments.for_each(
[this](
FragmentNumber_t element)
{
unsent_fragments_.add(element);
});
}
}
}

bool has_been_delivered() const
Expand All @@ -203,7 +192,7 @@ class ChangeForReader_t

FragmentNumberSet_t unsent_fragments_;

//! Indicates if was delivered at least one time.
//! Indicates if was delivered at least once.
bool delivered_ = false;
};

Expand Down
45 changes: 40 additions & 5 deletions test/blackbox/common/BlackboxTestsPubSubFragments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ class PubSubFragments : public testing::TestWithParam<test_params>
bool reliable,
bool volatile_reader,
bool volatile_writer,
bool small_fragments)
bool small_fragments,
uint32_t loss_rate = 0)
{
PubSubReader<Data1mbPubSubType> reader(topic_name);
PubSubWriter<Data1mbPubSubType> writer(topic_name);
uint32_t fragment_count = 0;

reader
.socket_buffer_size(1048576) // accomodate large and fast fragments
Expand All @@ -104,12 +106,33 @@ class PubSubFragments : public testing::TestWithParam<test_params>

ASSERT_TRUE(reader.isInitialized());

if (small_fragments)
if (small_fragments || 0 < loss_rate)
{
auto testTransport = std::make_shared<UDPv4TransportDescriptor>();
testTransport->sendBufferSize = 1024;
testTransport->maxMessageSize = 1024;
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();

testTransport->receiveBufferSize = 65536;
if (small_fragments)
{
testTransport->sendBufferSize = 1024;
testTransport->maxMessageSize = 1024;
}
if (0 < loss_rate)
{
testTransport->drop_data_frag_messages_filter_ =
[&fragment_count, loss_rate](eprosima::fastrtps::rtps::CDRMessage_t& msg)->bool
{
static_cast<void>(msg);

++fragment_count;
if (fragment_count >= loss_rate)
{
fragment_count = 0;
}

return 1ul == fragment_count;
};
}

writer.disable_builtin_transport();
writer.add_user_transport_to_pparams(testTransport);
}
Expand Down Expand Up @@ -228,6 +251,12 @@ TEST_P(PubSubFragments, PubSubAsReliableTransientLocalData300kbSmallFragments)
do_fragment_test(TEST_TOPIC_NAME, data, false, true, false, false, true);
}

TEST_P(PubSubFragments, PubSubAsReliableTransientLocalData300kbSmallFragmentsLossy)
{
auto data = default_data300kb_data_generator();
do_fragment_test(TEST_TOPIC_NAME, data, false, true, false, false, true, 260);
}

TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb)
{
auto data = default_data300kb_data_generator();
Expand Down Expand Up @@ -300,6 +329,12 @@ TEST_P(PubSubFragments, AsyncPubSubAsReliableTransientLocalData300kbSmallFragmen
do_fragment_test(TEST_TOPIC_NAME, data, true, true, false, false, true);
}

TEST_P(PubSubFragments, AsyncPubSubAsReliableTransientLocalData300kbSmallFragmentsLossy)
{
auto data = default_data300kb_data_generator();
do_fragment_test(TEST_TOPIC_NAME, data, true, true, false, false, true, 260);
}

class PubSubFragmentsLimited : public testing::TestWithParam<eprosima::fastdds::rtps::FlowControllerSchedulerPolicy>
{
public:
Expand Down
182 changes: 182 additions & 0 deletions test/unittest/rtps/writer/ReaderProxyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,188 @@ TEST(ReaderProxyTests, requested_changes_set_test)
rproxy.requested_changes_set(set, gap_builder, {0, 1});
}

FragmentNumber_t mark_next_fragment_sent(
ReaderProxy& rproxy,
SequenceNumber_t sequence_number,
FragmentNumber_t expected_fragment)
{
FragmentNumber_t next_fragment{expected_fragment};
SequenceNumber_t gap_seq;
bool need_reactivate_periodic_heartbeat;
rproxy.change_is_unsent(sequence_number, next_fragment, gap_seq, need_reactivate_periodic_heartbeat);
if (next_fragment != expected_fragment)
{
return next_fragment;
}

bool was_last_fragment;
rproxy.mark_fragment_as_sent_for_change(sequence_number, next_fragment, was_last_fragment);
return next_fragment;
}

TEST(ReaderProxyTests, process_nack_frag_single_fragment_different_windows_test)
{
constexpr FragmentNumber_t TOTAL_NUMBER_OF_FRAGMENTS = 400;
constexpr uint16_t FRAGMENT_SIZE = 100;

StatefulWriter writerMock;
WriterTimes wTimes;
RemoteLocatorsAllocationAttributes alloc;
ReaderProxy rproxy(wTimes, alloc, &writerMock);
CacheChange_t seq;
seq.sequenceNumber = {0, 1};
seq.serializedPayload.length = TOTAL_NUMBER_OF_FRAGMENTS * FRAGMENT_SIZE;
seq.setFragmentSize(FRAGMENT_SIZE);

RTPSMessageGroup message_group(nullptr, false);
RTPSGapBuilder gap_builder(message_group);

ReaderProxyData reader_attributes(0, 0);
reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
rproxy.start(reader_attributes);

ChangeForReader_t change(&seq);
rproxy.add_change(change, true, false);

SequenceNumberSet_t sequence_number_set({0, 1});
sequence_number_set.add({0, 1});
rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, false);
rproxy.requested_changes_set(sequence_number_set, gap_builder, seq.sequenceNumber);

// The number of sent fragments should be higher than the FragmentNumberSet_t size.
constexpr FragmentNumber_t NUMBER_OF_SENT_FRAGMENTS = 259;

for (auto i = 1u; i <= NUMBER_OF_SENT_FRAGMENTS; ++i)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i);
}

// Set the change status to UNSENT.
rproxy.perform_acknack_response(nullptr);

// The difference between the latest sent fragment and an undelivered fragment should also be higher than
// the FragmentNumberSet_t size.
constexpr FragmentNumber_t UNDELIVERED_FRAGMENT = 3;
FragmentNumberSet_t undelivered_fragment_set(UNDELIVERED_FRAGMENT);
undelivered_fragment_set.add(UNDELIVERED_FRAGMENT);

rproxy.process_nack_frag({}, 1, seq.sequenceNumber, undelivered_fragment_set);

// Nack data should be ignored: first, complete the sequential delivering of the remaining fragments.
for (auto i = NUMBER_OF_SENT_FRAGMENTS + 1u; i <= TOTAL_NUMBER_OF_FRAGMENTS; ++i)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i);
}

// Mark the change as sent, i.e. all fragments were sent once.
rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, true);

// After the change is marked as delivered, nack data can be processed.
rproxy.process_nack_frag({}, 2, seq.sequenceNumber, undelivered_fragment_set);

// Now, send the fragments that were reported as undelivered.
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber,
UNDELIVERED_FRAGMENT), UNDELIVERED_FRAGMENT);

// All fragments are marked as sent.
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber,
TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u);
}

TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_test)
{
constexpr FragmentNumber_t TOTAL_NUMBER_OF_FRAGMENTS = 400;
constexpr uint16_t FRAGMENT_SIZE = 100;

StatefulWriter writerMock;
WriterTimes wTimes;
RemoteLocatorsAllocationAttributes alloc;
ReaderProxy rproxy(wTimes, alloc, &writerMock);
CacheChange_t seq;
seq.sequenceNumber = {0, 1};
seq.serializedPayload.length = TOTAL_NUMBER_OF_FRAGMENTS * FRAGMENT_SIZE;
seq.setFragmentSize(FRAGMENT_SIZE);

RTPSMessageGroup message_group(nullptr, false);
RTPSGapBuilder gap_builder(message_group);

ReaderProxyData reader_attributes(0, 0);
reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
rproxy.start(reader_attributes);

ChangeForReader_t change(&seq);
rproxy.add_change(change, true, false);

SequenceNumberSet_t sequence_number_set({0, 1});
sequence_number_set.add({0, 1});
rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, false);
rproxy.requested_changes_set(sequence_number_set, gap_builder, seq.sequenceNumber);

// The number of sent fragments should be higher than the FragmentNumberSet_t size.
constexpr FragmentNumber_t NUMBER_OF_SENT_FRAGMENTS = 259;

for (auto i = 1u; i <= NUMBER_OF_SENT_FRAGMENTS; ++i)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i);
}

// Set the change status to UNSENT.
rproxy.perform_acknack_response(nullptr);

// Handle the first portion of undelivered fragments.
{
std::vector<FragmentNumber_t> undelivered_fragments = {3, 6, 8};
FragmentNumberSet_t undelivered_fragment_set(undelivered_fragments.front());
for (auto fragment: undelivered_fragments)
{
undelivered_fragment_set.add(fragment);
}
rproxy.process_nack_frag({}, 1, seq.sequenceNumber, undelivered_fragment_set);

// Nack data should be ignored: first, complete the sequential delivering of the remaining fragments.
for (auto i = NUMBER_OF_SENT_FRAGMENTS + 1u; i <= TOTAL_NUMBER_OF_FRAGMENTS; ++i)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, i), i);
}

// Mark the change as sent, i.e. all fragments were sent once.
rproxy.from_unsent_to_status(seq.sequenceNumber, UNACKNOWLEDGED, false, true);

rproxy.process_nack_frag({}, 2, seq.sequenceNumber, undelivered_fragment_set);

// After the change is marked as delivered, nack data can be processed.
for (auto fragment: undelivered_fragments)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, fragment), fragment);
}
}

// All fragments are marked as sent.
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber,
TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u);

// Handle undelivered fragments that are from a different window.
{
std::vector<FragmentNumber_t> undelivered_fragments = {301, 399};
FragmentNumberSet_t undelivered_fragment_set(undelivered_fragments.front());
for (auto fragment: undelivered_fragments)
{
undelivered_fragment_set.add(fragment);
}
rproxy.process_nack_frag({}, 3, seq.sequenceNumber, undelivered_fragment_set);

// After the change is marked as delivered, nack data can be processed.
for (auto fragment: undelivered_fragments)
{
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber, fragment), fragment);
}
}

// All fragments are marked as sent.
ASSERT_EQ(mark_next_fragment_sent(rproxy, seq.sequenceNumber,
TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u);
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down