Skip to content

Commit

Permalink
Fixed TCP/UART message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Facundo Garcia committed Nov 25, 2024
1 parent 55facad commit 35d29c3
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ class FixpositionDriver {
int client_fd_ = -1; //!< TCP or Serial file descriptor
int connection_status_ = -1;
struct termios options_save_;
//uint8_t readBuf[8192];
//int buf_size = 0;
uint8_t readBuf[8192];
int buf_size = 0;
};
} // namespace fixposition
#endif //__FIXPOSITION_DRIVER_LIB_FIXPOSITION_DRIVER__
124 changes: 74 additions & 50 deletions fixposition_driver_lib/src/fixposition_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,66 +257,90 @@ bool FixpositionDriver::RunOnce() {
}

bool FixpositionDriver::ReadAndPublish() {
char readBuf[8192];

ssize_t rv;
if (params_.fp_output.type == INPUT_TYPE::TCP) {
rv = recv(client_fd_, (void*)&readBuf, sizeof(readBuf), MSG_DONTWAIT);
} else if (params_.fp_output.type == INPUT_TYPE::SERIAL) {
rv = read(client_fd_, (void*)&readBuf, sizeof(readBuf));
} else {
rv = 0;
}

if (rv == 0) {
std::cerr << "Connection closed.\n";
return false;
}

if (rv < 0 && errno == EAGAIN) {
/* no data for now, call back when the socket is readable */
return true;
}
if (rv < 0) {
std::cerr << "Connection error.\n";
return false;
}

ssize_t start_id = 0;
while (start_id < rv) {
int msg_size = 0;
// Nov B
msg_size = IsNovMessage((uint8_t*)&readBuf[start_id], rv - start_id);
int msg_size = 0;

// Extract messages until the buffer is empty
while (buf_size > 0) {
// Check whether the message is NOV_B
msg_size = IsNovMessage(readBuf, buf_size);

// a) Message is NOV_B --> Process message and remove it from the buffer
if (msg_size > 0) {
NovConvertAndPublish((uint8_t*)&readBuf[start_id]);
start_id += msg_size;
continue;
NovConvertAndPublish(readBuf);
buf_size -= msg_size;
if (buf_size > 0) {
memmove(readBuf, &readBuf[msg_size], buf_size);
}
// Look for new messages in the buffer
}
if (msg_size == 0) {
// do nothing

// b) Message is not NOV_B. Check whether it is NMEA/FP_A
else if (msg_size == 0) {
msg_size = IsNmeaMessage((char*)readBuf, buf_size);

// Message is NMEA/FP_A --> Process message and remove it from the buffer
if (msg_size > 0) {
NmeaConvertAndPublish({(const char*)readBuf, (const char*)readBuf + msg_size});
buf_size -= msg_size;
if (buf_size > 0) {
memmove(readBuf, &readBuf[msg_size], buf_size);
}
// Look for new messages in the buffer
}

// Message is neither NMEA/FP_A or NOV_B --> Remove one byte from the buffer and try again until it is empty
else if (msg_size == 0) {
if (buf_size > 0) {
buf_size -= 1;
memmove(readBuf, &readBuf[1], buf_size);
}

// Buffer is empty --> Wait for more data
else /* buf_size == 0 */ {
break;
}
}

// NMEA message might be incomplete --> Wait for more data
else /* msg_size < 0 */ {
break;
}
}
if (msg_size < 0) {

// c) NOV_B message might be incomplete --> Wait for more data
else /* msg_size < 0 */ {
break;
}
}

// Nmea (incl. FP_A)
msg_size = IsNmeaMessage(&readBuf[start_id], rv - start_id);
if (msg_size > 0) {
// NovConvertAndPublish(start, msg_size);
std::string msg(&readBuf[start_id], msg_size);
NmeaConvertAndPublish(msg);
start_id += msg_size;
continue;
// Read more data from the TCP/Serial port
int rem_size = sizeof(readBuf) - buf_size;
if (rem_size > 0) {
ssize_t rv;
if (params_.fp_output.type == INPUT_TYPE::TCP) {
rv = recv(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size, MSG_DONTWAIT);
} else if (params_.fp_output.type == INPUT_TYPE::SERIAL) {
rv = read(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size);
} else {
rv = 0;
}
if (msg_size == 0) {
// do nothing

if (rv == 0) {
std::cerr << "Connection closed.\n";
return false;
}
if (msg_size < 0) {
break;

if (rv < 0 && errno == EAGAIN) {
/* no data for now, call back when the socket is readable */
return true;
}

if (rv < 0) {
std::cerr << "Connection error.\n";
return false;
}

// No Match, increment by 1
++start_id;
buf_size += rv;
}

return true;
Expand Down

0 comments on commit 35d29c3

Please sign in to comment.