Skip to content

Commit

Permalink
#530 Added TCP_NODELAY option.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Nov 5, 2023
1 parent ac5ce59 commit a1f8a7e
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/Clients.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ typedef struct
MQTTClient_SSLOptions *sslopts; /**< the SSL/TLS connect options */
SSL_SESSION* session; /**< SSL session pointer for fast handhake */
#endif
int nodelay; /**< TCP_NODELAY socket option */
} Clients;

int clientIDCompare(void* a, void* b);
Expand Down
6 changes: 5 additions & 1 deletion src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
goto exit;
}

if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
Expand Down Expand Up @@ -693,6 +693,10 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
if (options->httpsProxy)
m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
}
if (options->struct_version >= 9)
m->c->nodelay = options->nodelay;
else
m->c->nodelay = 0;

if (m->c->will)
{
Expand Down
29 changes: 19 additions & 10 deletions src/MQTTAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -1199,15 +1199,18 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7 or 8.
/** The version number of this structure. Must be 0, 1, 2, 3 4 5 6, 7,
* 8, or 9.
*
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
* 3 signifies no automatic reconnect options
* 4 signifies no binary password option (just string)
* 5 signifies no MQTTV5 properties
* 6 signifies no HTTP headers option
* 7 signifies no HTTP proxy and HTTPS proxy options
* 7 signifies no HTTP proxy and HTTPS proxy options
* 8 signifies no TCP_NODELAY socket option
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
Expand Down Expand Up @@ -1378,27 +1381,33 @@ typedef struct
* HTTPS proxy
*/
const char* httpsProxy;
/**
* Set the TCP_NODELAY option on the client socket. This could resuce
* the latency on small messages as the cost of increased network
* traffic.
*/
int nodelay;
} MQTTAsync_connectOptions;

/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0}


/**
Expand Down
5 changes: 4 additions & 1 deletion src/MQTTClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,10 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
int sessionPresent = 0;
MQTTResponse resp = MQTTResponse_initializer;


FUNC_ENTRY;
m->c->nodelay =options->nodelay;

resp.reasonCode = SOCKET_ERROR;
if (m->ma && !running)
{
Expand Down Expand Up @@ -1759,7 +1762,7 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions*
goto exit;
}

if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 9)
{
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
Expand Down
27 changes: 18 additions & 9 deletions src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,9 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6, 7 or 8.
/** The version number of this structure. Must be 0, 1, 2, 3, 4, 5, 6,
* 7, 8, or 9.
*
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
Expand All @@ -834,6 +836,7 @@ typedef struct
* 5 signifies no maxInflightMessages and cleanstart
* 6 signifies no HTTP headers option
* 7 signifies no HTTP proxy and HTTPS proxy options
* 8 signifies no NO_DELAY
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
Expand Down Expand Up @@ -976,27 +979,33 @@ typedef struct
* HTTPS proxy
*/
const char* httpsProxy;
/**
* Set the TCP_NODELAY option on the client socket. This could resuce
* the latency on small messages as the cost of increased network
* traffic.
*/
int nodelay;
} MQTTClient_connectOptions;

/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 9, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 9, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL, 0}

/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 9, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL, 0}

/**
* This function attempts to connect a previously-created client (see
Expand Down
12 changes: 6 additions & 6 deletions src/MQTTProtocolOut.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
rc = Socket_new(aClient->net.http_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
#if defined(OPENSSL)
Expand All @@ -277,9 +277,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
rc = Socket_new(aClient->net.https_proxy, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
#endif
Expand All @@ -295,9 +295,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket
if (timeout < 0)
rc = -1;
else
rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket), timeout);
#else
rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
rc = Socket_new(ip_address, addr_len, port, aClient->nodelay, &(aClient->net.socket));
#endif
}
if (rc == EINPROGRESS || rc == EWOULDBLOCK)
Expand Down
18 changes: 16 additions & 2 deletions src/Socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@

#include "Heap.h"

#if defined(_WIN32) || defined(_WIN64)
#define SOL_TCP IPPROTO_TCP
#elif !defined(SOL_TCP) && defined(IPPROTO_TCP)
#define SOL_TCP IPPROTO_TCP
#endif


#if defined(USE_SELECT)
int isReady(int socket, fd_set* read_set, fd_set* write_set);
int Socket_continueWrites(fd_set* pwset, SOCKET* socket, mutex_type mutex);
Expand Down Expand Up @@ -1049,9 +1056,9 @@ int Socket_close(SOCKET socket)
* @return completion code 0=good, SOCKET_ERROR=fail
*/
#if defined(__GNUC__) && defined(__linux__)
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock, long timeout)
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock, long timeout)
#else
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* sock)
#endif
{
int type = SOCK_STREAM;
Expand Down Expand Up @@ -1181,6 +1188,13 @@ int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif
if (nodelay)
{
int opt = 1;
if (setsockopt(*sock, SOL_TCP, TCP_NODELAY, &opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set TCP_NODELAY for socket %d", *sock);

}
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR)
rc = Socket_error("addSocket", *sock);
Expand Down
4 changes: 2 additions & 2 deletions src/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ int Socket_putdatas(SOCKET socket, char* buf0, size_t buf0len, PacketBuffers buf
int Socket_close(SOCKET socket);
#if defined(__GNUC__) && defined(__linux__)
/* able to use GNU's getaddrinfo_a to make timeouts possible */
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket, long timeout);
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket, long timeout);
#else
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* socket);
int Socket_new(const char* addr, size_t addr_len, int port, int nodelay, SOCKET* socket);
#endif

int Socket_noPendingWrites(SOCKET socket);
Expand Down
77 changes: 76 additions & 1 deletion test/test1.c
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,85 @@ int test6a(struct Options options)
return failures;
}

/*********************************************************************
Test7: Socket options
*********************************************************************/

int test7(struct Options options)
{
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test7";

fprintf(xml, "<testcase classname=\"test1\" name=\"socket options\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test 7 - socket options");

rc = MQTTClient_create(&c, options.connection, "sockopt_test",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
{
MQTTClient_destroy(&c);
goto exit;
}

opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}

opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;

opts.nodelay = 1;

/* Test with the socket option(s) */
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTClient_connect(c, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
if (rc != MQTTCLIENT_SUCCESS)
goto exit;

rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

/* Try to reconnect */
rc = MQTTClient_connect(c, &opts);
assert("Connect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);

MQTTClient_destroy(&c);

exit:
MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}


int main(int argc, char** argv)
{
int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a};
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a, test7};
int i;

xml = fopen("TEST-test1.xml", "w");
Expand Down
Loading

0 comments on commit a1f8a7e

Please sign in to comment.