Skip to content
This repository has been archived by the owner on Oct 4, 2021. It is now read-only.

Commit

Permalink
add mqtt base on publish, don't queue full-topics (save memory)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelDvP committed Jan 24, 2021
1 parent ce18bcf commit 9657f2d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
49 changes: 26 additions & 23 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_
return;
}
// register in our libary with the callback function.
// We store both the original topic and the fully-qualified one
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(message->topic), std::move(cb));
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb));
}

// subscribe to the command topic if it doesn't exist yet
Expand Down Expand Up @@ -175,7 +174,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
// show subscriptions
shell.printfln(F("MQTT topic subscriptions:"));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s"), mqtt_subfunction.full_topic_.c_str());
shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
shell.println();

Expand Down Expand Up @@ -224,10 +223,15 @@ void Mqtt::incoming(const char * topic, const char * payload) {
}

// received an MQTT message that we subscribed too
void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
void Mqtt::on_message(const char * fulltopic, const char * payload, size_t len) {
if (len == 0) {
return; // ignore empty payloads
}
if (strncmp(fulltopic, mqtt_base_.c_str(), strlen(mqtt_base_.c_str())) != 0) {
return; // not for us
}
char topic[100];
strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100);

// convert payload to a null-terminated char string
char message[len + 2];
Expand All @@ -238,7 +242,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {

// see if we have this topic in our subscription list, then call its callback handler
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.full_topic_.c_str()) == 0) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
if (mf.mqtt_subfunction_) {
// matching function, call it. If it returns true keep quit
if ((mf.mqtt_subfunction_)(message)) {
Expand Down Expand Up @@ -307,7 +311,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
shell.print(F(" Subscribed MQTT topics: "));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if (mqtt_subfunction.device_type_ == device_type) {
shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
shell.printf(F("%s/%s "), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
}
shell.println();
Expand Down Expand Up @@ -535,15 +539,7 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,

// take the topic and prefix the hostname, unless its for HA
std::shared_ptr<MqttMessage> message;
if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0)) {
// leave topic as it is
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
} else {
// prefix the base
std::string full_topic(100, '\0');
snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), mqtt_base_.c_str(), topic.c_str());
message = std::make_shared<MqttMessage>(operation, full_topic, payload, retain);
}
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);

// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) {
Expand Down Expand Up @@ -672,13 +668,20 @@ void Mqtt::process_queue() {
// fetch first from queue and create the full topic name
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
char topic[MQTT_TOPIC_MAX_SIZE];
if ((strncmp(message->topic.c_str(), "homeassistant/", 13) == 0)) {
// leave topic as it is
strcpy(topic, message->topic.c_str());
} else {
snprintf_P(topic, MQTT_TOPIC_MAX_SIZE, PSTR("%s/%s"), mqtt_base_.c_str(), message->topic.c_str());
}

// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str());
uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_);
LOG_DEBUG(F("Subscribing to topic: %s"), topic);
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
if (!packet_id) {
LOG_DEBUG(F("Error subscribing to %s"), message->topic.c_str());
LOG_DEBUG(F("Error subscribing to %s"), topic);
}

mqtt_messages_.pop_front(); // remove the message from the queue
Expand All @@ -694,25 +697,25 @@ void Mqtt::process_queue() {

// else try and publish it
uint16_t packet_id =
mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"),
message->topic.c_str(),
topic,
mqtt_message.id_,
message->retain,
mqtt_message.retry_count_ + 1,
message->payload.size(),
packet_id);

LOG_TRACE(message->payload.c_str());
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
LOG_ERROR(F("Failed to publish to %s after %d attempts"), topic, mqtt_message.retry_count_ + 1);
mqtt_publish_fails_++; // increment failure counter
mqtt_messages_.pop_front(); // delete
return;
} else {
mqtt_messages_.front().retry_count_++;
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), topic, mqtt_message.retry_count_ + 1);
return; // leave on queue for next time so it gets republished
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,11 @@ class Mqtt {
struct MQTTSubFunction {
uint8_t device_type_; // which device type, from DeviceType::
const std::string topic_; // short topic name
const std::string full_topic_; // the fully qualified topic name, usually with the base prefixed
mqtt_subfunction_p mqtt_subfunction_; // can be empty

MQTTSubFunction(uint8_t device_type, const std::string && topic, const std::string && full_topic, mqtt_subfunction_p mqtt_subfunction)
MQTTSubFunction(uint8_t device_type, const std::string && topic, mqtt_subfunction_p mqtt_subfunction)
: device_type_(device_type)
, topic_(topic)
, full_topic_(full_topic)
, mqtt_subfunction_(mqtt_subfunction) {
}
};
Expand Down

0 comments on commit 9657f2d

Please sign in to comment.