Skip to content

Commit

Permalink
Added on_thread callbacks to async_sink.h
Browse files Browse the repository at this point in the history
  • Loading branch information
gabime committed Dec 7, 2024
1 parent 01ef17e commit a1be409
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions include/spdlog/sinks/async_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <thread>
#include <atomic>
#include <cassert>
#include <functional>

#include "../details/async_log_msg.h"
#include "../details/mpmc_blocking_q.h"
Expand Down Expand Up @@ -31,23 +32,18 @@ class async_sink final : public dist_sink<Mutex> {
discard_new // Discard the log message if the queue is full
};

explicit async_sink(size_t queue_size = default_queue_size) {
async_sink(size_t queue_size, std::function<void()> on_thread_start, std::function<void()> on_thread_stop) {
if (queue_size == 0 || queue_size > max_queue_size) {
throw spdlog_ex("async_sink: invalid queue size");
}
// printf("........... Allocating queue: slot: %zu X %zu bytes ====> %lld KB ..............\n",
// queue_size, sizeof(details::async_log_msg), (sizeof(details::async_log_msg) * queue_size)/1024);
q_ = std::make_unique<queue_t>(queue_size);

worker_thread_ = std::thread([this] {
details::async_log_msg incoming_msg;
for (;;) {
q_->dequeue(incoming_msg);
if (incoming_msg.message_type() == async_log_msg::type::terminate) {
break;
}
base_t::sink_it_(incoming_msg);
}
worker_thread_ = std::thread([this, on_thread_start, on_thread_stop] {
if (on_thread_start) on_thread_start();
this->worker_loop();
if (on_thread_stop) on_thread_stop();
});
}
~async_sink() override {
Expand All @@ -58,6 +54,11 @@ class async_sink final : public dist_sink<Mutex> {
}
};

async_sink(): async_sink(default_queue_size, nullptr, nullptr) {}
explicit async_sink(size_t queue_size): async_sink(queue_size, nullptr, nullptr) {}
async_sink(std::function<void()> on_thread_start, std::function<void()> on_thread_stop):
async_sink(default_queue_size, on_thread_start, on_thread_stop) {}

async_sink(const async_sink &) = delete;
async_sink &operator=(const async_sink &) = delete;
async_sink(async_sink &&) = default;
Expand Down Expand Up @@ -98,7 +99,25 @@ class async_sink final : public dist_sink<Mutex> {
assert(false);
throw spdlog_ex("async_sink: invalid overflow policy");
}
}

void worker_loop () {
details::async_log_msg incoming_msg;
for (;;) {
q_->dequeue(incoming_msg);
switch (incoming_msg.message_type()) {
case async_log_msg::type::log:
base_t::sink_it_(incoming_msg);
break;
case async_log_msg::type::flush:
base_t::flush_();
break;
case async_log_msg::type::terminate:
return;
default:
assert(false);
}
}
}

std::atomic<overflow_policy> overflow_policy_ = overflow_policy::block;
Expand Down

0 comments on commit a1be409

Please sign in to comment.