From ab853be9a37e3fc24d18134a88ee234b56a4a7e2 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Tue, 30 Apr 2024 14:21:18 +1200 Subject: [PATCH] log_streaming: make it robust to message drops When a message is dropped, we need to discard any message part. Otherwise, we end up having corrrupt ulog files. Signed-off-by: Julian Oes --- examples/log_streaming/log_streaming.cpp | 15 +- .../log_streaming/log_streaming_impl.cpp | 140 +++++++++++------- .../log_streaming/log_streaming_impl.h | 8 +- 3 files changed, 108 insertions(+), 55 deletions(-) diff --git a/examples/log_streaming/log_streaming.cpp b/examples/log_streaming/log_streaming.cpp index 34d99cfcf..e50aae6c9 100644 --- a/examples/log_streaming/log_streaming.cpp +++ b/examples/log_streaming/log_streaming.cpp @@ -19,18 +19,19 @@ using std::this_thread::sleep_for; void usage(const std::string& bin_name) { - std::cerr << "Usage : " << bin_name << " [--rm]\n" + std::cerr << "Usage : " << bin_name << " [--drop]\n" << '\n' << "Connection URL format should be :\n" << " For TCP : tcp://[server_host][:server_port]\n" << " For UDP : udp://[bind_host][:bind_port]\n" << " For Serial : serial:///path/to/serial/dev[:baudrate]\n" - << "For example, to connect to the simulator use URL: udp://:14540" << std::endl; + << "For example, to connect to the simulator use URL: udp://:14540" << '\n' + << "--drop To drop some of the messages" << std::endl; } int main(int argc, char** argv) { - if (argc > 2) { + if (argc < 2) { usage(argv[0]); return 1; } @@ -49,6 +50,14 @@ int main(int argc, char** argv) return 1; } + // To simulate message drops. + if (argc == 3 && std::string(argv[2]) == "--drop") { + std::cout << "Dropping some messages" << std::endl; + unsigned counter = 0; + mavsdk.intercept_incoming_messages_async( + [&](const mavlink_message_t&) { return counter++ % 10 != 0; }); + } + // Create file to log to. // Get current time auto now = std::chrono::system_clock::now(); diff --git a/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp b/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp index 53b2ced3a..c6fec4771 100644 --- a/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp +++ b/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp @@ -178,28 +178,45 @@ void LogStreamingImpl::process_logging_data(const mavlink_message_t& message) } std::lock_guard lock(_mutex); - check_sequence(logging_data.sequence); - - if (logging_data.first_message_offset == std::numeric_limits::max()) { - _ulog_data.insert( - _ulog_data.end(), logging_data.data, logging_data.data + logging_data.length); - } else { - if (logging_data.first_message_offset > sizeof(logging_data.data)) { - LogWarn() << "Invalid first_message_offset"; - return; - } - - _ulog_data.insert( - _ulog_data.end(), - logging_data.data, - logging_data.data + logging_data.first_message_offset); - process_message(); - _ulog_data.clear(); - - _ulog_data.insert( - _ulog_data.end(), - logging_data.data + logging_data.first_message_offset, - logging_data.data + logging_data.length); + auto drop_state = check_sequence(logging_data.sequence); + + switch (drop_state) { + case DropState::Ok: + if (logging_data.first_message_offset == std::numeric_limits::max()) { + _ulog_data.insert( + _ulog_data.end(), logging_data.data, logging_data.data + logging_data.length); + + } else { + if (logging_data.first_message_offset > sizeof(logging_data.data)) { + LogWarn() << "Invalid first_message_offset"; + return; + } + + _ulog_data.insert( + _ulog_data.end(), + logging_data.data, + logging_data.data + logging_data.first_message_offset); + process_message(); + + _ulog_data.clear(); + _ulog_data.insert( + _ulog_data.end(), + logging_data.data + logging_data.first_message_offset, + logging_data.data + logging_data.length); + } + break; + + case DropState::Dropped: + _ulog_data.clear(); + _ulog_data.insert( + _ulog_data.end(), + logging_data.data + logging_data.first_message_offset, + logging_data.data + logging_data.length); + break; + + case DropState::Duplicate: + // Ignore. + break; } } @@ -254,46 +271,62 @@ void LogStreamingImpl::process_logging_data_acked(const mavlink_message_t& messa } std::lock_guard lock(_mutex); - check_sequence(logging_data_acked.sequence); - - if (logging_data_acked.first_message_offset == std::numeric_limits::max()) { - _ulog_data.insert( - _ulog_data.end(), - logging_data_acked.data, - logging_data_acked.data + logging_data_acked.length); - } else { - if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) { - LogWarn() << "Invalid first_message_offset"; - return; - } - - _ulog_data.insert( - _ulog_data.end(), - logging_data_acked.data, - logging_data_acked.data + logging_data_acked.first_message_offset); - process_message(); - _ulog_data.clear(); - - _ulog_data.insert( - _ulog_data.end(), - logging_data_acked.data + logging_data_acked.first_message_offset, - logging_data_acked.data + logging_data_acked.length); + auto drop_state = check_sequence(logging_data_acked.sequence); + + switch (drop_state) { + case DropState::Ok: + if (logging_data_acked.first_message_offset == std::numeric_limits::max()) { + _ulog_data.insert( + _ulog_data.end(), + logging_data_acked.data, + logging_data_acked.data + logging_data_acked.length); + } else { + if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) { + LogWarn() << "Invalid first_message_offset"; + return; + } + + _ulog_data.insert( + _ulog_data.end(), + logging_data_acked.data, + logging_data_acked.data + logging_data_acked.first_message_offset); + process_message(); + _ulog_data.clear(); + + _ulog_data.insert( + _ulog_data.end(), + logging_data_acked.data + logging_data_acked.first_message_offset, + logging_data_acked.data + logging_data_acked.length); + } + break; + + case DropState::Dropped: + _ulog_data.clear(); + _ulog_data.insert( + _ulog_data.end(), + logging_data_acked.data + logging_data_acked.first_message_offset, + logging_data_acked.data + logging_data_acked.length); + break; + + case DropState::Duplicate: + // Ignore. + break; } } -void LogStreamingImpl::check_sequence(uint16_t sequence) +LogStreamingImpl::DropState LogStreamingImpl::check_sequence(uint16_t sequence) { // Assume we have lock. if (!_maybe_current_sequence) { // This is the first time we use the sequence. _maybe_current_sequence = sequence; - return; + return DropState::Ok; } if (_maybe_current_sequence.value() == sequence) { // Duplicate - return; + return DropState::Duplicate; } if (sequence > _maybe_current_sequence.value()) { @@ -303,6 +336,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence) if (drop > 0 && _debugging) { LogDebug() << "Dropped: " << drop << " (no wrap around), overall: " << _drops; } + + _maybe_current_sequence = sequence; + return drop > 0 ? DropState::Dropped : DropState::Ok; + } else { // Wrap around! uint16_t drop = @@ -311,9 +348,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence) if (drop > 0 && _debugging) { LogDebug() << "Dropped: " << drop << " (with wrap around), overall: " << _drops; } - } - _maybe_current_sequence = sequence; + _maybe_current_sequence = sequence; + return drop > 0 ? DropState::Dropped : DropState::Ok; + } } void LogStreamingImpl::process_message() diff --git a/src/mavsdk/plugins/log_streaming/log_streaming_impl.h b/src/mavsdk/plugins/log_streaming/log_streaming_impl.h index 35f80ba05..12eab6901 100644 --- a/src/mavsdk/plugins/log_streaming/log_streaming_impl.h +++ b/src/mavsdk/plugins/log_streaming/log_streaming_impl.h @@ -37,9 +37,15 @@ class LogStreamingImpl : public PluginImplBase { void unsubscribe_log_streaming_raw(LogStreaming::LogStreamingRawHandle handle); private: + enum class DropState { + Ok, + Dropped, + Duplicate, + }; + void process_logging_data(const mavlink_message_t& message); void process_logging_data_acked(const mavlink_message_t& message); - void check_sequence(uint16_t sequence); + DropState check_sequence(uint16_t sequence); void process_message(); static LogStreaming::Result