Skip to content

Commit f514161

Browse files
a-w50james-ctc
authored andcommitted
proposal for unique_ptr and const ref
Signed-off-by: aw <aw@pionix.de>
1 parent e46109f commit f514161

File tree

4 files changed

+39
-43
lines changed

4 files changed

+39
-43
lines changed

include/utils/message_queue.hpp

+13-13
Original file line numberDiff line numberDiff line change
@@ -23,44 +23,44 @@ using json = nlohmann::json;
2323
struct Message {
2424
std::string topic; ///< The MQTT topic where this message originated from
2525
std::string payload; ///< The message payload
26+
};
2627

27-
Message(const std::string& topic, const std::string& payload);
28+
struct ParsedMessage {
29+
std::string topic;
30+
json data;
2831
};
2932

33+
using MessageCallback = std::function<void(const Message&)>;
34+
3035
/// \brief Simple message queue that takes std::string messages, parsed them and dispatches them to handlers
3136
class MessageQueue {
37+
3238
private:
3339
std::thread worker_thread;
34-
std::queue<std::shared_ptr<Message>> message_queue;
40+
std::queue<std::unique_ptr<Message>> message_queue;
3541
std::mutex queue_ctrl_mutex;
36-
std::function<void(std::shared_ptr<Message> message)> message_callback;
42+
MessageCallback message_callback;
3743
std::condition_variable cv;
3844
bool running;
3945

4046
public:
4147
/// \brief Creates a message queue with the provided \p message_callback
42-
explicit MessageQueue(const std::function<void(std::shared_ptr<Message> message)>& message_callback);
48+
explicit MessageQueue(MessageCallback);
4349
~MessageQueue();
4450

4551
/// \brief Adds a \p message to the message queue which will then be delivered to the message callback
46-
void add(std::shared_ptr<Message> message);
52+
void add(std::unique_ptr<Message>);
4753

4854
/// \brief Stops the message queue
4955
void stop();
5056
};
5157

