Skip to content

Commit

Permalink
add more Unbounded Queue Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
odygrd committed Nov 11, 2024
1 parent 21acf4b commit e9c6825
Showing 1 changed file with 104 additions and 6 deletions.
110 changes: 104 additions & 6 deletions test/unit_tests/UnboundedQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,60 @@ TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints")
auto* write_buffer = buffer.prepare_write<quill::QueueType::UnboundedBlocking>(sizeof(uint32_t));
#endif

while (!write_buffer)
REQUIRE(write_buffer);

std::memcpy(write_buffer, &i, sizeof(uint32_t));
buffer.finish_write(sizeof(uint32_t));
buffer.commit_write();
}
}
});

std::thread consumer_thread(
[&buffer]()
{
for (uint32_t wrap_cnt = 0; wrap_cnt < 20; ++wrap_cnt)
{
for (uint32_t i = 0; i < 8192; ++i)
{
auto read_result = buffer.prepare_read();
while (!read_result.read_pos)
{
std::this_thread::sleep_for(std::chrono::microseconds{2});
read_result = buffer.prepare_read();
}

auto const value = reinterpret_cast<uint32_t const*>(read_result.read_pos);
REQUIRE_EQ(*value, i);
buffer.finish_read(sizeof(uint32_t));
buffer.commit_read();
}
}
});

producer_thread.join();
consumer_thread.join();
REQUIRE(buffer.empty());
}

TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints_produce_fully")
{
UnboundedSPSCQueue buffer{1024};

std::thread producer_thread(
[&buffer]()
{
for (uint32_t wrap_cnt = 0; wrap_cnt < 20; ++wrap_cnt)
{
for (uint32_t i = 0; i < 8192; ++i)
{
#if defined(_MSC_VER)
write_buffer = buffer.prepare_write(sizeof(uint32_t), quill::QueueType::UnboundedBlocking);
auto* write_buffer = buffer.prepare_write(sizeof(uint32_t), quill::QueueType::UnboundedBlocking);
#else
write_buffer = buffer.prepare_write<quill::QueueType::UnboundedBlocking>(sizeof(uint32_t));
auto* write_buffer = buffer.prepare_write<quill::QueueType::UnboundedBlocking>(sizeof(uint32_t));
#endif
}

REQUIRE(write_buffer);

std::memcpy(write_buffer, &i, sizeof(uint32_t));
buffer.finish_write(sizeof(uint32_t));
Expand All @@ -43,8 +88,8 @@ TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints")
}
});

// Delay creating the consumer thread
std::this_thread::sleep_for(std::chrono::milliseconds{300});
// Produce everything before starting the consumer
producer_thread.join();

std::thread consumer_thread(
[&buffer]()
Expand All @@ -68,6 +113,59 @@ TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints")
}
});

consumer_thread.join();
REQUIRE(buffer.empty());
}

TEST_CASE("unbounded_queue_read_write_multithreaded_plain_ints_consume_quickly")
{
UnboundedSPSCQueue buffer{1024};

// Start the consumer first
std::thread consumer_thread(
[&buffer]()
{
for (uint32_t wrap_cnt = 0; wrap_cnt < 20; ++wrap_cnt)
{
for (uint32_t i = 0; i < 8192; ++i)
{
auto read_result = buffer.prepare_read();
while (!read_result.read_pos)
{
std::this_thread::sleep_for(std::chrono::nanoseconds{100});
read_result = buffer.prepare_read();
}

auto const value = reinterpret_cast<uint32_t const*>(read_result.read_pos);
REQUIRE_EQ(*value, i);
buffer.finish_read(sizeof(uint32_t));
buffer.commit_read();
}
}
});

std::thread producer_thread(
[&buffer]()
{
for (uint32_t wrap_cnt = 0; wrap_cnt < 20; ++wrap_cnt)
{
for (uint32_t i = 0; i < 8192; ++i)
{
#if defined(_MSC_VER)
auto* write_buffer = buffer.prepare_write(sizeof(uint32_t), quill::QueueType::UnboundedBlocking);
#else
auto* write_buffer = buffer.prepare_write<quill::QueueType::UnboundedBlocking>(sizeof(uint32_t));
#endif

REQUIRE(write_buffer);

std::memcpy(write_buffer, &i, sizeof(uint32_t));
buffer.finish_write(sizeof(uint32_t));
buffer.commit_write();
}
}
});

producer_thread.join();
consumer_thread.join();
REQUIRE(buffer.empty());
Expand Down

0 comments on commit e9c6825

Please sign in to comment.