From 9657f2dc958bfc49f010db0a1a69e81a84277e8f Mon Sep 17 00:00:00 2001 From: MichaelDvP Date: Sun, 24 Jan 2021 17:33:12 +0100 Subject: [PATCH] add mqtt base on publish, don't queue full-topics (save memory) --- src/mqtt.cpp | 49 ++++++++++++++++++++++++++----------------------- src/mqtt.h | 4 +--- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/mqtt.cpp b/src/mqtt.cpp index d2d51946..2896420b 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -78,8 +78,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_ return; } // register in our libary with the callback function. - // We store both the original topic and the fully-qualified one - mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(message->topic), std::move(cb)); + mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb)); } // subscribe to the command topic if it doesn't exist yet @@ -175,7 +174,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { // show subscriptions shell.printfln(F("MQTT topic subscriptions:")); for (const auto & mqtt_subfunction : mqtt_subfunctions_) { - shell.printfln(F(" %s"), mqtt_subfunction.full_topic_.c_str()); + shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str()); } shell.println(); @@ -224,10 +223,15 @@ void Mqtt::incoming(const char * topic, const char * payload) { } // received an MQTT message that we subscribed too -void Mqtt::on_message(const char * topic, const char * payload, size_t len) { +void Mqtt::on_message(const char * fulltopic, const char * payload, size_t len) { if (len == 0) { return; // ignore empty payloads } + if (strncmp(fulltopic, mqtt_base_.c_str(), strlen(mqtt_base_.c_str())) != 0) { + return; // not for us + } + char topic[100]; + strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100); // convert payload to a null-terminated char string char message[len + 2]; @@ -238,7 +242,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) { // see if we have this topic in our subscription list, then call its callback handler for (const auto & mf : mqtt_subfunctions_) { - if (strcmp(topic, mf.full_topic_.c_str()) == 0) { + if (strcmp(topic, mf.topic_.c_str()) == 0) { if (mf.mqtt_subfunction_) { // matching function, call it. If it returns true keep quit if ((mf.mqtt_subfunction_)(message)) { @@ -307,7 +311,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic shell.print(F(" Subscribed MQTT topics: ")); for (const auto & mqtt_subfunction : mqtt_subfunctions_) { if (mqtt_subfunction.device_type_ == device_type) { - shell.printf(F("%s "), mqtt_subfunction.topic_.c_str()); + shell.printf(F("%s/%s "), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str()); } } shell.println(); @@ -535,15 +539,7 @@ std::shared_ptr Mqtt::queue_message(const uint8_t operation, // take the topic and prefix the hostname, unless its for HA std::shared_ptr message; - if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0)) { - // leave topic as it is - message = std::make_shared(operation, topic, payload, retain); - } else { - // prefix the base - std::string full_topic(100, '\0'); - snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), mqtt_base_.c_str(), topic.c_str()); - message = std::make_shared(operation, full_topic, payload, retain); - } + message = std::make_shared(operation, topic, payload, retain); // if the queue is full, make room but removing the last one if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) { @@ -672,13 +668,20 @@ void Mqtt::process_queue() { // fetch first from queue and create the full topic name auto mqtt_message = mqtt_messages_.front(); auto message = mqtt_message.content_; + char topic[MQTT_TOPIC_MAX_SIZE]; + if ((strncmp(message->topic.c_str(), "homeassistant/", 13) == 0)) { + // leave topic as it is + strcpy(topic, message->topic.c_str()); + } else { + snprintf_P(topic, MQTT_TOPIC_MAX_SIZE, PSTR("%s/%s"), mqtt_base_.c_str(), message->topic.c_str()); + } // if we're subscribing... if (message->operation == Operation::SUBSCRIBE) { - LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str()); - uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_); + LOG_DEBUG(F("Subscribing to topic: %s"), topic); + uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_); if (!packet_id) { - LOG_DEBUG(F("Error subscribing to %s"), message->topic.c_str()); + LOG_DEBUG(F("Error subscribing to %s"), topic); } mqtt_messages_.pop_front(); // remove the message from the queue @@ -694,25 +697,25 @@ void Mqtt::process_queue() { // else try and publish it uint16_t packet_id = - mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); + mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"), - message->topic.c_str(), + topic, mqtt_message.id_, message->retain, mqtt_message.retry_count_ + 1, message->payload.size(), packet_id); - + LOG_TRACE(message->payload.c_str()); if (packet_id == 0) { // it failed. if we retried n times, give up. remove from queue if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) { - LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1); + LOG_ERROR(F("Failed to publish to %s after %d attempts"), topic, mqtt_message.retry_count_ + 1); mqtt_publish_fails_++; // increment failure counter mqtt_messages_.pop_front(); // delete return; } else { mqtt_messages_.front().retry_count_++; - LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1); + LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), topic, mqtt_message.retry_count_ + 1); return; // leave on queue for next time so it gets republished } } diff --git a/src/mqtt.h b/src/mqtt.h index 3e3e0580..1a55aec1 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -199,13 +199,11 @@ class Mqtt { struct MQTTSubFunction { uint8_t device_type_; // which device type, from DeviceType:: const std::string topic_; // short topic name - const std::string full_topic_; // the fully qualified topic name, usually with the base prefixed mqtt_subfunction_p mqtt_subfunction_; // can be empty - MQTTSubFunction(uint8_t device_type, const std::string && topic, const std::string && full_topic, mqtt_subfunction_p mqtt_subfunction) + MQTTSubFunction(uint8_t device_type, const std::string && topic, mqtt_subfunction_p mqtt_subfunction) : device_type_(device_type) , topic_(topic) - , full_topic_(full_topic) , mqtt_subfunction_(mqtt_subfunction) { } };