Skip to content

Commit

Permalink
Refs #21293: StatefulReader StatelessReader using LocalReaderPointers
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL committed Oct 3, 2024
1 parent 0eaf1e7 commit be38120
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
29 changes: 18 additions & 11 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,31 +405,37 @@ bool StatefulWriter::intraprocess_delivery(
CacheChange_t* change,
ReaderProxy* reader_proxy)
{
BaseReader* reader = reader_proxy->local_reader();
if (reader)
bool ret = false;
LocalReaderPointer local_reader_pointer = reader_proxy->local_reader();

if (local_reader_pointer.is_valid())
{
if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
{
change->write_params.sample_identity(change->write_params.related_sample_identity());
}
return reader->process_data_msg(change);
ret = local_reader_pointer->process_data_msg(change);
}
return false;

return ret;
}

bool StatefulWriter::intraprocess_gap(
ReaderProxy* reader_proxy,
const SequenceNumber_t& first_seq,
const SequenceNumber_t& last_seq)
{
RTPSReader* reader = reader_proxy->local_reader();
if (reader)
bool ret = false;

LocalReaderPointer local_reader_pt = reader_proxy->local_reader();

if (local_reader_pt.is_valid())
{
return BaseReader::downcast(reader)->process_gap_msg(
ret = local_reader_pt->process_gap_msg(
m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima);
}

return false;
return ret;
}

bool StatefulWriter::intraprocess_heartbeat(
Expand All @@ -439,10 +445,11 @@ bool StatefulWriter::intraprocess_heartbeat(
bool returned_value = false;

std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
RTPSReader* reader = RTPSDomainImpl::find_local_reader(reader_proxy->guid());
LocalReaderPointer local_reader_pt = reader_proxy->local_reader();

if (reader)
if (local_reader_pt.is_valid())
{
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
SequenceNumber_t first_seq = get_seq_num_min();
SequenceNumber_t last_seq = get_seq_num_max();

Expand All @@ -459,7 +466,7 @@ bool StatefulWriter::intraprocess_heartbeat(
(liveliness || reader_proxy->has_changes()))
{
increment_hb_count();
returned_value = BaseReader::downcast(reader)->process_heartbeat_msg(
returned_value = local_reader_pt->process_heartbeat_msg(
m_guid, heartbeat_count_, first_seq, last_seq, true, liveliness, c_VendorId_eProsima);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,16 @@ bool StatelessWriter::intraprocess_delivery(
CacheChange_t* change,
ReaderLocator& reader_locator)
{
RTPSReader* reader = reader_locator.local_reader();
LocalReaderPointer local_reader_pointer = reader_locator.local_reader();

if (reader &&
if (local_reader_pointer.is_valid() &&
(!reader_data_filter_ || reader_data_filter_->is_relevant(*change, reader_locator.remote_guid())))
{
if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
{
change->write_params.sample_identity(change->write_params.related_sample_identity());
}
return BaseReader::downcast(reader)->process_data_msg(change);
return local_reader_pointer->process_data_msg(change);
}

return false;
Expand Down

0 comments on commit be38120

Please sign in to comment.