Skip to content

Commit

Permalink
Fix selection of output locators (#3003)
Browse files Browse the repository at this point in the history
* Refs #15839. Adding test_UDPv4Transport::should_drop_locator.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Adding destination locator filter to test_UDPv4Transport

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Avoid calling filters for packets that will be dropped by parent transport.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Add method set initial announcements to PubSubReader.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Add method set initial announcements to PubSubWriter.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Regression test added.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Call select_locators before asking on the number of locators selected.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Call select_locators before checking for global gap.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Do not reset locator selector inside deliver_sample_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Avoid reseting locator selector on send_heartbeat_piggyback_nts_.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Improvements on StatelessWriter

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15839. Improvements on StatefulWriter

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15896. Avoid arithmetic overflow on limitation check.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15896. Apply suggestions from code review.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #15896. Add local filter for locators.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored Oct 13, 2022
1 parent a0b274a commit 23a788d
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
{
//! Custom message filtering functions
typedef std::function<bool (fastrtps::rtps::CDRMessage_t& msg)> filter;
//! Locator filtering function
typedef std::function<bool (const Locator& destination)> DestinationLocatorFilter;

//! Test shim parameters
//! Percentage of data messages being dropped
Expand Down Expand Up @@ -67,6 +69,9 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
// General filtering function for all kind of messages (indiscriminate)
filter messages_filter_;

//! Filtering function for dropping messages to specific destinations
DestinationLocatorFilter locator_filter_;

//! Vector containing the message's sequence numbers being dropped
std::vector<fastrtps::rtps::SequenceNumber_t> sequenceNumberDataMessagesToDrop;

Expand Down
5 changes: 4 additions & 1 deletion include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,11 @@ class StatefulWriter : public RTPSWriter
LocatorSelectorSender& locator_selector,
bool create_sender_resources);

void select_all_readers_nts(
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector);

void send_heartbeat_piggyback_nts_(
ReaderProxy* reader,
RTPSMessageGroup& message_group,
LocatorSelectorSender& locator_selector,
uint32_t& last_bytes_processed);
Expand Down
22 changes: 18 additions & 4 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ class CacheChangeInlineQoSWriter final : public InlineQosWriter

};

static bool data_exceeds_limitation(
uint32_t size_to_add,
uint32_t limitation,
uint32_t total_sent,
uint32_t pending_to_send)
{
return
// Limitation has been set and
(0 < limitation) &&
// either limitation has already been reached
((limitation <= (total_sent + pending_to_send)) ||
// or adding size_to_add will exceed limitation
(size_to_add > (limitation - (total_sent + pending_to_send))));
}

static bool append_message(
CDRMessage_t* full_msg,
CDRMessage_t* submsg)
Expand Down Expand Up @@ -422,8 +437,8 @@ bool RTPSMessageGroup::add_data(
logInfo(RTPS_WRITER, "Sending relevant changes as DATA/DATA_FRAG messages");

// Check limitation
if (0 < sent_bytes_limitation_ &&
(change.serializedPayload.length > (sent_bytes_limitation_ - (current_sent_bytes_ + full_msg_->length))))
uint32_t data_size = change.serializedPayload.length;
if (data_exceeds_limitation(data_size, sent_bytes_limitation_, current_sent_bytes_, full_msg_->length))
{
flush_and_reset();
throw limit_exceeded();
Expand Down Expand Up @@ -525,8 +540,7 @@ bool RTPSMessageGroup::add_data_frag(
uint32_t fragment_size = fragment_number < change.getFragmentCount() ? change.getFragmentSize() :
change.serializedPayload.length - fragment_start;
// Check limitation
if (0 < sent_bytes_limitation_ &&
(fragment_size > (sent_bytes_limitation_ - (current_sent_bytes_ + full_msg_->length))))
if (data_exceeds_limitation(fragment_size, sent_bytes_limitation_, current_sent_bytes_, full_msg_->length))
{
flush_and_reset();
throw limit_exceeded();
Expand Down
90 changes: 56 additions & 34 deletions src/cpp/rtps/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ uint32_t test_UDPv4Transport::test_UDPv4Transport_DropLogLength = 0;
bool test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = false;
bool test_UDPv4Transport::always_drop_participant_builtin_topic_data = false;
bool test_UDPv4Transport::simulate_no_interfaces = false;
test_UDPv4TransportDescriptor::DestinationLocatorFilter test_UDPv4Transport::locator_filter([](const Locator&)
{
return false;
});

test_UDPv4Transport::test_UDPv4Transport(
const test_UDPv4TransportDescriptor& descriptor)
Expand All @@ -56,6 +60,7 @@ test_UDPv4Transport::test_UDPv4Transport(
, percentage_of_messages_to_drop_(descriptor.percentageOfMessagesToDrop)
, messages_filter_(descriptor.messages_filter_)
, sequence_number_data_messages_to_drop_(descriptor.sequenceNumberDataMessagesToDrop)
, locator_filter_(descriptor.locator_filter_)
{
test_UDPv4Transport_DropLogLength = 0;
test_UDPv4Transport_ShutdownAllNetwork = false;
Expand All @@ -75,37 +80,41 @@ test_UDPv4TransportDescriptor::test_UDPv4TransportDescriptor()
, drop_data_messages_filter_([](CDRMessage_t&)
{
return false;
}),
dropParticipantBuiltinTopicData(false),
dropPublicationBuiltinTopicData(false),
dropSubscriptionBuiltinTopicData(false),
dropDataFragMessagesPercentage(0),
drop_data_frag_messages_filter_([](CDRMessage_t&)
})
, dropParticipantBuiltinTopicData(false)
, dropPublicationBuiltinTopicData(false)
, dropSubscriptionBuiltinTopicData(false)
, dropDataFragMessagesPercentage(0)
, drop_data_frag_messages_filter_([](CDRMessage_t&)
{
return false;
}),
dropHeartbeatMessagesPercentage(0),
drop_heartbeat_messages_filter_([](CDRMessage_t&)
})
, dropHeartbeatMessagesPercentage(0)
, drop_heartbeat_messages_filter_([](CDRMessage_t&)
{
return false;
}),
dropAckNackMessagesPercentage(0),
drop_ack_nack_messages_filter_([](CDRMessage_t&)
})
, dropAckNackMessagesPercentage(0)
, drop_ack_nack_messages_filter_([](CDRMessage_t&)
{
return false;
}),
dropGapMessagesPercentage(0),
drop_gap_messages_filter_([](CDRMessage_t&)
})
, dropGapMessagesPercentage(0)
, drop_gap_messages_filter_([](CDRMessage_t&)
{
return false;
}),
percentageOfMessagesToDrop(0),
messages_filter_([](CDRMessage_t&)
})
, percentageOfMessagesToDrop(0)
, messages_filter_([](CDRMessage_t&)
{
return false;
}),
sequenceNumberDataMessagesToDrop(),
dropLogLength(0)
})
, locator_filter_([](const Locator&)
{
return false;
})
, sequenceNumberDataMessagesToDrop()
, dropLogLength(0)
{
}