5258
/// \brief Contains a message queue driven list of handler callbacks
5359
class MessageHandler {
54-
public:
55-
struct MessageDetails {
56-
std::string topic;
57-
std::shared_ptr<json> data;
58-
};
59-
6060
private:
6161
std::unordered_set<std::shared_ptr<TypedHandler>> handlers;
6262
std::thread handler_thread;
63-
std::queue<MessageDetails> message_queue;
63+
std::queue<std::unique_ptr<ParsedMessage>> message_queue;
6464
std::mutex handler_ctrl_mutex;
6565
std::mutex handler_list_mutex;
6666
std::condition_variable cv;
@@ -74,7 +74,7 @@ class MessageHandler {
7474
~MessageHandler();
7575

7676
/// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
77-
void add(MessageDetails message);
77+
void add(std::unique_ptr<ParsedMessage>);
7878

7979
/// \brief Stops the message handler
8080
void stop();

include/utils/mqtt_abstraction_impl.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class MQTTAbstractionImpl {
129129
static int open_nb_socket(const char* addr, const char* port);
130130
bool connectBroker(std::string& socket_path);
131131
bool connectBroker(const char* host, const char* port);
132-
void on_mqtt_message(std::shared_ptr<Message> message);
132+
void on_mqtt_message(const Message& message);
133133
void on_mqtt_connect();
134134
static void on_mqtt_disconnect();
135135

lib/message_queue.cpp

+13-17
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,30 @@
1010

1111
namespace Everest {
1212

13-
Message::Message(const std::string& topic, const std::string& payload) : topic(topic), payload(payload) {
14-
}
15-
16-
MessageQueue::MessageQueue(const std::function<void(std::shared_ptr<Message> message)>& message_callback) :
17-
message_callback(message_callback), running(true) {
13+
MessageQueue::MessageQueue(MessageCallback message_callback_) :
14+
message_callback(std::move(message_callback_)), running(true) {
1815
this->worker_thread = std::thread([this]() {
1916
while (true) {
20-
std::shared_ptr<Message> message;
2117
std::unique_lock<std::mutex> lock(this->queue_ctrl_mutex);
2218
this->cv.wait(lock, [this]() { return !this->message_queue.empty() || this->running == false; });
2319
if (!this->running) {
2420
return;
2521
}
2622

27-
message = this->message_queue.front();
23+
const auto message = std::move(this->message_queue.front());
2824
this->message_queue.pop();
2925
lock.unlock();
3026

3127
// pass the message to the message callback
32-
this->message_callback(message);
28+
this->message_callback(*message);
3329
}
3430
});
3531
}
3632

37-
void MessageQueue::add(std::shared_ptr<Message> message) {
33+
void MessageQueue::add(std::unique_ptr<Message> message) {
3834
{
3935
std::lock_guard<std::mutex> lock(this->queue_ctrl_mutex);
40-
this->message_queue.push(message);
36+
this->message_queue.push(std::move(message));
4137
}
4238
this->cv.notify_all();
4339
}
@@ -64,11 +60,11 @@ MessageHandler::MessageHandler() : running(true) {
6460
return;
6561
}
6662

67-
auto message = std::move(this->message_queue.front());
63+
const auto message = std::move(this->message_queue.front());
6864
this->message_queue.pop();
6965
lock.unlock();
7066

71-
auto& data = *message.data;
67+
const auto& data = message->data;
7268

7369
// get the registered handlers
7470
std::vector<std::shared_ptr<TypedHandler>> local_handlers;
@@ -89,7 +85,7 @@ MessageHandler::MessageHandler() : running(true) {
8985
continue;
9086
}
9187
if (data.at("type") == "call") {
92-
handler_fn(message.topic, data.at("data"));
88+
handler_fn(message->topic, data.at("data"));
9389
}
9490
} else if (handler->type == HandlerType::Result) {
9591
// unpack result
@@ -99,25 +95,25 @@ MessageHandler::MessageHandler() : running(true) {
9995
if (data.at("type") == "result") {
10096
// only deliver result to handler with matching id
10197
if (handler->id == data.at("data").at("id")) {
102-
handler_fn(message.topic, data.at("data"));
98+
handler_fn(message->topic, data.at("data"));
10399
}
104100
}
105101
} else if (handler->type == HandlerType::SubscribeVar) {
106102
// unpack var
107103
if (handler->name != data.at("name")) {
108104
continue;
109105
}
110-
handler_fn(message.topic, data.at("data"));
106+
handler_fn(message->topic, data.at("data"));
111107
} else {
112108
// external or unknown, no preprocessing
113-
handler_fn(message.topic, data);
109+
handler_fn(message->topic, data);
114110
}
115111
}
116112
}
117113
});
118114
}
119115

120-
void MessageHandler::add(MessageDetails message) {
116+
void MessageHandler::add(std::unique_ptr<ParsedMessage> message) {
121117
{
122118
std::lock_guard<std::mutex> lock(this->handler_ctrl_mutex);
123119
this->message_queue.push(std::move(message));

lib/mqtt_abstraction_impl.cpp

+12-12
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ namespace Everest {
2626
const auto mqtt_keep_alive = 600;
2727

2828
MessageWithQOS::MessageWithQOS(const std::string& topic, const std::string& payload, QOS qos) :
29-
Message(topic, payload), qos(qos) {
29+
Message{topic, payload}, qos(qos) {
3030
}
3131

3232
MQTTAbstractionImpl::MQTTAbstractionImpl(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
3333
const std::string& mqtt_everest_prefix,
3434
const std::string& mqtt_external_prefix) :
35-
message_queue(([this](std::shared_ptr<Message> message) { this->on_mqtt_message(message); })),
35+
message_queue(([this](const Message& message) { this->on_mqtt_message(message); })),
3636
mqtt_server_address(mqtt_server_address),
3737
mqtt_server_port(mqtt_server_port),
3838
mqtt_everest_prefix(mqtt_everest_prefix),
@@ -57,7 +57,7 @@ MQTTAbstractionImpl::MQTTAbstractionImpl(const std::string& mqtt_server_address,
5757
MQTTAbstractionImpl::MQTTAbstractionImpl(const std::string& mqtt_server_socket_path,
5858
const std::string& mqtt_everest_prefix,
5959
const std::string& mqtt_external_prefix) :
60-
message_queue(([this](std::shared_ptr<Message> message) { this->on_mqtt_message(message); })),
60+
message_queue(([this](const Message& message) { this->on_mqtt_message(message); })),
6161
mqtt_server_socket_path(mqtt_server_socket_path),
6262
mqtt_everest_prefix(mqtt_everest_prefix),
6363
mqtt_external_prefix(mqtt_external_prefix),
@@ -265,28 +265,28 @@ std::future<void> MQTTAbstractionImpl::spawn_main_loop_thread() {
265265
return future;
266266
}
267267

268-
void MQTTAbstractionImpl::on_mqtt_message(std::shared_ptr<Message> message) {
268+
void MQTTAbstractionImpl::on_mqtt_message(const Message& message) {
269269
BOOST_LOG_FUNCTION();
270270

271-
const std::string& topic = message->topic;
272-
const std::string& payload = message->payload;
271+
const auto& topic = message.topic;
272+
const auto& payload = message.payload;
273273

274274
try {
275-
std::shared_ptr<json> data;
275+
json data;
276276
bool is_everest_topic = false;
277277
if (topic.find(mqtt_everest_prefix) == 0) {
278278
EVLOG_verbose << fmt::format("topic {} starts with {}", topic, mqtt_everest_prefix);
279279
is_everest_topic = true;
280280
try {
281-
data = std::make_shared<json>(json::parse(payload));
281+
data = json::parse(payload);
282282
} catch (nlohmann::detail::parse_error& e) {
283283
EVLOG_warning << fmt::format("Could not decode json for incoming topic '{}': {}", topic, payload);
284284
return;
285285
}
286286
} else {
287287
EVLOG_debug << fmt::format("Message parsing for topic '{}' not implemented. Wrapping in json object.",
288288
topic);
289-
data = std::make_shared<json>(json(payload));
289+
data = json(payload);
290290
}
291291

292292
bool found = false;
@@ -306,7 +306,7 @@ void MQTTAbstractionImpl::on_mqtt_message(std::shared_ptr<Message> message) {
306306

307307
if (topic_matches) {
308308
found = true;
309-
handler.add({topic, data});
309+
handler.add(std::unique_ptr<ParsedMessage>(new ParsedMessage{topic, std::move(data)}));
310310
}
311311
}
312312
lock.unlock();
@@ -642,9 +642,9 @@ void MQTTAbstractionImpl::publish_callback(void** state, struct mqtt_response_pu
642642
auto message_queue = static_cast<MessageQueue*>(*state);
643643

644644
// topic_name and application_message are NOT null-terminated, hence copy construct strings
645-
message_queue->add(std::make_shared<Message>(
645+
message_queue->add(std::unique_ptr<Message>(new Message{
646646
std::string(static_cast<const char*>(published->topic_name), published->topic_name_size),
647-
std::string(static_cast<const char*>(published->application_message), published->application_message_size)));
647+
std::string(static_cast<const char*>(published->application_message), published->application_message_size)}));
648648
}
649649

650650
} // namespace Everest

0 commit comments

Comments
 (0)