Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14770] SampleRejectedStatus feature #2708

Merged
merged 22 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ struct RTPS_DllAPI CacheChange_t
return first_missing_fragment_ >= fragment_count_;
}

/*! Checks if the first fragment is present.
* @return true when it contains the first fragment. In other case, false.
*/
bool contains_first_fragment()
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
return 0 < first_missing_fragment_;
}

/*!
* Fills a FragmentNumberSet_t with the list of missing fragments.
* @param [out] frag_sns FragmentNumberSet_t where result is stored.
Expand Down
39 changes: 39 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fastdds/rtps/history/History.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/dds/core/status/SampleRejectedStatus.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -85,6 +86,25 @@ class ReaderHistory : public History
CacheChange_t* change,
size_t unknown_missing_changes_up_to);

/**
* Virtual method that is called when a new change is received.
* In this implementation this method just calls add_change. The user can overload this method in case
* he needs to perform additional checks before adding the change.
* @param[in] change Pointer to the change
* @param[in] unknown_missing_changes_up_to The number of changes from the same writer with a lower sequence number that
* could potentially be received in the future.
* @param[out] rejection_reason In case of been rejected the sample, it will contain the reason of the rejection.
* @return True if added.
*/
RTPS_DllAPI virtual bool received_change(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
CacheChange_t* change,
size_t unknown_missing_changes_up_to,
fastdds::dds::SampleRejectedStatusKind& rejection_reason)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
rejection_reason = fastdds::dds::NOT_REJECTED;
return received_change(change, unknown_missing_changes_up_to);
}

/**
* Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it.
* @pre Change should be already present in the history.
Expand All @@ -98,6 +118,25 @@ class ReaderHistory : public History
return true;
}

/**
* Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it.
* @pre Change should be already present in the history.
* @param[in] change The received change
* @param[in] unknown_missing_changes_up_to Number of missing changes before this one
* @param[out] rejection_reason In case of been rejected the sample, it will contain the reason of the rejection.
* @return
*/
RTPS_DllAPI virtual bool completed_change(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
CacheChange_t* change,
size_t unknown_missing_changes_up_to,
fastdds::dds::SampleRejectedStatusKind& rejection_reason)
{
(void)change;
(void)unknown_missing_changes_up_to;
(void)rejection_reason;
return true;
}