Expand Down Expand Up @@ -225,21 +234,23 @@ bool test_UDPv4Transport::send(
bool whitelisted,
const std::chrono::microseconds& timeout)
{
if (packet_should_drop(send_buffer, send_buffer_size) ||
// If there are no interfaces (simulate_no_interfaces), only multicast and localhost traffic is sent
(simulate_no_interfaces &&
!fastrtps::rtps::IPLocator::isMulticast(remote_locator) &&
!fastrtps::rtps::IPLocator::isLocal(remote_locator)))
bool is_multicast_remote_address = fastrtps::rtps::IPLocator::IPLocator::isMulticast(remote_locator);
if (is_multicast_remote_address == only_multicast_purpose || whitelisted)
{
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
log_drop(send_buffer, send_buffer_size);
return true;
}
else
{
return UDPv4Transport::send(send_buffer, send_buffer_size, socket, remote_locator, only_multicast_purpose,
whitelisted, timeout);
if (packet_should_drop(send_buffer, send_buffer_size) || should_drop_locator(remote_locator))
{
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
log_drop(send_buffer, send_buffer_size);
return true;
}
else
{
return UDPv4Transport::send(send_buffer, send_buffer_size, socket, remote_locator, only_multicast_purpose,
whitelisted, timeout);
}
}

return false;
}

