Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thingspeak: async client fixes #1806

Merged
merged 7 commits into from
Jul 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions code/espurna/config/prototypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ bool inline eraseSDKConfig();

#define ARRAYINIT(type, name, ...) type name[] = {__VA_ARGS__};

size_t strnlen(const char*, size_t);
char* strnstr(const char*, const char*, size_t);

// -----------------------------------------------------------------------------
// WebServer
// -----------------------------------------------------------------------------
Expand Down
210 changes: 145 additions & 65 deletions code/espurna/thinkspeak.ino
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,35 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>

#if THINGSPEAK_USE_ASYNC
#include <ESPAsyncTCP.h>
AsyncClient * _tspk_client;
#else
#include <ESP8266WiFi.h>
#endif

#define THINGSPEAK_DATA_BUFFER_SIZE 256

const char THINGSPEAK_REQUEST_TEMPLATE[] PROGMEM =
"POST %s HTTP/1.1\r\n"
"Host: %s\r\n"
"User-Agent: ESPurna\r\n"
"Connection: close\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n"
"Content-Length: %d\r\n\r\n"
"%s\r\n";
"Content-Length: %d\r\n\r\n";

bool _tspk_enabled = false;
bool _tspk_clear = false;

char * _tspk_queue[THINGSPEAK_FIELDS] = {NULL};
String _tspk_data;

bool _tspk_flush = false;
unsigned long _tspk_last_flush = 0;
unsigned char _tspk_tries = 0;
unsigned char _tspk_tries = THINGSPEAK_TRIES;

#if THINGSPEAK_USE_ASYNC
AsyncClient * _tspk_client;
bool _tspk_connecting = false;
bool _tspk_connected = false;
#endif

// -----------------------------------------------------------------------------

Expand Down Expand Up @@ -92,50 +99,106 @@ void _tspkConfigure() {
_tspk_enabled = false;
setSetting("tspkEnabled", 0);
}
if (_tspk_enabled && !_tspk_client) _tspkInitClient();
}

#if THINGSPEAK_USE_ASYNC

void _tspkPost(String data) {

if (_tspk_client == NULL) {
_tspk_client = new AsyncClient();
}

_tspk_client->onDisconnect([](void *s, AsyncClient *c) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n"));
_tspk_client->free();
delete _tspk_client;
_tspk_client = NULL;
}, 0);
enum class tspk_state_t : uint8_t {
NONE,
HEADERS,
BODY
};

_tspk_client->onTimeout([](void *s, AsyncClient *c, uint32_t time) {
_tspk_client->close(true);
}, 0);
tspk_state_t _tspk_client_state = tspk_state_t::NONE;
unsigned long _tspk_client_ts = 0;
constexpr const unsigned long THINGSPEAK_CLIENT_TIMEOUT = 5000;

_tspk_client->onData([](void * arg, AsyncClient * c, void * response, size_t len) {
void _tspkInitClient() {

char * b = (char *) response;
b[len] = 0;
char * p = strstr((char *)response, "\r\n\r\n");
unsigned int code = (p != NULL) ? atoi(&p[4]) : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code);
_tspk_client = new AsyncClient();

_tspk_client->onDisconnect([](void * s, AsyncClient * client) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n"));
_tspk_data = "";
_tspk_client_ts = 0;
_tspk_last_flush = millis();
if ((0 == code) && (--_tspk_tries > 0)) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n"));
} else {
_tspkClearQueue();
_tspk_connected = false;
_tspk_connecting = false;
_tspk_client_state = tspk_state_t::NONE;
}, nullptr);

_tspk_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Network timeout after %ums\n"), time);
client->close(true);
}, nullptr);

_tspk_client->onPoll([](void * s, AsyncClient * client) {
uint32_t ts = millis() - _tspk_client_ts;
if (ts > THINGSPEAK_CLIENT_TIMEOUT) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] No response after %ums\n"), ts);
client->close(true);
}
}, nullptr);

_tspk_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) {

char * p = nullptr;

do {

p = nullptr;

switch (_tspk_client_state) {
case tspk_state_t::NONE:
{
p = strnstr(reinterpret_cast<const char *>(response), "HTTP/1.1 200 OK", len);
if (!p) {
client->close(true);
return;
}
_tspk_client_state = tspk_state_t::HEADERS;
continue;
}
case tspk_state_t::HEADERS:
{
p = strnstr(reinterpret_cast<const char *>(response), "\r\n\r\n", len);
if (!p) return;
_tspk_client_state = tspk_state_t::BODY;
}
case tspk_state_t::BODY:
{
if (!p) {
p = strnstr(reinterpret_cast<const char *>(response), "\r\n\r\n", len);
if (!p) return;
}

unsigned int code = (p) ? atoi(&p[4]) : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code);

if ((0 == code) && _tspk_tries) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries);
} else {
_tspkClearQueue();
}

client->close(true);

_tspk_client_state = tspk_state_t::NONE;
}
}

_tspk_client->close(true);
} while (_tspk_client_state != tspk_state_t::NONE);

}, nullptr);

}, NULL);
_tspk_client->onConnect([](void * arg, AsyncClient * client) {

_tspk_client->onConnect([data](void * arg, AsyncClient * client) {
_tspk_connected = true;
_tspk_connecting = false;

DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);

#if THINGSPEAK_USE_SSL
uint8_t fp[20] = {0};
Expand All @@ -146,27 +209,36 @@ void _tspkPost(String data) {
}
#endif

DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str());

char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()];
snprintf_P(buffer, sizeof(buffer),
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str());
char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1];
snprintf_P(headers, sizeof(headers),
THINGSPEAK_REQUEST_TEMPLATE,
THINGSPEAK_URL,
THINGSPEAK_HOST,
data.length(),
data.c_str()
_tspk_data.length()
);

