Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21065] Bugfix: correct liveliness state in a multiple reader - one writer scenario (backport #4822) #4882

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions include/fastdds/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,17 @@ class LivelinessManager

/**
* @brief Removes a writer
* @param guid GUID of the writer
* @param kind Liveliness kind
* @param lease_duration Liveliness lease duration
* @param [in] guid GUID of the writer
* @param [in] kind Liveliness kind
* @param [in] lease_duration Liveliness lease duration
* @param [out] writer_liveliness_status The liveliness status of the writer
* @return True if the writer was successfully removed
*/
bool remove_writer(
GUID_t guid,
LivelinessQosPolicyKind kind,
Duration_t lease_duration);
fastdds::dds::LivelinessQosPolicyKind kind,
Duration_t lease_duration,
LivelinessData::WriterStatus& writer_liveliness_status);

/**
* @brief Asserts liveliness of a writer in the set
Expand Down
8 changes: 6 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ bool WLP::remove_local_writer(

EPROSIMA_LOG_INFO(RTPS_LIVELINESS, W->getGuid().entityId << " from Liveliness Protocol");

LivelinessData::WriterStatus writer_status;

if (W->get_liveliness_kind() == AUTOMATIC_LIVELINESS_QOS)
{
auto it = std::find(
Expand Down Expand Up @@ -764,7 +766,8 @@ bool WLP::remove_local_writer(
if (!pub_liveliness_manager_->remove_writer(
W->getGuid(),
W->get_liveliness_kind(),
W->get_liveliness_lease_duration()))
W->get_liveliness_lease_duration(),
writer_status))
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Could not remove writer " << W->getGuid() << " from liveliness manager");
Expand Down Expand Up @@ -807,7 +810,8 @@ bool WLP::remove_local_writer(
if (!pub_liveliness_manager_->remove_writer(
W->getGuid(),
W->get_liveliness_kind(),
W->get_liveliness_lease_duration()))
W->get_liveliness_lease_duration(),
writer_status))
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Could not remove writer " << W->getGuid() << " from liveliness manager");
Expand Down
14 changes: 13 additions & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,22 @@ bool StatefulReader::matched_writer_remove(
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}

}
else
{
Expand Down
13 changes: 12 additions & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,21 @@ bool StatelessReader::matched_writer_remove(
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
Expand Down
22 changes: 5 additions & 17 deletions src/cpp/rtps/writer/LivelinessManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,21 @@ bool LivelinessManager::add_writer(

bool LivelinessManager::remove_writer(
GUID_t guid,
LivelinessQosPolicyKind kind,
Duration_t lease_duration)
fastdds::dds::LivelinessQosPolicyKind kind,
Duration_t lease_duration,
LivelinessData::WriterStatus& writer_status)
{
bool removed = false;
LivelinessData::WriterStatus status;

{
// collection guard
std::lock_guard<shared_mutex> _(col_mutex_);
// writers_ elements guard
std::lock_guard<std::mutex> __(mutex_);

removed = writers_.remove_if([guid, kind, lease_duration, &status](LivelinessData& writer)
removed = writers_.remove_if([guid, kind, lease_duration, &writer_status](LivelinessData& writer)
{
status = writer.status;
writer_status = writer.status;
return writer.guid == guid &&
writer.kind == kind &&
writer.lease_duration == lease_duration &&
Expand All @@ -118,18 +118,6 @@ bool LivelinessManager::remove_writer(
return false;
}

if (callback_ != nullptr)
{
if (status == LivelinessData::WriterStatus::ALIVE)
{
callback_(guid, kind, lease_duration, -1, 0);
}
else if (status == LivelinessData::WriterStatus::NOT_ALIVE)
{
callback_(guid, kind, lease_duration, 0, -1);
}
}

std::unique_lock<std::mutex> lock(mutex_);

if (timer_owner_ != nullptr)
Expand Down
41 changes: 41 additions & 0 deletions test/blackbox/common/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,47 @@ TEST(LivelinessTests, Reader_Successfully_Asserts_Liveliness_on_a_Disconnected_W
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(1, std::chrono::seconds(4)), 1u);
}

// Regression test of Refs #21065, github issue #4610
TEST(LivelinessTests, correct_liveliness_state_one_writer_multiple_readers)
{
uint8_t num_readers = 2;

// Create one writer participant
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

// Create a reader participant containing 2 readers
PubSubParticipant<HelloWorldPubSubType> reader(0, num_readers, 0, num_readers);

reader.init_participant();
// Define the reader's lease duration in 1.6 secs
reader.sub_liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 600000000));
// Both readers on the same topic
reader.sub_topic_name(TEST_TOPIC_NAME);

for (size_t i = 0; i < num_readers; i++)
{
// Create Subscribers and readers, one for each writer
reader.init_subscriber(static_cast<unsigned int>(i));
}

// Create writers
writer.lease_duration(c_TimeInfinite, 1)
.liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 0))
.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_announcement_period(eprosima::fastrtps::Time_t(0, 500000000))
.init();

// Wait for discovery to occur. Liveliness should be recovered twice,
// one per matched reader.
reader.sub_wait_liveliness_recovered(2);

// Destroy the writer
writer.destroy();

// After 1.6 secs, we should receive a on_liveliness_changed(status lost) on the two readers
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(2, std::chrono::seconds(4)), 2u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
40 changes: 26 additions & 14 deletions test/unittest/rtps/writer/LivelinessManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,28 @@ TEST_F(LivelinessManagerTests, WriterCannotBeRemovedTwice)
GuidPrefix_t guidP;
guidP.value[0] = 1;
GUID_t guid(guidP, 0);

EXPECT_EQ(liveliness_manager.add_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), false);
LivelinessData::WriterStatus writer_status;

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1),
writer_status), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1),
writer_status), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1), writer_status), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1), writer_status), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1),
writer_status),
true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1),
writer_status),
false);
}

//! Tests that the assert_liveliness() method that takes liveliness kind as argument sets the alive state and time
Expand Down Expand Up @@ -479,12 +489,14 @@ TEST_F(LivelinessManagerTests, TimerOwnerRemoved)

GuidPrefix_t guidP;
guidP.value[0] = 1;
LivelinessData::WriterStatus writer_status;

liveliness_manager.add_writer(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.add_writer(GUID_t(guidP, 2), AUTOMATIC_LIVELINESS_QOS, Duration_t(1));

liveliness_manager.assert_liveliness(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.remove_writer(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.assert_liveliness(GUID_t(guidP, 1), fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.remove_writer(GUID_t(guidP, 1), fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(
0.5), writer_status);

wait_liveliness_lost(1u);
EXPECT_EQ(writer_losing_liveliness, GUID_t(guidP, 2));
Expand Down
Loading