static bool ReadSubmessageHeader(
Expand Down Expand Up @@ -276,6 +287,17 @@ static bool ReadSubmessageHeader(
return true;
}

bool test_UDPv4Transport::should_drop_locator(
const Locator& remote_locator)
{
return locator_filter(remote_locator) ||
locator_filter_(remote_locator) ||
// If there are no interfaces (simulate_no_interfaces), only multicast and localhost traffic is sent
(simulate_no_interfaces &&
!fastrtps::rtps::IPLocator::isMulticast(remote_locator) &&
!fastrtps::rtps::IPLocator::isLocal(remote_locator));
}

bool test_UDPv4Transport::packet_should_drop(
const octet* send_buffer,
uint32_t send_buffer_size)
Expand Down
7 changes: 5 additions & 2 deletions src/cpp/rtps/transport/test_UDPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class test_UDPv4Transport : public UDPv4Transport
RTPS_DllAPI static bool always_drop_participant_builtin_topic_data;
RTPS_DllAPI static bool simulate_no_interfaces;

RTPS_DllAPI static test_UDPv4TransportDescriptor::DestinationLocatorFilter locator_filter;

protected:

virtual void get_ips(
Expand All @@ -81,8 +83,6 @@ class test_UDPv4Transport : public UDPv4Transport
uint8_t accumulator;
};

typedef std::function<bool (fastrtps::rtps::CDRMessage_t& msg)> filter;

PercentageData drop_data_messages_percentage_;
test_UDPv4TransportDescriptor::filter drop_data_messages_filter_;
bool drop_participant_builtin_topic_data_;
Expand All @@ -99,7 +99,10 @@ class test_UDPv4Transport : public UDPv4Transport
PercentageData percentage_of_messages_to_drop_;
test_UDPv4TransportDescriptor::filter messages_filter_;
std::vector<fastrtps::rtps::SequenceNumber_t> sequence_number_data_messages_to_drop_;
test_UDPv4TransportDescriptor::DestinationLocatorFilter locator_filter_;

bool should_drop_locator(
const Locator& remote_locator);

bool log_drop(
const fastrtps::rtps::octet* buffer,
Expand Down
58 changes: 32 additions & 26 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,7 @@ bool StatefulWriter::change_removed_by_history(

void StatefulWriter::send_heartbeat_to_all_readers()
{
// This version is called from send_any_unsent_changes when any of the following conditions is satisfied:
// a) history is empty
// b) there are no matched readers

// It may also be called from send_periodic_heartbeat
// This method is only called from send_periodic_heartbeat

if (m_separateSendingEnabled)
{
Expand All @@ -595,6 +591,7 @@ void StatefulWriter::send_heartbeat_to_all_readers()
if (there_are_remote_readers_)
{
RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_);
select_all_readers_nts(group, locator_selector_general_);

// Send a GAP with holes in the history.
SequenceNumber_t first_seq = get_seq_num_min();
Expand Down Expand Up @@ -775,7 +772,17 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
}
}

if (SequenceNumber_t::unknown() != gap_seq_for_all) // Send GAP for all readers
bool should_send_global_gap = SequenceNumber_t::unknown() != gap_seq_for_all;

if (locator_selector.locator_selector.state_has_changed() &&
((should_be_sent && !m_separateSendingEnabled) || should_send_global_gap))
{
group.flush_and_reset();
network.select_locators(locator_selector.locator_selector);
compute_selected_guids(locator_selector);
}

if (should_send_global_gap) // Send GAP for all readers
{
group.add_gap(gap_seq_for_all, SequenceNumberSet_t(change->sequenceNumber));
}
Expand All @@ -790,9 +797,6 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
size_t num_locators = locator_selector.locator_selector.selected_size();
if (num_locators > 0)
{
network.select_locators(locator_selector.locator_selector);
compute_selected_guids(locator_selector);

if (0 < n_fragments)
{
if (min_unsent_fragment != n_fragments + 1)
Expand Down Expand Up @@ -866,7 +870,7 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
}
}

send_heartbeat_piggyback_nts_(nullptr, group, locator_selector, last_processed);
send_heartbeat_piggyback_nts_(group, locator_selector, last_processed);
}
}
else
Expand Down Expand Up @@ -966,10 +970,6 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(

}

locator_selector.locator_selector.reset(true);
network.select_locators(locator_selector.locator_selector);
compute_selected_guids(locator_selector);

if (need_reactivate_periodic_heartbeat)
{
periodic_hb_event_->restart_timer(max_blocking_time);
Expand Down Expand Up @@ -1003,6 +1003,19 @@ void StatefulWriter::update_reader_info(
there_are_datasharing_readers_ = !matched_datasharing_readers_.empty();
}

void StatefulWriter::select_all_readers_nts(
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector)
{
locator_selector.locator_selector.reset(true);
if (locator_selector.locator_selector.state_has_changed())
{
group.flush_and_reset();
mp_RTPSParticipant->network_factory().select_locators(locator_selector.locator_selector);
compute_selected_guids(locator_selector);
}
}

bool StatefulWriter::matched_reader_add(
const ReaderProxyData& rdata)
{
Expand Down Expand Up @@ -1666,6 +1679,7 @@ bool StatefulWriter::send_periodic_heartbeat(
}
else if (m_separateSendingEnabled)
{
// Send individual liveliness heartbeat to each reader
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[this, &liveliness, &unacked_changes](ReaderProxy* reader)
{
Expand Down Expand Up @@ -1791,26 +1805,16 @@ void StatefulWriter::send_heartbeat_nts_(
}

void StatefulWriter::send_heartbeat_piggyback_nts_(
ReaderProxy* reader,
RTPSMessageGroup& message_group,
LocatorSelectorSender& locator_selector,
uint32_t& last_bytes_processed)
{
if (!disable_heartbeat_piggyback_)
{
size_t number_of_readers = reader == nullptr ? locator_selector.all_remote_readers.size() : 1u;
if (mp_history->isFull() || next_all_acked_notify_sequence_ < get_seq_num_min())
{
if (reader == nullptr)
{
locator_selector.locator_selector.reset(true);
if (locator_selector.locator_selector.state_has_changed())
{
message_group.flush_and_reset();
getRTPSParticipant()->network_factory().select_locators(locator_selector.locator_selector);
compute_selected_guids(locator_selector);
}
}
select_all_readers_nts(message_group, locator_selector);
size_t number_of_readers = locator_selector.all_remote_readers.size();
send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_);
}
else
Expand All @@ -1820,6 +1824,8 @@ void StatefulWriter::send_heartbeat_piggyback_nts_(
last_bytes_processed = current_bytes;
if (currentUsageSendBufferSize_ < 0)
{
select_all_readers_nts(message_group, locator_selector);
size_t number_of_readers = locator_selector.all_remote_readers.size();
send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_);
}
}
Expand Down
Loading

0 comments on commit 23a788d

Please sign in to comment.