Skip to content

Commit

Permalink
Merge pull request #271 from odygrd/bounded_queue
Browse files Browse the repository at this point in the history
add bounded queue blocking and transit event soft and hard limits
  • Loading branch information
odygrd authored Mar 22, 2023
2 parents 9610320 + 3b6999e commit 9cb4a4f
Show file tree
Hide file tree
Showing 26 changed files with 984 additions and 531 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,7 @@ jobs:
with_tests: ON
cmake_options: -DQUILL_SANITIZE_THREAD=ON

# Builds with bounded queue and no exceptions
- cxx: g++-10
build_type: Release
std: 17
os: ubuntu-20.04
with_tests: OFF
cmake_options: -DQUILL_USE_BOUNDED_QUEUE=ON

# Builds with no exceptions
- cxx: g++-10
build_type: Release
std: 17
Expand Down
21 changes: 19 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,27 @@
- Add back `backend_thread_sleep_duration` in `Config.h` ([#256](https://github.com/odygrd/quill/pull/256))
- For `quill::rotating_file_handler(...)` and `quill::time_rotating_file_handler(...)` the `backup_count` argument is
now default to `std::numeric_limits<std::uint32_t>::max()`
- When the logging file is deleted from the command line while the logger is still using it, then a new file will be reopened for writing.
- When the logging file is deleted from the command line while the logger is still using it, then a new file will be
reopened for writing.
- Added `quill::Clock` which enables taking and converting TSC timestamps to system clock timestamps.
When `TimestampClockType::Tsc` is used as the default clock type in `Config.h` this class
can also be used to generate timestamps that are in sync with the timestamps in the log file. ([#264](https://github.com/odygrd/quill/pull/264))
can also be used to generate timestamps that are in sync with the timestamps in the log
file. ([#264](https://github.com/odygrd/quill/pull/264))
- Both `Unbounded` and `Bounded` queue modes can now be used without having to recompile `quill` library. This is still
not a runtime option, you still need to recompile your target and pass `QUILL_USE_BOUNDED_QUEUE` as a flag.
See [example_bounded_queue_message_dropping.cpp](https://github.com/odygrd/quill/blob/master/examples/example_bounded_queue_message_dropping.cpp)
- Added `QUILL_USE_BOUNDED_BLOCKING_QUEUE` option that makes possible to use a bounded queue which blocks the hot
thread rather than dropping messages ([#270](https://github.com/odygrd/quill/pull/270))
See [example_bounded_queue_blocking.cpp](https://github.com/odygrd/quill/blob/master/examples/example_bounded_queue_blocking.cpp)
- Renamed `backend_thread_max_transit_events` to `backend_thread_transit_events_soft_limit` in
Config.h ([#270](https://github.com/odygrd/quill/pull/270))
- Added `backend_thread_transit_events_hard_limit` in Config.h ([#270](https://github.com/odygrd/quill/pull/270))
- Added `backend_thread_use_transit_buffer` in Config.h ([#270](https://github.com/odygrd/quill/pull/270))
- CMake: `QUILL_X86ARCH` and `QUILL_USE_BOUNDED_QUEUE` options have been removed. The users can decide on enabling these
options on their side and quill doesn't need to be recompiled as a library. For example :
```cmake
target_compile_definitions(<target> PUBLIC QUILL_X86ARCH QUILL_USE_BOUNDED_QUEUE)
```

## v2.7.0

Expand Down
6 changes: 0 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ option(QUILL_FMT_EXTERNAL "Use external fmt library instead of bundled" OFF)

option(QUILL_NO_EXCEPTIONS "Build without exceptions with -fno-exceptions flag" OFF)

option(QUILL_USE_BOUNDED_QUEUE "Build with bounded queue instead of unbounded" OFF)

option(QUILL_X86ARCH "Enables x86 optimisations, needs to be combined with -march" OFF)

option(QUILL_BUILD_EXAMPLES "Build the examples" OFF)

option(QUILL_BUILD_TESTS "Build the tests (Requires https://github.com/google/googletest to be installed)" OFF)
Expand Down Expand Up @@ -108,8 +104,6 @@ if (QUILL_MASTER_PROJECT)
endif ()

message(STATUS "QUILL_NO_EXCEPTIONS: " ${QUILL_NO_EXCEPTIONS})
message(STATUS "QUILL_X86ARCH: " ${QUILL_X86ARCH})
message(STATUS "QUILL_USE_BOUNDED_QUEUE: " ${QUILL_USE_BOUNDED_QUEUE})
message(STATUS "QUILL_FMT_EXTERNAL: " ${QUILL_FMT_EXTERNAL})
message(STATUS "QUILL_NO_THREAD_NAME_SUPPORT: " ${QUILL_NO_THREAD_NAME_SUPPORT})

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ formatting, ordering the log messages from multiple hot threads and finally outp
text.
The logging thread always empties all the queues of the hot threads on the highest priority (to avoid allocating a new
queue or dropping messages on the hot path). To achieve that, it internally buffers the log messages and then
writes them later when the hot thread queues are empty or when a limit is reached `backend_thread_max_transit_events`.
writes them later when the hot thread queues are empty or when a limit is
reached `backend_thread_transit_events_soft_limit`.

I haven't found an easy way to compare the throughput against other logging libraries while doing asynchronous logging.
For example some libraries will drop the log messages ending in producing much smaller log files than the expected,
Expand Down
3 changes: 3 additions & 0 deletions benchmarks/backend_throughput/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
add_executable(BENCHMARK_quill_backend_throughput quill_backend_throughput.cpp)
target_link_libraries(BENCHMARK_quill_backend_throughput quill)

add_executable(BENCHMARK_quill_backend_throughput_no_buffering quill_backend_throughput_no_buffering.cpp)
target_link_libraries(BENCHMARK_quill_backend_throughput_no_buffering quill)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "quill/Quill.h"
#include <chrono>
#include <iostream>

static constexpr size_t total_iterations = 4'000'000;

/**
* The backend worker just spins, so we just measure the total time elapsed for total_iterations
*/
int main()
{
// main thread affinity
quill::detail::set_cpu_affinity(0);

quill::Config cfg;
cfg.backend_thread_yield = false;
cfg.backend_thread_cpu_affinity = 1;
cfg.backend_thread_use_transit_buffer = false;

quill::configure(cfg);

// Start the logging backend thread and give it some tiem to init
quill::start();
std::this_thread::sleep_for(std::chrono::milliseconds{100});

// Create a file handler to write to a file
quill::Handler* file_handler = quill::file_handler("quill_backend_total_time.log", "w");
file_handler->set_pattern("%(ascii_time) [%(thread)] %(fileline) %(level_name) %(message)");
quill::Logger* logger = quill::create_logger("bench_logger", file_handler);
quill::preallocate();

// start counting the time until backend worker finishes
auto const start_time = std::chrono::steady_clock::now();
for (size_t iteration = 0; iteration < total_iterations; ++iteration)
{
LOG_INFO(logger, "Iteration: {} int: {} double: {}", iteration, iteration * 2,
static_cast<double>(iteration) / 2);
}

// block until all messages are flushed
quill::flush();

auto const end_time = std::chrono::steady_clock::now();
auto const delta = end_time - start_time;
auto delta_d = std::chrono::duration_cast<std::chrono::duration<double>>(delta).count();

std::cout << fmt::format(
"Throughput is {:.2f} million msgs/sec average, total time elapsed: {} ms for {} "
"log messages \n",
total_iterations / delta_d / 1e6,
std::chrono::duration_cast<std::chrono::milliseconds>(delta).count(), total_iterations)
<< std::endl;
}
10 changes: 9 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ add_executable(example_trivial_system_clock example_trivial_system_clock.cpp)
target_link_libraries(example_trivial_system_clock quill)

add_executable(example_user_defined_types example_user_defined_types.cpp)
target_link_libraries(example_user_defined_types quill)
target_link_libraries(example_user_defined_types quill)

add_executable(example_bounded_queue_message_dropping example_bounded_queue_message_dropping.cpp)
target_link_libraries(example_bounded_queue_message_dropping quill)
target_compile_definitions(example_bounded_queue_message_dropping PUBLIC QUILL_USE_BOUNDED_QUEUE)

add_executable(example_bounded_queue_blocking example_bounded_queue_blocking.cpp)
target_link_libraries(example_bounded_queue_blocking quill)
target_compile_definitions(example_bounded_queue_blocking PUBLIC QUILL_USE_BOUNDED_BLOCKING_QUEUE)
32 changes: 32 additions & 0 deletions examples/example_bounded_queue_blocking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// ALWAYS define QUILL_USE_BOUNDED_BLOCKING_QUEUE before including quill.
// A better option is to ALWAYS pass this flag when you are building e.g in CMake target_compile_definitions(<target> PUBLIC QUILL_USE_BOUNDED_BLOCKING_QUEUE)
#define QUILL_USE_BOUNDED_BLOCKING_QUEUE

#include "quill/Quill.h"

int main()
{
quill::Handler* handler = quill::stdout_handler(); /** for stdout **/
// quill::Handler* handler = quill::file_handler("quickstart.log", "w"); /** for writing to file **/
handler->set_pattern("%(ascii_time) [%(thread)] %(fileline:<28) LOG_%(level_name) %(message)");

// set configuration
quill::Config cfg;
cfg.default_handlers.push_back(handler);

// to simulate message blocking easily we can to set a high sleep duration to the logging thread
cfg.backend_thread_sleep_duration = std::chrono::seconds{1};

// and a small queue size
cfg.default_queue_capacity = 4'096;

// Apply configuration and start the backend worker thread
quill::configure(cfg);
quill::start();

for (size_t i = 0; i < 1000; ++i)
{
LOG_INFO(quill::get_logger(), "Hello {} #{}", "world", i);
LOG_ERROR(quill::get_logger(), "This is a log error example {} #{}", 7, i);
}
}
32 changes: 32 additions & 0 deletions examples/example_bounded_queue_message_dropping.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// ALWAYS define QUILL_USE_BOUNDED_QUEUE before including quill.
// A better option is to ALWAYS pass this flag when you are building e.g in CMake target_compile_definitions(<target> PUBLIC QUILL_USE_BOUNDED_QUEUE)
#define QUILL_USE_BOUNDED_QUEUE

#include "quill/Quill.h"

int main()
{
quill::Handler* handler = quill::stdout_handler(); /** for stdout **/
// quill::Handler* handler = quill::file_handler("quickstart.log", "w"); /** for writing to file **/
handler->set_pattern("%(ascii_time) [%(thread)] %(fileline:<28) LOG_%(level_name) %(message)");

// set configuration
quill::Config cfg;
cfg.default_handlers.push_back(handler);

// to simulate message dropping easily we can to set a high sleep duration to the logging thread
cfg.backend_thread_sleep_duration = std::chrono::seconds{1};

// and a small queue size
cfg.default_queue_capacity = 4'096;

// Apply configuration and start the backend worker thread
quill::configure(cfg);
quill::start();

for (size_t i = 0; i < 1000; ++i)
{
LOG_INFO(quill::get_logger(), "Hello {} #{}", "world", i);
LOG_ERROR(quill::get_logger(), "This is a log error example {} #{}", 7, i);
}
}
9 changes: 0 additions & 9 deletions quill/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ set(SOURCE_FILES
src/detail/HandlerCollection.cpp
src/detail/LoggerCollection.cpp
src/detail/SignalHandler.cpp
src/detail/ThreadContextCollection.cpp

src/handlers/ConsoleHandler.cpp
src/handlers/FileHandler.cpp
Expand Down Expand Up @@ -133,14 +132,6 @@ if (QUILL_NO_EXCEPTIONS)
endif ()
endif ()

if (QUILL_X86ARCH)
target_compile_definitions(${TARGET_NAME} PUBLIC -DQUILL_X86ARCH)
endif ()

if (QUILL_USE_BOUNDED_QUEUE)
target_compile_definitions(${TARGET_NAME} PUBLIC -DQUILL_USE_BOUNDED_QUEUE)
endif ()

if (QUILL_NO_THREAD_NAME_SUPPORT)
target_compile_definitions(${TARGET_NAME} PUBLIC -DQUILL_NO_THREAD_NAME_SUPPORT)
endif ()
Expand Down
43 changes: 39 additions & 4 deletions quill/include/quill/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,59 @@ struct Config
*/
std::chrono::nanoseconds backend_thread_sleep_duration = std::chrono::nanoseconds{0};

/**
* The backend worker will drain all hot queues and buffer the messages by default.
* If this option is set to false then it will not buffer and simple process the message
* with the lowest timestamp from the SPSC queues.
* @note It is not recommended to set this to false, unless for example you want to limit
* the logging thread memory usage
*/
size_t backend_thread_use_transit_buffer = true;

/**
* The backend worker thread gives priority to reading the messages from SPSC queues from all
* the hot threads first and buffers them temporarily.
*
* However if the hot threads keep pushing messages to the queues
* e.g logging in a loop then no logs can ever be processed.
*
* When the soft limit is reached then this number of events (default 800) will be logged to the
* log files before continuing reading the SPSC queues
*
* The SPSC queues are emptied on each iteration.
* This means that the actual messages from the SPSC queues can be much more
* than the backend_thread_transit_events_soft_limit.
*
* @note This number represents a limit across ALL hot threads
* @note applicable only when backend_thread_use_transit_buffer = true;
*/
size_t backend_thread_transit_events_soft_limit = 800;

/**
* The backend worker thread gives priority to reading the messages from SPSC queues from all
* the hot threads first and buffers them temporarily.
*
* However if the hot threads keep pushing messages to the queues
* e.g logging in a loop then no logs can ever be processed.
*
* This variable sets the maximum transit events number.
* When that number is reached then half of them will get flushed to the log files before
* continuing reading the SPSC queues
* As the backend thread is buffering messages it can keep buffering for ever if the hot
* threads keep pushing.
*
* This limit is the maximum size of the backend thread buffer. When reached the backend worker
* thread will stop reading the SPSC queues until the buffer has space again.
*
* @note This is limit PER hot thread
* @note applicable only when backend_thread_use_transit_buffer = true;
*/
size_t backend_thread_max_transit_events = 800;
size_t backend_thread_transit_events_hard_limit = 100'000;

/**
* The backend worker thread pops all the SPSC queues log messages and buffers them to a local
* ring buffer queue as transit events. The transit_event_buffer is unbounded. The initial
* capacity of the buffer is customisable. Each newly spawned hot thread will have his own
* transit_event_buffer. This capacity is not in bytes but in items.
* It must be a power of two.
* @note applicable only when backend_thread_use_transit_buffer = true;
*/
uint32_t backend_thread_initial_transit_event_buffer_capacity = 64;

Expand All @@ -77,6 +111,7 @@ struct Config
* They are checked again at the next iteration. Messages are checked on microsecond precision.
*
* Enabling this option might delaying popping messages from the SPSC queues.
* @note applicable only when backend_thread_use_transit_buffer = true;
*/
bool backend_thread_strict_log_timestamp_order = true;

Expand Down
35 changes: 24 additions & 11 deletions quill/include/quill/Logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#pragma once

#include "quill/detail/misc/Common.h"

#include "quill/Fmt.h"
#include "quill/LogLevel.h"
#include "quill/QuillError.h"
Expand All @@ -13,7 +15,6 @@
#include "quill/detail/Serialize.h"
#include "quill/detail/ThreadContext.h"
#include "quill/detail/ThreadContextCollection.h"
#include "quill/detail/misc/Common.h"
#include "quill/detail/misc/Rdtsc.h"
#include "quill/detail/misc/TypeTraitsCopyable.h"
#include "quill/detail/misc/Utilities.h"
Expand Down Expand Up @@ -129,7 +130,8 @@ class alignas(detail::CACHE_LINE_ALIGNED) Logger
fmt::detail::check_format_string<std::remove_reference_t<FmtArgs>...>(format_string);
}

detail::ThreadContext* const thread_context = _thread_context_collection.local_thread_context();
detail::ThreadContext* const thread_context =
_thread_context_collection.local_thread_context<QUILL_QUEUE_TYPE>();

// For windows also take wide strings into consideration.
#if defined(_WIN32)
Expand All @@ -147,16 +149,27 @@ class alignas(detail::CACHE_LINE_ALIGNED) Logger
detail::get_args_sizes<0>(c_string_sizes, fmt_args...);

// request this size from the queue
std::byte* write_buffer = thread_context->spsc_queue().prepare_write(static_cast<uint32_t>(total_size));
std::byte* write_buffer =
thread_context->spsc_queue<QUILL_QUEUE_TYPE>().prepare_write(static_cast<uint32_t>(total_size));

#if defined(QUILL_USE_BOUNDED_QUEUE)
if (QUILL_UNLIKELY(write_buffer == nullptr))
if constexpr (QUILL_QUEUE_TYPE == detail::QueueType::BoundedNonBlocking)
{
if (QUILL_UNLIKELY(write_buffer == nullptr))
{
// not enough space to push to queue message is dropped
thread_context->increment_dropped_message_counter();
return;
}
}
else if constexpr (QUILL_QUEUE_TYPE == detail::QueueType::BoundedBlocking)
{
// not enough space to push to queue message is dropped
thread_context->increment_dropped_message_counter();
return;
while (write_buffer == nullptr)
{
// not enough space to push to queue, keep trying
write_buffer =
thread_context->spsc_queue<QUILL_QUEUE_TYPE>().prepare_write(static_cast<uint32_t>(total_size));
}
}
#endif

// we have enough space in this buffer, and we will write to the buffer

Expand All @@ -183,8 +196,8 @@ class alignas(detail::CACHE_LINE_ALIGNED) Logger
"The committed write bytes can not be greater than the requested bytes");
assert((write_buffer >= write_begin) &&
"write_buffer should be greater or equal to write_begin");
thread_context->spsc_queue().finish_write(static_cast<uint32_t>(write_buffer - write_begin));
thread_context->spsc_queue().commit_write();
thread_context->spsc_queue<QUILL_QUEUE_TYPE>().finish_write(static_cast<uint32_t>(write_buffer - write_begin));
thread_context->spsc_queue<QUILL_QUEUE_TYPE>().commit_write();
}

/**
Expand Down
Loading

0 comments on commit 9cb4a4f

Please sign in to comment.