From 8e0a365ba58eb868ca2c239a1340c6bf49e545ee Mon Sep 17 00:00:00 2001 From: a7md0 Date: Thu, 1 Aug 2019 17:01:28 +0300 Subject: [PATCH] Use Semaphore - Semaphore used - Change chanel to topic --- src/main.cpp | 125 ++++++++++++++++++++++++++++++--------------------- src/main.h | 12 ++--- 2 files changed, 80 insertions(+), 57 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index a2e0517..04257b8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -52,6 +52,9 @@ void loop() { } void setupTasks() { + mqttQueueSemaphore = xSemaphoreCreateMutex(); + icmpQueueSemaphore = xSemaphoreCreateMutex(); + xTaskCreate(ntpTask, "NTP_TASK", 2048, NULL, tskIDLE_PRIORITY, NULL); #if defined(SCHEDULE_RESTART) @@ -199,16 +202,16 @@ void messageReceived(String &topic, String &payload) { switch (msgID) { case 1: { if (obj.containsKey("MAC")) { - deviceStruct *device = new deviceStruct; + wakeMessageStruct *device = new wakeMessageStruct; device->mac = obj["MAC"].as(); if (obj.containsKey("port")) device->port = obj["port"].as(); - if (obj.containsKey("retrieveStatus") && obj.containsKey("channel") && obj.containsKey("ip")) { + if (obj.containsKey("retrieveStatus") && obj.containsKey("topic") && obj.containsKey("ip")) { device->retrieveStatus = obj["retrieveStatus"].as(); - device->channel = obj["channel"].as(); + device->topic = obj["topic"].as(); device->ip = obj["ip"].as(); } @@ -221,14 +224,14 @@ void messageReceived(String &topic, String &payload) { } } break; case 2: { - if (obj.containsKey("channel") && obj.containsKey("device")) { - statusStruct *status = new statusStruct; + if (obj.containsKey("topic") && obj.containsKey("device")) { + statusMessageStruct *status = new statusMessageStruct; - status->channel = obj["channel"].as(); + status->topic = obj["topic"].as(); status->mac = obj["device"]["MAC"].as(); status->ip = obj["device"]["IP"].as(); - xTaskCreatePinnedToCore(deviceStatusTask, "deviceStatusTask", 2048, (void *)status, 6, NULL, 1); + xTaskCreatePinnedToCore(deviceStatusTask, "deviceStatusTask", 2048, (void *)status, 4, NULL, 1); } } break; default: @@ -238,6 +241,9 @@ void messageReceived(String &topic, String &payload) { } void mqttMessageQueueProcess() { + if (xSemaphoreTake(mqttQueueSemaphore, pdMS_TO_TICKS(10)) == pdFALSE) + return; + if (mqttMessagesQueueIndex == mqttMessagesQueueSize) mqttMessagesQueueIndex = 0; @@ -258,6 +264,8 @@ void mqttMessageQueueProcess() { mqttMessagesQueueIndex++; } + + xSemaphoreGive(mqttQueueSemaphore); } void sendShadowData(void) { @@ -289,7 +297,7 @@ void sendShadowData(void) { } void wakeDeviceTask(void *pvParameters) { - deviceStruct *device = (deviceStruct *)pvParameters; + wakeMessageStruct *device = (wakeMessageStruct *)pvParameters; bool status; IPAddress deviceIP; @@ -308,7 +316,7 @@ void wakeDeviceTask(void *pvParameters) { if (device->retrieveStatus == true) { deviceIP.fromString(device->ip.c_str()); - icmpRequstAdd(device->mac, deviceIP, device->channel, PING_RETRY_NUM); + icmpRequstAdd(device->mac, deviceIP, device->topic, PING_RETRY_NUM); } delete pvParameters; @@ -316,11 +324,11 @@ void wakeDeviceTask(void *pvParameters) { } void deviceStatusTask(void *pvParameters) { - statusStruct *status = (statusStruct *)pvParameters; + statusMessageStruct *status = (statusMessageStruct *)pvParameters; IPAddress deviceIP; deviceIP.fromString(status->ip.c_str()); - icmpRequstAdd(status->mac, deviceIP, status->channel, 1); + icmpRequstAdd(status->mac, deviceIP, status->topic, 1); delete pvParameters; vTaskDelete(NULL); @@ -365,26 +373,32 @@ void icmpTask(void *pvParameters) { continue; } - for (uint8_t i = 0; i < icmpQueueSize; i++) { - if (icmpQueue[i].waiting == true && millis() >= icmpQueue[i].nextICMP) { - Sprint("> ping "); - Sprintln(icmpQueue[i].ip.toString()); + if (xSemaphoreTake(icmpQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) { + for (uint8_t i = 0; i < icmpQueueSize; i++) { + if (icmpQueue[i].waiting == true && millis() >= icmpQueue[i].nextICMP) { + Sprint("> ping "); + Sprintln(icmpQueue[i].ip.toString()); + + bool pingResult = Ping.ping(icmpQueue[i].ip); - bool pingResult = Ping.ping(icmpQueue[i].ip); + Sprintf(">> %d\n", pingResult); - Sprintf(">> %d\n", pingResult); + icmpQueue[i].tries--; + icmpQueue[i].nextICMP = millis() + PING_BETWEEN_DELAY_MS; - icmpQueue[i].tries--; - icmpQueue[i].nextICMP = millis() + PING_BETWEEN_DELAY_MS; + Sprintf("tries: %d\n", icmpQueue[i].tries); - if (pingResult == true || icmpQueue[i].tries == 0) { - icmpQueue[i].waiting = false; - addDeviceStatus(icmpQueue[i].mac, icmpQueue[i].topic, pingResult); + if (pingResult == true || icmpQueue[i].tries == 0) { + icmpQueue[i].waiting = false; + addDeviceStatus(icmpQueue[i].mac, icmpQueue[i].topic, pingResult); + } } + + if (icmpQueue[i].waiting == true) + queueIsEmpty = false; } - if (icmpQueue[i].waiting == true) - queueIsEmpty = false; + xSemaphoreGive(icmpQueueSemaphore); } if (queueIsEmpty == true) @@ -397,25 +411,29 @@ void icmpTask(void *pvParameters) { void icmpRequstAdd(String &mac, IPAddress ip, String &topic, uint8_t maxTries) { bool addedToQueue = false; - for (uint8_t i = 0; i < icmpQueueSize; i++) { - if (icmpQueue[i].waiting == true && icmpQueue[i].ip == ip) { - addedToQueue = true; - break; - } + if (xSemaphoreTake(icmpQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) { + for (uint8_t i = 0; i < icmpQueueSize; i++) { + if (icmpQueue[i].waiting == true && icmpQueue[i].ip == ip) { + addedToQueue = true; + break; + } - if (icmpQueue[i].waiting == false) { - icmpQueue[i].waiting = true; + if (icmpQueue[i].waiting == false) { + icmpQueue[i].waiting = true; - icmpQueue[i].mac = mac; - icmpQueue[i].ip = ip; + icmpQueue[i].mac = mac; + icmpQueue[i].ip = ip; - icmpQueue[i].topic = topic; + icmpQueue[i].topic = topic; - icmpQueue[i].tries = maxTries; + icmpQueue[i].tries = maxTries; - addedToQueue = true; - break; + addedToQueue = true; + break; + } } + + xSemaphoreGive(icmpQueueSemaphore); } if (addedToQueue) @@ -429,28 +447,31 @@ void icmpRequstAdd(String &mac, IPAddress ip, String &topic, uint8_t maxTries) { void addDeviceStatus(String &mac, String &topic, bool status) { bool addedToQueue = false; - for (uint8_t i = 0; i < mqttMessagesQueueSize; i++) { - if (mqttMessagesQueue[i].waiting == false) { - DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(4) + 100); + if (xSemaphoreTake(mqttQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) { + for (uint8_t i = 0; i < mqttMessagesQueueSize; i++) { + if (mqttMessagesQueue[i].waiting == false) { + DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(4) + 100); - JsonObject rootJSON = jsonBuffer.to(); - rootJSON["MAC"] = mac.c_str(); - rootJSON["pingResult"] = status; + JsonObject rootJSON = jsonBuffer.to(); + rootJSON["MAC"] = mac.c_str(); + rootJSON["pingResult"] = status; - char data[measureJson(rootJSON) + 1]; - serializeJson(rootJSON, data, sizeof(data)); + char data[measureJson(rootJSON) + 1]; + serializeJson(rootJSON, data, sizeof(data)); - Sjson(rootJSON, Serial); + mqttMessagesQueue[i].waiting = true; - mqttMessagesQueue[i].waiting = true; - mqttMessagesQueue[i].topic = topic; - mqttMessagesQueue[i].payload = data; - mqttMessagesQueue[i].nextTry = 0; + mqttMessagesQueue[i].topic = topic; + mqttMessagesQueue[i].payload = data; - addedToQueue = true; + mqttMessagesQueue[i].nextTry = 0; - break; + addedToQueue = true; + break; + } } + + xSemaphoreGive(mqttQueueSemaphore); } if (!addedToQueue) { diff --git a/src/main.h b/src/main.h index 996ca07..2936e13 100644 --- a/src/main.h +++ b/src/main.h @@ -72,21 +72,21 @@ extern void ledBlinkDuration(float onTime = 1, float offTime = 1); #endif #endif -struct deviceStruct { +struct wakeMessageStruct { String mac; uint16_t port = 9; bool retrieveStatus = false; - String channel; + String topic; String ip; bool secureOn = false; String secureOnPassword; }; -struct statusStruct { +struct statusMessageStruct { String mac; - String channel; + String topic; String ip; }; @@ -125,10 +125,12 @@ const size_t mqttMessagesQueueSize = 12; uint8_t mqttMessagesQueueIndex = 0; mqttMessageStruct mqttMessagesQueue[mqttMessagesQueueSize]; +SemaphoreHandle_t mqttQueueSemaphore = NULL; + const size_t icmpQueueSize = 24; icmpQueueStruct icmpQueue[icmpQueueSize]; TaskHandle_t icmpTaskHandler = NULL; - +SemaphoreHandle_t icmpQueueSemaphore = NULL; #endif