Skip to content

Commit

Permalink
core: speed up param retransmissions
Browse files Browse the repository at this point in the history
Instead of requesting the parameters which were missed in the initial
burst one by one, we can now request them in chunks, e.g. 10 at a time.

This speeds up the process quite a bit.
  • Loading branch information
julianoes committed Dec 12, 2024
1 parent 0ba6d84 commit 6e13099
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 56 deletions.
20 changes: 16 additions & 4 deletions src/mavsdk/core/mavlink_parameter_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ uint16_t MavlinkParameterCache::count(bool including_extended) const
void MavlinkParameterCache::clear()
{
_all_params.clear();
_last_missing_requested = {};
}

bool MavlinkParameterCache::exists(const std::string& param_id) const
Expand Down Expand Up @@ -151,20 +152,31 @@ uint16_t MavlinkParameterCache::missing_count(uint16_t count) const
return missing;
}

std::optional<uint16_t> MavlinkParameterCache::next_missing_index(uint16_t count)
std::vector<uint16_t>
MavlinkParameterCache::next_missing_indices(uint16_t count, uint16_t chunk_size)
{
// Extended doesn't matter here because we use this function in the sender
// which is always either all extended or not.
std::sort(_all_params.begin(), _all_params.end(), [](const auto& lhs, const auto& rhs) {
return lhs.index < rhs.index;
});

std::vector<uint16_t> result;

for (unsigned i = 0; i < count; ++i) {
if (!exists(i)) {
return {i};
if (exists(i)) {
continue;
}

result.push_back(i);
_last_missing_requested = {i};

if (result.size() == chunk_size) {
break;
}
}
return {};

return result;
}

void MavlinkParameterCache::print_missing(uint16_t count)
Expand Down
10 changes: 9 additions & 1 deletion src/mavsdk/core/mavlink_parameter_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <cstdint>
#include <limits>
#include <map>
#include <optional>
#include <string>
#include <vector>

Expand Down Expand Up @@ -45,10 +46,15 @@ class MavlinkParameterCache {

[[nodiscard]] uint16_t count(bool including_extended) const;

[[nodiscard]] std::optional<uint16_t> next_missing_index(uint16_t count);
[[nodiscard]] std::vector<uint16_t> next_missing_indices(uint16_t count, uint16_t chunk_size);

[[nodiscard]] uint16_t missing_count(uint16_t count) const;

[[nodiscard]] std::optional<uint16_t> last_missing_requested() const
{
return _last_missing_requested;
}

void print_missing(uint16_t count);

void clear();
Expand All @@ -58,6 +64,8 @@ class MavlinkParameterCache {
[[nodiscard]] bool exists(uint16_t param_index) const;

std::vector<Param> _all_params;

std::optional<uint16_t> _last_missing_requested{};
};

} // namespace mavsdk
38 changes: 29 additions & 9 deletions src/mavsdk/core/mavlink_parameter_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ TEST(MavlinkParameterCache, AddingAndUpdating)
MavlinkParameterCache::UpdateExistingParamResult::WrongType,
cache.update_existing_param(custom_value1.id, int_value1.value));
}

TEST(MavlinkParameterCache, MissingIndicesSorted)
{
MavlinkParameterCache cache;
Expand All @@ -78,18 +79,37 @@ TEST(MavlinkParameterCache, MissingIndicesSorted)
cache.add_new_param("PARAM1", value);

// No result when the count matches the contents.
EXPECT_EQ(cache.next_missing_index(2), std::nullopt);
EXPECT_EQ(cache.next_missing_indices(2, 10).size(), 0);

// The next entry when the count is bigger.
EXPECT_EQ(cache.next_missing_index(3), 2);
EXPECT_EQ(cache.next_missing_index(10), 2);
ASSERT_EQ(cache.next_missing_indices(3, 10).size(), 1);
EXPECT_EQ(cache.next_missing_indices(3, 10)[0], 2);
EXPECT_EQ(cache.next_missing_indices(10, 10)[0], 2);

cache.add_new_param("PARAM2", value, 5);
std::vector<uint16_t> result = {2, 3, 4};
EXPECT_EQ(cache.next_missing_indices(6, 10), result);
}

cache.add_new_param("PARAM1", value, 5);
EXPECT_EQ(cache.next_missing_index(6), 2);
TEST(MavlinkParameterCache, MissingInChunks)
{
MavlinkParameterCache cache;
ParamValue value;
// We use all the same value, we don't care about that part.
value.set_int(42);

// What about when we add something
cache.add_new_param("PARAM0", value);
cache.add_new_param("PARAM1", value);
cache.add_new_param("PARAM2", value);
EXPECT_EQ(cache.next_missing_index(6), 3);
cache.add_new_param("PARAM3", value);

// All remaining one are returned.
EXPECT_EQ(cache.next_missing_indices(10, 10).size(), 6);

// Only up to chunk size are returned
EXPECT_EQ(cache.next_missing_indices(10, 3).size(), 3);
}

