Skip to content

Commit

Permalink
Use Semaphore
Browse files Browse the repository at this point in the history
- Semaphore used
- Change chanel to topic
  • Loading branch information
a7md0 committed Aug 1, 2019
1 parent f628a8c commit 8e0a365
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 57 deletions.
125 changes: 73 additions & 52 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void loop() {
}

void setupTasks() {
mqttQueueSemaphore = xSemaphoreCreateMutex();
icmpQueueSemaphore = xSemaphoreCreateMutex();

xTaskCreate(ntpTask, "NTP_TASK", 2048, NULL, tskIDLE_PRIORITY, NULL);

#if defined(SCHEDULE_RESTART)
Expand Down Expand Up @@ -199,16 +202,16 @@ void messageReceived(String &topic, String &payload) {
switch (msgID) {
case 1: {
if (obj.containsKey("MAC")) {
deviceStruct *device = new deviceStruct;
wakeMessageStruct *device = new wakeMessageStruct;

device->mac = obj["MAC"].as<String>();

if (obj.containsKey("port"))
device->port = obj["port"].as<uint16_t>();

if (obj.containsKey("retrieveStatus") && obj.containsKey("channel") && obj.containsKey("ip")) {
if (obj.containsKey("retrieveStatus") && obj.containsKey("topic") && obj.containsKey("ip")) {
device->retrieveStatus = obj["retrieveStatus"].as<bool>();
device->channel = obj["channel"].as<String>();
device->topic = obj["topic"].as<String>();
device->ip = obj["ip"].as<String>();
}

Expand All @@ -221,14 +224,14 @@ void messageReceived(String &topic, String &payload) {
}
} break;
case 2: {
if (obj.containsKey("channel") && obj.containsKey("device")) {
statusStruct *status = new statusStruct;
if (obj.containsKey("topic") && obj.containsKey("device")) {
statusMessageStruct *status = new statusMessageStruct;

status->channel = obj["channel"].as<String>();
status->topic = obj["topic"].as<String>();
status->mac = obj["device"]["MAC"].as<String>();
status->ip = obj["device"]["IP"].as<String>();

xTaskCreatePinnedToCore(deviceStatusTask, "deviceStatusTask", 2048, (void *)status, 6, NULL, 1);
xTaskCreatePinnedToCore(deviceStatusTask, "deviceStatusTask", 2048, (void *)status, 4, NULL, 1);
}
} break;
default:
Expand All @@ -238,6 +241,9 @@ void messageReceived(String &topic, String &payload) {
}

void mqttMessageQueueProcess() {
if (xSemaphoreTake(mqttQueueSemaphore, pdMS_TO_TICKS(10)) == pdFALSE)
return;

if (mqttMessagesQueueIndex == mqttMessagesQueueSize)
mqttMessagesQueueIndex = 0;

Expand All @@ -258,6 +264,8 @@ void mqttMessageQueueProcess() {

mqttMessagesQueueIndex++;
}

xSemaphoreGive(mqttQueueSemaphore);
}

void sendShadowData(void) {
Expand Down Expand Up @@ -289,7 +297,7 @@ void sendShadowData(void) {
}

void wakeDeviceTask(void *pvParameters) {
deviceStruct *device = (deviceStruct *)pvParameters;
wakeMessageStruct *device = (wakeMessageStruct *)pvParameters;

bool status;
IPAddress deviceIP;
Expand All @@ -308,19 +316,19 @@ void wakeDeviceTask(void *pvParameters) {
if (device->retrieveStatus == true) {
deviceIP.fromString(device->ip.c_str());

icmpRequstAdd(device->mac, deviceIP, device->channel, PING_RETRY_NUM);
icmpRequstAdd(device->mac, deviceIP, device->topic, PING_RETRY_NUM);
}

delete pvParameters;
vTaskDelete(NULL);
}

void deviceStatusTask(void *pvParameters) {
statusStruct *status = (statusStruct *)pvParameters;
statusMessageStruct *status = (statusMessageStruct *)pvParameters;
IPAddress deviceIP;

deviceIP.fromString(status->ip.c_str());
icmpRequstAdd(status->mac, deviceIP, status->channel, 1);
icmpRequstAdd(status->mac, deviceIP, status->topic, 1);

delete pvParameters;
vTaskDelete(NULL);
Expand Down Expand Up @@ -365,26 +373,32 @@ void icmpTask(void *pvParameters) {
continue;
}

for (uint8_t i = 0; i < icmpQueueSize; i++) {
if (icmpQueue[i].waiting == true && millis() >= icmpQueue[i].nextICMP) {
Sprint("> ping ");
Sprintln(icmpQueue[i].ip.toString());
if (xSemaphoreTake(icmpQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) {
for (uint8_t i = 0; i < icmpQueueSize; i++) {
if (icmpQueue[i].waiting == true && millis() >= icmpQueue[i].nextICMP) {
Sprint("> ping ");
Sprintln(icmpQueue[i].ip.toString());

bool pingResult = Ping.ping(icmpQueue[i].ip);

bool pingResult = Ping.ping(icmpQueue[i].ip);
Sprintf(">> %d\n", pingResult);

Sprintf(">> %d\n", pingResult);
icmpQueue[i].tries--;
icmpQueue[i].nextICMP = millis() + PING_BETWEEN_DELAY_MS;

icmpQueue[i].tries--;
icmpQueue[i].nextICMP = millis() + PING_BETWEEN_DELAY_MS;
Sprintf("tries: %d\n", icmpQueue[i].tries);

if (pingResult == true || icmpQueue[i].tries == 0) {
icmpQueue[i].waiting = false;
addDeviceStatus(icmpQueue[i].mac, icmpQueue[i].topic, pingResult);
if (pingResult == true || icmpQueue[i].tries == 0) {
icmpQueue[i].waiting = false;
addDeviceStatus(icmpQueue[i].mac, icmpQueue[i].topic, pingResult);
}
}

if (icmpQueue[i].waiting == true)
queueIsEmpty = false;
}

if (icmpQueue[i].waiting == true)
queueIsEmpty = false;
xSemaphoreGive(icmpQueueSemaphore);
}

if (queueIsEmpty == true)
Expand All @@ -397,25 +411,29 @@ void icmpTask(void *pvParameters) {
void icmpRequstAdd(String &mac, IPAddress ip, String &topic, uint8_t maxTries) {
bool addedToQueue = false;

for (uint8_t i = 0; i < icmpQueueSize; i++) {
if (icmpQueue[i].waiting == true && icmpQueue[i].ip == ip) {
addedToQueue = true;
break;
}
if (xSemaphoreTake(icmpQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) {
for (uint8_t i = 0; i < icmpQueueSize; i++) {
if (icmpQueue[i].waiting == true && icmpQueue[i].ip == ip) {
addedToQueue = true;
break;
}

if (icmpQueue[i].waiting == false) {
icmpQueue[i].waiting = true;
if (icmpQueue[i].waiting == false) {
icmpQueue[i].waiting = true;

icmpQueue[i].mac = mac;
icmpQueue[i].ip = ip;
icmpQueue[i].mac = mac;
icmpQueue[i].ip = ip;

icmpQueue[i].topic = topic;
icmpQueue[i].topic = topic;

icmpQueue[i].tries = maxTries;
icmpQueue[i].tries = maxTries;

addedToQueue = true;
break;
addedToQueue = true;
break;
}
}

xSemaphoreGive(icmpQueueSemaphore);
}

if (addedToQueue)
Expand All @@ -429,28 +447,31 @@ void icmpRequstAdd(String &mac, IPAddress ip, String &topic, uint8_t maxTries) {
void addDeviceStatus(String &mac, String &topic, bool status) {
bool addedToQueue = false;

for (uint8_t i = 0; i < mqttMessagesQueueSize; i++) {
if (mqttMessagesQueue[i].waiting == false) {
DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(4) + 100);
if (xSemaphoreTake(mqttQueueSemaphore, pdMS_TO_TICKS(1000)) == pdTRUE) {
for (uint8_t i = 0; i < mqttMessagesQueueSize; i++) {
if (mqttMessagesQueue[i].waiting == false) {
DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(4) + 100);

JsonObject rootJSON = jsonBuffer.to<JsonObject>();
rootJSON["MAC"] = mac.c_str();
rootJSON["pingResult"] = status;
JsonObject rootJSON = jsonBuffer.to<JsonObject>();
rootJSON["MAC"] = mac.c_str();
rootJSON["pingResult"] = status;

char data[measureJson(rootJSON) + 1];
serializeJson(rootJSON, data, sizeof(data));
char data[measureJson(rootJSON) + 1];
serializeJson(rootJSON, data, sizeof(data));

Sjson(rootJSON, Serial);
mqttMessagesQueue[i].waiting = true;

mqttMessagesQueue[i].waiting = true;
mqttMessagesQueue[i].topic = topic;
mqttMessagesQueue[i].payload = data;
mqttMessagesQueue[i].nextTry = 0;
mqttMessagesQueue[i].topic = topic;
mqttMessagesQueue[i].payload = data;

addedToQueue = true;
mqttMessagesQueue[i].nextTry = 0;

break;
addedToQueue = true;
break;
}
}

xSemaphoreGive(mqttQueueSemaphore);
}

if (!addedToQueue) {
Expand Down
12 changes: 7 additions & 5 deletions src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,21 @@ extern void ledBlinkDuration(float onTime = 1, float offTime = 1);
#endif
#endif

struct deviceStruct {
struct wakeMessageStruct {
String mac;
uint16_t port = 9;

bool retrieveStatus = false;
String channel;
String topic;
String ip;

bool secureOn = false;
String secureOnPassword;
};

struct statusStruct {
struct statusMessageStruct {
String mac;
String channel;
String topic;
String ip;
};

Expand Down Expand Up @@ -125,10 +125,12 @@ const size_t mqttMessagesQueueSize = 12;
uint8_t mqttMessagesQueueIndex = 0;
mqttMessageStruct mqttMessagesQueue[mqttMessagesQueueSize];

SemaphoreHandle_t mqttQueueSemaphore = NULL;

const size_t icmpQueueSize = 24;
icmpQueueStruct icmpQueue[icmpQueueSize];

TaskHandle_t icmpTaskHandler = NULL;

SemaphoreHandle_t icmpQueueSemaphore = NULL;

#endif

0 comments on commit 8e0a365

Please sign in to comment.