From 35d29c3cf2b603d0f8a1e71e3197dc16e2a5649b Mon Sep 17 00:00:00 2001 From: Facundo Garcia Date: Mon, 25 Nov 2024 13:45:19 +0100 Subject: [PATCH] Fixed TCP/UART message handling --- .../fixposition_driver.hpp | 4 +- .../src/fixposition_driver.cpp | 124 +++++++++++------- 2 files changed, 76 insertions(+), 52 deletions(-) diff --git a/fixposition_driver_lib/include/fixposition_driver_lib/fixposition_driver.hpp b/fixposition_driver_lib/include/fixposition_driver_lib/fixposition_driver.hpp index 35984dc..9d1bc80 100644 --- a/fixposition_driver_lib/include/fixposition_driver_lib/fixposition_driver.hpp +++ b/fixposition_driver_lib/include/fixposition_driver_lib/fixposition_driver.hpp @@ -149,8 +149,8 @@ class FixpositionDriver { int client_fd_ = -1; //!< TCP or Serial file descriptor int connection_status_ = -1; struct termios options_save_; - //uint8_t readBuf[8192]; - //int buf_size = 0; + uint8_t readBuf[8192]; + int buf_size = 0; }; } // namespace fixposition #endif //__FIXPOSITION_DRIVER_LIB_FIXPOSITION_DRIVER__ diff --git a/fixposition_driver_lib/src/fixposition_driver.cpp b/fixposition_driver_lib/src/fixposition_driver.cpp index 92da33d..e41bf00 100644 --- a/fixposition_driver_lib/src/fixposition_driver.cpp +++ b/fixposition_driver_lib/src/fixposition_driver.cpp @@ -257,66 +257,90 @@ bool FixpositionDriver::RunOnce() { } bool FixpositionDriver::ReadAndPublish() { - char readBuf[8192]; - - ssize_t rv; - if (params_.fp_output.type == INPUT_TYPE::TCP) { - rv = recv(client_fd_, (void*)&readBuf, sizeof(readBuf), MSG_DONTWAIT); - } else if (params_.fp_output.type == INPUT_TYPE::SERIAL) { - rv = read(client_fd_, (void*)&readBuf, sizeof(readBuf)); - } else { - rv = 0; - } - - if (rv == 0) { - std::cerr << "Connection closed.\n"; - return false; - } - - if (rv < 0 && errno == EAGAIN) { - /* no data for now, call back when the socket is readable */ - return true; - } - if (rv < 0) { - std::cerr << "Connection error.\n"; - return false; - } - - ssize_t start_id = 0; - while (start_id < rv) { - int msg_size = 0; - // Nov B - msg_size = IsNovMessage((uint8_t*)&readBuf[start_id], rv - start_id); + int msg_size = 0; + + // Extract messages until the buffer is empty + while (buf_size > 0) { + // Check whether the message is NOV_B + msg_size = IsNovMessage(readBuf, buf_size); + + // a) Message is NOV_B --> Process message and remove it from the buffer if (msg_size > 0) { - NovConvertAndPublish((uint8_t*)&readBuf[start_id]); - start_id += msg_size; - continue; + NovConvertAndPublish(readBuf); + buf_size -= msg_size; + if (buf_size > 0) { + memmove(readBuf, &readBuf[msg_size], buf_size); + } + // Look for new messages in the buffer } - if (msg_size == 0) { - // do nothing + + // b) Message is not NOV_B. Check whether it is NMEA/FP_A + else if (msg_size == 0) { + msg_size = IsNmeaMessage((char*)readBuf, buf_size); + + // Message is NMEA/FP_A --> Process message and remove it from the buffer + if (msg_size > 0) { + NmeaConvertAndPublish({(const char*)readBuf, (const char*)readBuf + msg_size}); + buf_size -= msg_size; + if (buf_size > 0) { + memmove(readBuf, &readBuf[msg_size], buf_size); + } + // Look for new messages in the buffer + } + + // Message is neither NMEA/FP_A or NOV_B --> Remove one byte from the buffer and try again until it is empty + else if (msg_size == 0) { + if (buf_size > 0) { + buf_size -= 1; + memmove(readBuf, &readBuf[1], buf_size); + } + + // Buffer is empty --> Wait for more data + else /* buf_size == 0 */ { + break; + } + } + + // NMEA message might be incomplete --> Wait for more data + else /* msg_size < 0 */ { + break; + } } - if (msg_size < 0) { + + // c) NOV_B message might be incomplete --> Wait for more data + else /* msg_size < 0 */ { break; } + } - // Nmea (incl. FP_A) - msg_size = IsNmeaMessage(&readBuf[start_id], rv - start_id); - if (msg_size > 0) { - // NovConvertAndPublish(start, msg_size); - std::string msg(&readBuf[start_id], msg_size); - NmeaConvertAndPublish(msg); - start_id += msg_size; - continue; + // Read more data from the TCP/Serial port + int rem_size = sizeof(readBuf) - buf_size; + if (rem_size > 0) { + ssize_t rv; + if (params_.fp_output.type == INPUT_TYPE::TCP) { + rv = recv(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size, MSG_DONTWAIT); + } else if (params_.fp_output.type == INPUT_TYPE::SERIAL) { + rv = read(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size); + } else { + rv = 0; } - if (msg_size == 0) { - // do nothing + + if (rv == 0) { + std::cerr << "Connection closed.\n"; + return false; } - if (msg_size < 0) { - break; + + if (rv < 0 && errno == EAGAIN) { + /* no data for now, call back when the socket is readable */ + return true; + } + + if (rv < 0) { + std::cerr << "Connection error.\n"; + return false; } - // No Match, increment by 1 - ++start_id; + buf_size += rv; } return true;