client->write(buffer);
client->write(headers);
client->write(_tspk_data.c_str());

}, nullptr);

}

void _tspkPost() {

}, NULL);
if (_tspk_connected || _tspk_connecting) return;

_tspk_client_ts = millis();

#if ASYNC_TCP_SSL_ENABLED
bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT, THINGSPEAK_USE_SSL);
#else
bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT);
#endif

_tspk_connecting = connected;

if (!connected) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connection failed\n"));
_tspk_client->close(true);
Expand All @@ -176,7 +248,7 @@ void _tspkPost(String data) {

#else // THINGSPEAK_USE_ASYNC

void _tspkPost(String data) {
void _tspkPost() {

#if THINGSPEAK_USE_SSL
WiFiClientSecure _tspk_client;
Expand All @@ -186,35 +258,36 @@ void _tspkPost(String data) {

if (_tspk_client.connect(THINGSPEAK_HOST, THINGSPEAK_PORT)) {

DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);

if (!_tspk_client.verify(THINGSPEAK_FINGERPRINT, THINGSPEAK_HOST)) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Warning: certificate doesn't match\n"));
}

DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str());
char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()];
snprintf_P(buffer, sizeof(buffer),
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str());
char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1];
snprintf_P(headers, sizeof(headers),
THINGSPEAK_REQUEST_TEMPLATE,
THINGSPEAK_URL,
THINGSPEAK_HOST,
data.length(),
data.c_str()
_tspk_data.length()
);
_tspk_client.print(buffer);

_tspk_client.print(headers);
_tspk_client.print(_tspk_data);

nice_delay(100);

String response = _tspk_client.readString();
int pos = response.indexOf("\r\n\r\n");
unsigned int code = (pos > 0) ? response.substring(pos + 4).toInt() : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code);
_tspk_client.stop();

_tspk_last_flush = millis();
if ((0 == code) && (--_tspk_tries > 0)) {
if ((0 == code) && _tspk_tries) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n"));
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries);
} else {
_tspkClearQueue();
}
Expand All @@ -230,13 +303,14 @@ void _tspkPost(String data) {
#endif // THINGSPEAK_USE_ASYNC

void _tspkEnqueue(unsigned char index, char * payload) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%d with value %s\n"), index, payload);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%u with value %s\n"), index, payload);
--index;
if (_tspk_queue[index] != NULL) free(_tspk_queue[index]);
_tspk_queue[index] = strdup(payload);
}

void _tspkClearQueue() {
_tspk_tries = THINGSPEAK_TRIES;
if (_tspk_clear) {
for (unsigned char id=0; id<THINGSPEAK_FIELDS; id++) {
if (_tspk_queue[id] != NULL) {
Expand All @@ -249,22 +323,30 @@ void _tspkClearQueue() {

void _tspkFlush() {

if (!_tspk_flush) return;
if (millis() - _tspk_last_flush < THINGSPEAK_MIN_INTERVAL) return;
if (_tspk_connected || _tspk_connecting) return;

_tspk_last_flush = millis();
_tspk_flush = false;
_tspk_data.reserve(THINGSPEAK_DATA_BUFFER_SIZE);

// Walk the fields
String data;
// Walk the fields, numbered 1...THINGSPEAK_FIELDS
for (unsigned char id=0; id<THINGSPEAK_FIELDS; id++) {
if (_tspk_queue[id] != NULL) {
if (data.length() > 0) data = data + String("&");
data = data + String("field") + String(id+1) + String("=") + String(_tspk_queue[id]);
if (_tspk_data.length() > 0) _tspk_data.concat("&");
char buf[32] = {0};
snprintf_P(buf, sizeof(buf), PSTR("field%u=%s"), (id + 1), _tspk_queue[id]);
_tspk_data.concat(buf);
}
}

// POST data if any
if (data.length() > 0) {
data = data + String("&api_key=") + getSetting("tspkKey");
_tspk_tries = THINGSPEAK_TRIES;
_tspkPost(data);
if (_tspk_data.length()) {
_tspk_data.concat("&api_key=");
_tspk_data.concat(getSetting("tspkKey"));
--_tspk_tries;
_tspkPost();
}

}
Expand Down Expand Up @@ -326,9 +408,7 @@ void tspkSetup() {
void tspkLoop() {
if (!_tspk_enabled) return;
if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return;
if (_tspk_flush && (millis() - _tspk_last_flush > THINGSPEAK_MIN_INTERVAL)) {
_tspkFlush();
}
_tspkFlush();
}

#endif
16 changes: 16 additions & 0 deletions code/espurna/utils.ino
Original file line number Diff line number Diff line change
Expand Up @@ -612,3 +612,19 @@ bool isNumber(const char * s) {
}
return digit;
}

// ref: lwip2 lwip_strnstr with strnlen
char* strnstr(const char* buffer, const char* token, size_t n) {
size_t token_len = strnlen(token, n);
if (token_len == 0) {
return const_cast<char*>(buffer);
}

for (const char* p = buffer; *p && (p + token_len <= buffer + n); p++) {
if ((*p == *token) && (strncmp(p, token, token_len) == 0)) {
return const_cast<char*>(p);
}
}

return nullptr;
}