Skip to content

Commit

Permalink
Merge pull request #396 from philljj/fix_some_threading_issues
Browse files Browse the repository at this point in the history
Fix some Helgrind thread errors with enable-tls, and enable-curl.
  • Loading branch information
embhorn authored Feb 28, 2024
2 parents c923292 + 79c879f commit 2c62db3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 24 deletions.
87 changes: 69 additions & 18 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,37 +391,66 @@ static int NetRead(void *context, byte* buf, int buf_len,
* MQTT_CODE_CONTINUE, or proceed with a smaller buffer read/write.
* Used for testing nonblocking. */
static int
mqttcurl_test_nonblock(int* buf_len, int for_recv)
mqttcurl_test_nonblock_read(int* buf_len)
{
static int testNbAlt = 0;
static int testSmallerBuf = 0;
#if !defined(WOLFMQTT_DEBUG_SOCKET)
(void)for_recv;
#endif
static int testNbReadAlt = 0;
static int testSmallerRead = 0;

if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbReadAlt++;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock_read: returning early with CONTINUE");
#endif
return MQTT_CODE_CONTINUE;
}

testNbReadAlt = 0;

if (!testSmallerRead) {
if (*buf_len > 2) {
*buf_len /= 2;
testSmallerRead = 1;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock_read: testing small buff: %d",
*buf_len);
#endif
}
}
else {
testSmallerRead = 0;
}

return MQTT_CODE_SUCCESS;
}

if (testNbAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbAlt++;
static int
mqttcurl_test_nonblock_write(int* buf_len)
{
static int testNbWriteAlt = 0;
static int testSmallerWrite = 0;

if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbWriteAlt++;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock(%d): returning early with CONTINUE",
for_recv);
PRINTF("mqttcurl_test_nonblock_write: returning early with CONTINUE");
#endif
return MQTT_CODE_CONTINUE;
}

testNbAlt = 0;
testNbWriteAlt = 0;

if (!testSmallerBuf) {
if (!testSmallerWrite) {
if (*buf_len > 2) {
*buf_len /= 2;
testSmallerBuf = 1;
testSmallerWrite = 1;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock(%d): testing small buff: %d",
for_recv, *buf_len);
PRINTF("mqttcurl_test_nonblock_write: testing small buff: %d",
*buf_len);
#endif
}
}
else {
testSmallerBuf = 0;
testSmallerWrite = 0;
}

return MQTT_CODE_SUCCESS;
Expand Down Expand Up @@ -745,7 +774,7 @@ static int NetWrite(void *context, const byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (sock->mqttCtx->useNonBlockMode) {
if (mqttcurl_test_nonblock(&buf_len, 0)) {
if (mqttcurl_test_nonblock_write(&buf_len)) {
return MQTT_CODE_CONTINUE;
}
}
Expand Down Expand Up @@ -773,8 +802,19 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
#ifdef WOLFMQTT_MULTITHREAD
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
if (rc != 0) {
return rc;
}
#endif

res = curl_easy_send(sock->curl, buf, buf_len, &sent);

#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
#endif

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_send(%d) returned: %d, %s", buf_len, res,
Expand Down Expand Up @@ -828,7 +868,7 @@ static int NetRead(void *context, byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (sock->mqttCtx->useNonBlockMode) {
if (mqttcurl_test_nonblock(&buf_len, 1)) {
if (mqttcurl_test_nonblock_read(&buf_len)) {
return MQTT_CODE_CONTINUE;
}
}
Expand Down Expand Up @@ -856,8 +896,19 @@ static int NetRead(void *context, byte* buf, int buf_len,
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
#ifdef WOLFMQTT_MULTITHREAD
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
if (rc != 0) {
return rc;
}
#endif

res = curl_easy_recv(sock->curl, buf, buf_len, &recvd);

#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
#endif

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
Expand Down
8 changes: 8 additions & 0 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,11 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
if (rc == 0) {
rc = wm_SemInit(&client->lockClient);
}
#ifdef ENABLE_MQTT_CURL
if (rc == 0) {
rc = wm_SemInit(&client->lockCURL);
}
#endif
#endif

if (rc == 0) {
Expand All @@ -1573,6 +1578,9 @@ void MqttClient_DeInit(MqttClient *client)
(void)wm_SemFree(&client->lockSend);
(void)wm_SemFree(&client->lockRecv);
(void)wm_SemFree(&client->lockClient);
#ifdef ENABLE_MQTT_CURL
(void)wm_SemFree(&client->lockCURL);
#endif
#endif
}
#ifdef WOLFMQTT_V5
Expand Down
13 changes: 8 additions & 5 deletions src/mqtt_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int MqttSocket_TlsSocketReceive(WOLFSSL* ssl, char *buf, int sz,
(void)ssl; /* Not used */

rc = client->net->read(client->net->context, (byte*)buf, sz,
client->tls.timeout_ms);
client->tls.timeout_ms_read);

/* save network read response */
client->tls.sockRcRead = rc;
Expand All @@ -87,7 +87,7 @@ int MqttSocket_TlsSocketSend(WOLFSSL* ssl, char *buf, int sz,
(void)ssl; /* Not used */

rc = client->net->write(client->net->context, (byte*)buf, sz,
client->tls.timeout_ms);
client->tls.timeout_ms_write);

/* save network write response */
client->tls.sockRcWrite = rc;
Expand Down Expand Up @@ -116,7 +116,8 @@ int MqttSocket_Init(MqttClient *client, MqttNet *net)
#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
client->tls.ctx = NULL;
client->tls.ssl = NULL;
client->tls.timeout_ms = client->cmd_timeout_ms;
client->tls.timeout_ms_read = client->cmd_timeout_ms;
client->tls.timeout_ms_write = client->cmd_timeout_ms;
#endif

/* Validate callbacks are not null! */
Expand All @@ -134,8 +135,9 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len,

#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) {
client->tls.timeout_ms = timeout_ms;
client->tls.timeout_ms_write = timeout_ms;
client->tls.sockRcWrite = 0; /* init value */

rc = wolfSSL_write(client->tls.ssl, (char*)buf, buf_len);
if (rc < 0) {
#if defined(WOLFMQTT_DEBUG_SOCKET) || defined(WOLFSSL_ASYNC_CRYPT)
Expand Down Expand Up @@ -236,8 +238,9 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,

#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) {
client->tls.timeout_ms = timeout_ms;
client->tls.timeout_ms_read = timeout_ms;
client->tls.sockRcRead = 0; /* init value */

rc = wolfSSL_read(client->tls.ssl, (char*)buf, buf_len);
if (rc < 0) {
int error = wolfSSL_get_error(client->tls.ssl, 0);
Expand Down
3 changes: 3 additions & 0 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ typedef struct _MqttClient {
wm_Sem lockSend;
wm_Sem lockRecv;
wm_Sem lockClient;
#ifdef ENABLE_MQTT_CURL
wm_Sem lockCURL;
#endif
struct _MqttPendResp* firstPendResp; /* protected with client lock */
struct _MqttPendResp* lastPendResp; /* protected with client lock */
#endif
Expand Down
3 changes: 2 additions & 1 deletion wolfmqtt/mqtt_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ typedef struct _MqttTls {
WOLFSSL *ssl;
int sockRcRead;
int sockRcWrite;
int timeout_ms;
int timeout_ms_read;
int timeout_ms_write;
} MqttTls;
#endif

Expand Down

0 comments on commit 2c62db3

Please sign in to comment.