/**
* Add a CacheChange_t to the ReaderHistory.
* @param a_change Pointer to the CacheChange to add.
Expand Down
18 changes: 18 additions & 0 deletions include/fastdds/rtps/reader/ReaderListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fastdds/dds/core/status/LivelinessChangedStatus.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/core/status/SampleRejectedStatus.hpp>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/MatchingInfo.h>
Expand Down Expand Up @@ -152,6 +153,23 @@ class RTPS_DllAPI ReaderListener
static_cast<void>(writer_info);
}

/**
* This method is called when the reader rejects a samples.
*
* @param reader Pointer to the RTPSReader.
* @param change Pointer to the CacheChange_t. This is a const pointer to const data
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
* to indicate that the user should not dispose of this data himself.
*/
virtual void on_sample_rejected(
RTPSReader* reader,
eprosima::fastdds::dds::SampleRejectedStatusKind reason,
const CacheChange_t* const change)
{
static_cast<void>(reader);
static_cast<void>(reason);
static_cast<void>(change);
}

};

} // namespace rtps
Expand Down
6 changes: 1 addition & 5 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,7 @@ ReturnCode_t DataReader::get_sample_lost_status(
ReturnCode_t DataReader::get_sample_rejected_status(
SampleRejectedStatus& status) const
{
static_cast<void> (status);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_sample_rejected_status(status);
*/
return impl_->get_sample_rejected_status(status);
}

ReturnCode_t DataReader::get_subscription_matched_status(
Expand Down
60 changes: 49 additions & 11 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,25 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_lost(
data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

void DataReaderImpl::InnerDataReaderListener::on_sample_rejected(
RTPSReader* /*reader*/,
SampleRejectedStatusKind reason,
const CacheChange_t* const change_in)
{
data_reader_->update_sample_rejected_status(reason, change_in);
StatusMask notify_status = StatusMask::sample_rejected();
DataReaderListener* listener = data_reader_->get_listener_for(notify_status);
if (listener != nullptr)
{
SampleRejectedStatus callback_status;
if (data_reader_->get_sample_rejected_status(callback_status) == ReturnCode_t::RETCODE_OK)
{
listener->on_sample_rejected(data_reader_->user_datareader_, callback_status);
}
}
data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

bool DataReaderImpl::on_new_cache_change_added(
const CacheChange_t* const change)
{
Expand Down Expand Up @@ -1208,16 +1227,24 @@ ReturnCode_t DataReaderImpl::get_sample_lost_status(
return ReturnCode_t::RETCODE_OK;
}

/* TODO
bool DataReaderImpl::get_sample_rejected_status(
SampleRejectedStatus& status) const
{
(void)status;
// TODO Implement
// TODO add callback call subscriber_->subscriber_listener_->on_sample_rejected
return false;
}
*/
ReturnCode_t DataReaderImpl::get_sample_rejected_status(
SampleRejectedStatus& status)
{
if (reader_ == nullptr)
{
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

{
std::lock_guard<RecursiveTimedMutex> lock(reader_->getMutex());

status = sample_rejected_status_;
sample_rejected_status_.total_count_change = 0u;
}

user_datareader_->get_statuscondition().get_impl()->set_status(StatusMask::sample_rejected(), false);
return ReturnCode_t::RETCODE_OK;
}

const Subscriber* DataReaderImpl::get_subscriber() const
{
Expand Down Expand Up @@ -1277,7 +1304,7 @@ LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
return liveliness_changed_status_;
}

SampleLostStatus& DataReaderImpl::update_sample_lost_status(
const SampleLostStatus& DataReaderImpl::update_sample_lost_status(
int32_t sample_lost_since_last_update)
{
sample_lost_status_.total_count += sample_lost_since_last_update;
Expand Down Expand Up @@ -1690,6 +1717,17 @@ InstanceHandle_t DataReaderImpl::lookup_instance(
return handle;
}

const SampleRejectedStatus& DataReaderImpl::update_sample_rejected_status(
SampleRejectedStatusKind reason,
const CacheChange_t* const change_in)
{
++sample_rejected_status_.total_count;
++sample_rejected_status_.total_count_change;
sample_rejected_status_.last_reason = reason;
sample_rejected_status_.last_instance_handle = change_in->instanceHandle;
return sample_rejected_status_;
}

} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
30 changes: 26 additions & 4 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,15 @@ class DataReaderImpl
ReturnCode_t get_sample_lost_status(
fastdds::dds::SampleLostStatus& status);

/* TODO
bool get_sample_rejected_status(
fastrtps::SampleRejectedStatus& status) const;
/*!
* @brief Get the SAMPLE_REJECTED communication status
*
* @param[out] status SampleRejectedStatus object where the status is returned.
*
* @return RETCODE_OK
*/
ReturnCode_t get_sample_rejected_status(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
fastrtps::SampleRejectedStatus& status);

const Subscriber* get_subscriber() const;

Expand Down Expand Up @@ -383,6 +388,11 @@ class DataReaderImpl
fastrtps::rtps::RTPSReader* reader,
int32_t sample_lost_since_last_update) override;

void on_sample_rejected(
fastrtps::rtps::RTPSReader* reader,
SampleRejectedStatusKind reason,
const fastrtps::rtps::CacheChange_t* const change) override;

DataReaderImpl* data_reader_;
}
reader_listener_;
Expand Down Expand Up @@ -410,6 +420,8 @@ class DataReaderImpl

//! Sample lost status
SampleLostStatus sample_lost_status_;
//! Sample rejected status
SampleRejectedStatus sample_rejected_status_;

//! A timed callback to remove expired samples
fastrtps::rtps::TimedEvent* lifespan_timer_ = nullptr;
Expand Down Expand Up @@ -491,9 +503,19 @@ class DataReaderImpl
LivelinessChangedStatus& update_liveliness_status(
const fastrtps::LivelinessChangedStatus& status);

SampleLostStatus& update_sample_lost_status(
const SampleLostStatus& update_sample_lost_status(
int32_t sample_lost_since_last_update);

/*!
* @brief Update SampleRejectedStatus with information about a new rejected sample.
*
* @param[in] Reason why the new sample was rejected.
* @param[in] New sample which was rejected.
*/
const SampleRejectedStatus& update_sample_rejected_status(
SampleRejectedStatusKind reason,
const fastrtps::rtps::CacheChange_t* const change_in);

/**
* Returns the most appropriate listener to handle the callback for the given status,
* or nullptr if there is no appropriate listener.
Expand Down
Loading