TEST(MavlinkParameterCache, MissingIndicesNotSorted)
{
MavlinkParameterCache cache;
Expand All @@ -99,8 +119,8 @@ TEST(MavlinkParameterCache, MissingIndicesNotSorted)

cache.add_new_param("PARAM0", value, 1);
cache.add_new_param("PARAM1", value, 3);
cache.add_new_param("PARAM2", value, 0);

// It should still work when not sorted.
EXPECT_EQ(cache.next_missing_index(3), 2);
std::vector<uint16_t> result = {0, 2};
EXPECT_EQ(cache.next_missing_indices(3, 10), result);
}
83 changes: 41 additions & 42 deletions src/mavsdk/core/mavlink_parameter_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ void MavlinkParameterClient::process_param_value(const mavlink_message_t& messag
}

if (_parameter_debugging) {
LogDebug() << "process_param_value: " << safe_param_id << " " << received_value << ", index: " << param_value.param_index;
LogDebug() << "process_param_value: " << safe_param_id << " " << received_value
<< ", index: " << param_value.param_index;
}

if (param_value.param_index == std::numeric_limits<uint16_t>::max()) {
Expand Down Expand Up @@ -769,8 +770,7 @@ void MavlinkParameterClient::process_param_value(const mavlink_message_t& messag
}
},
[&](WorkItemGetAll& item) {

auto maybe_current_missing_index = _param_cache.next_missing_index(item.count);
auto maybe_current_missing_index = _param_cache.last_missing_requested();

switch (_param_cache.add_new_param(
safe_param_id, received_value, param_value.param_index)) {
Expand Down Expand Up @@ -813,36 +813,17 @@ void MavlinkParameterClient::process_param_value(const mavlink_message_t& messag
<< " of " << param_value.param_count;
}
if (item.rerequesting) {

if (maybe_current_missing_index.has_value() &&
maybe_current_missing_index.value() == param_value.param_index) {

// We got what we were waiting for, request next one.
auto maybe_next_missing_index =
_param_cache.next_missing_index(item.count);

if (!maybe_next_missing_index.has_value()) {
LogErr() << "logic error, there should a missing index";
assert(false);
}

LogInfo() << "Requesting " << _param_cache.missing_count() << " parameters missed during initial burst.";

if (_parameter_debugging) {
LogDebug() << "Requesting missing parameter "
<< (int)maybe_next_missing_index.value();
}

std::array<char, PARAM_ID_LEN> param_id_buff{};
if (!send_get_param_message(
param_id_buff, maybe_next_missing_index.value())) {
LogErr() << "Send message failed";
if (maybe_current_missing_index == param_value.param_index) {
// Looks like the last one of the previous retransmission chunk
// was done, start another one.
if (!request_next_missing(item.count)) {
work_queue_guard->pop_front();
if (item.callback) {
auto callback = item.callback;
work_queue_guard.reset();
callback(Result::ConnectionError, {});
}
return;
}
}
} else {
Expand Down Expand Up @@ -1167,25 +1148,16 @@ void MavlinkParameterClient::receive_timeout()
} else {
item.rerequesting = true;

if (_parameter_debugging) {
_param_cache.print_missing(item.count);
}

auto maybe_next_missing_index = _param_cache.next_missing_index(item.count);
if (!maybe_next_missing_index.has_value()) {
LogErr() << "logic error, there should a missing index";
assert(false);
}
LogInfo() << "Requesting " << _param_cache.missing_count(item.count) << " of "
<< item.count << " parameters missed during initial burst.";

if (_parameter_debugging) {
LogDebug() << "Requesting missing parameter "
<< (int)maybe_next_missing_index.value();
_param_cache.print_missing(item.count);
}

std::array<char, PARAM_ID_LEN> param_id_buff{};

if (!send_get_param_message(param_id_buff, maybe_next_missing_index.value())) {
LogErr() << "Send message failed";
// To speed retransmissions up, we request params in chunks, otherwise the
// latency back and forth makes this quite slow.
if (!request_next_missing(item.count)) {
work_queue_guard->pop_front();
if (item.callback) {
auto callback = item.callback;
Expand All @@ -1194,13 +1166,40 @@ void MavlinkParameterClient::receive_timeout()
}
return;
}

_timeout_cookie =
_timeout_handler.add([this] { receive_timeout(); }, _timeout_s_callback());
}
}},
work->work_item_variant);
}

bool MavlinkParameterClient::request_next_missing(uint16_t count)
{
// Requesting 10 at a time seems to work on SiK radios.
const uint16_t chunk_size = 10;

auto next_missing_indices = _param_cache.next_missing_indices(count, chunk_size);
if (next_missing_indices.empty()) {
LogErr() << "logic error, there should a missing index";
return false;
}

for (auto next_missing_index : next_missing_indices) {
if (_parameter_debugging) {
LogDebug() << "Requesting missing parameter " << (int)next_missing_index;
}

std::array<char, PARAM_ID_LEN> param_id_buff{};

if (!send_get_param_message(param_id_buff, next_missing_index)) {
LogErr() << "Send message failed";
return false;
}
}
return true;
}

std::ostream& operator<<(std::ostream& str, const MavlinkParameterClient::Result& result)
{
switch (result) {
Expand Down
2 changes: 2 additions & 0 deletions src/mavsdk/core/mavlink_parameter_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class MavlinkParameterClient : public MavlinkParameterSubscription {
const std::array<char, PARAM_ID_LEN>& param_id_buff, int16_t param_index);
bool send_request_list_message();

bool request_next_missing(uint16_t count);

Sender& _sender;
MavlinkMessageHandler& _message_handler;
TimeoutHandler& _timeout_handler;
Expand Down

0 comments on commit 6e13099

Please sign in to comment.