Commit: Cache head and tail indices
This leads to reduced cache coherency traffic and increased throughput.

Closes #18
rigtorp committed Jul 22, 2021
1 parent 883098d commit 3a0507a
Showing 6 changed files with 683 additions and 43 deletions.
71 changes: 47 additions & 24 deletions README.md
@@ -3,8 +3,10 @@
[![C/C++ CI](https://github.com/rigtorp/SPSCQueue/workflows/C/C++%20CI/badge.svg)](https://github.com/rigtorp/SPSCQueue/actions)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/rigtorp/SPSCQueue/master/LICENSE)

-A single producer single consumer wait-free and lock-free fixed size
-queue written in C++11.
+A single producer single consumer wait-free and lock-free fixed size queue
+written in C++11. This implementation is faster than both
+[*boost::lockfree::spsc*](https://www.boost.org/doc/libs/1_76_0/doc/html/boost/lockfree/spsc_queue.html)
+and [*folly::ProducerConsumerQueue*](https://github.com/facebook/folly/blob/master/folly/docs/ProducerConsumerQueue.md).

## Example

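A minimal usage sketch, assuming only the queue's documented `push`/`front`/`pop` API:

```cpp
#include <iostream>
#include <thread>

#include <rigtorp/SPSCQueue.h>

int main() {
  rigtorp::SPSCQueue<int> q(1); // capacity of one element
  auto t = std::thread([&] {
    while (!q.front())
      ; // spin until the producer has pushed an element
    std::cout << *q.front() << std::endl;
    q.pop();
  });
  q.push(1); // producer side: spins only while the queue is full
  t.join();
}
```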
@@ -122,16 +124,38 @@
pages on Linux.

## Implementation

-![Memory layout](https://github.com/rigtorp/SPSCQueue/blob/master/spsc.png)
-
-The underlying implementation is a
-[ring buffer](https://en.wikipedia.org/wiki/Circular_buffer).
-
-Care has been taken to make sure to avoid any issues with
-[false sharing](https://en.wikipedia.org/wiki/False_sharing). The head
-and tail pointers are aligned and padded to the false sharing range
-(cache line size). The slots buffer is padded with
-the false sharing range at the beginning and end.
+![Memory layout](https://github.com/rigtorp/SPSCQueue/blob/master/spsc.svg)
+
+The underlying implementation is based on a [ring
+buffer](https://en.wikipedia.org/wiki/Circular_buffer).
+
+Care has been taken to avoid any issues with [false
+sharing](https://en.wikipedia.org/wiki/False_sharing). The head and tail indices
+are aligned and padded to the false sharing range (cache line size).
+Additionally, the slots buffer is padded with the false sharing range at the
+beginning and end; this prevents false sharing with any adjacent allocations.
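
A rough sketch of that layout (assuming a 64-byte false sharing range; names mirror the header diff further down):

```cpp
#include <atomic>
#include <cstddef>

// Illustrative layout only: each index lives on its own cache line, so the
// writer updating writeIdx_ never invalidates the line holding readIdx_.
static constexpr size_t kCacheLineSize = 64; // assumed false sharing range

struct Indices {
  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
  // Trailing padding keeps an adjacent allocation off readIdx_'s cache line.
  char padding_[kCacheLineSize - sizeof(std::atomic<size_t>)];
};
```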

+This implementation has higher throughput than a typical concurrent ring buffer
+by locally caching the head and tail indices in the writer and reader
+respectively. The caching increases throughput by reducing the amount of cache
+coherency traffic.

+To understand how this works, first consider a read operation in the absence of
+caching: the head index (read index) needs to be updated, and thus that cache
+line is loaded into the L1 cache in exclusive state. The tail (write index)
+needs to be read in order to check that the queue is not empty, and is thus
+loaded into the L1 cache in shared state. Since a queue write operation needs to
+read the head index, it's likely that a write operation will require some cache
+coherency traffic to bring the head index cache line back into exclusive state.
+In the worst case there will be one cache line transition from shared to
+exclusive for every read and every write operation.

+Next consider a queue reader that caches the tail index: if the cached tail
+index indicates that the queue is empty, the reader refreshes the cached tail
+index from the tail index. If the queue was in fact non-empty, multiple read
+operations up to the cached tail index can then complete without stealing the
+writer's exclusive state on the tail index cache line. Cache coherency traffic
+is therefore reduced. An analogous argument can be made for the queue write
+operation.
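
A minimal sketch of the reader-side check this paragraph describes (names mirror the header diff below; an uncached reader would instead load the write index on every call):

```cpp
#include <atomic>
#include <cstddef>

// Illustration of the cached-empty check. writeIdxCache_ is a plain size_t
// because only the reader thread ever touches it.
struct ReaderSide {
  std::atomic<size_t> writeIdx_{0}; // shared with the writer
  size_t writeIdxCache_ = 0;        // reader-private cache of writeIdx_

  bool empty_at(size_t readIdx) {
    if (readIdx == writeIdxCache_) {
      // Cache says empty: refresh from the shared index and re-check. This
      // load is the only operation that touches the writer's cache line.
      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
      if (writeIdxCache_ == readIdx) {
        return true; // genuinely empty
      }
    }
    // All elements up to writeIdxCache_ can now be consumed without
    // generating further coherency traffic on writeIdx_.
    return false;
  }
};
```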

This implementation allows for arbitrary non-power of two capacities, instead
allocating an extra queue slot to indicate a full queue. If you don't want to waste
@@ -151,26 +175,25 @@
the implementation:
- A single-threaded test that the functionality works as intended,
  including that the element constructor and destructor are invoked
  correctly.
- A multi-threaded fuzz test that all elements are enqueued and
  dequeued correctly under heavy contention (see the sketch below).
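
For illustration, a fuzz test of this kind could look like the following sketch (a hypothetical test, not the repository's actual test code):

```cpp
#include <cassert>
#include <thread>

#include <rigtorp/SPSCQueue.h>

int main() {
  rigtorp::SPSCQueue<int> q(11); // small odd capacity to force index wrapping
  constexpr int kIters = 1000000;
  auto consumer = std::thread([&] {
    for (int expected = 0; expected < kIters; ++expected) {
      while (!q.front())
        ; // spin until an element is available
      assert(*q.front() == expected); // FIFO order must be preserved
      q.pop();
    }
  });
  for (int i = 0; i < kIters; ++i) {
    q.push(i); // spins while the queue is full
  }
  consumer.join();
}
```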

## Benchmarks

-Throughput benchmark measures throughput between 2 threads for a
-`SPSCQueue<int>` of size 256.
+Throughput benchmark measures throughput between 2 threads for a queue of `int`
+elements.
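
A sketch of what such a throughput measurement might look like (queue capacity, iteration count, and timing details are assumptions, not the actual benchmark code):

```cpp
#include <chrono>
#include <iostream>
#include <thread>

#include <rigtorp/SPSCQueue.h>

int main() {
  rigtorp::SPSCQueue<int> q(256);
  constexpr int kOps = 10000000;
  auto start = std::chrono::steady_clock::now();
  auto consumer = std::thread([&] {
    for (int i = 0; i < kOps; ++i) {
      while (!q.front())
        ; // spin until an element is available
      q.pop();
    }
  });
  for (int i = 0; i < kOps; ++i) {
    q.push(i);
  }
  consumer.join();
  auto ms = std::chrono::duration<double, std::milli>(
                std::chrono::steady_clock::now() - start)
                .count();
  std::cout << static_cast<long>(kOps / ms) << " ops/ms\n";
}
```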

-Latency benchmark measures round trip time between 2 threads
-communicating using 2 queues of type `SPSCQueue<int>`.
+Latency benchmark measures round trip time between 2 threads communicating using
+2 queues of `int` elements.
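
And a sketch of the ping-pong pattern such a round-trip measurement implies (again an illustration under assumed parameters, not the actual benchmark source):

```cpp
#include <chrono>
#include <iostream>
#include <thread>

#include <rigtorp/SPSCQueue.h>

int main() {
  rigtorp::SPSCQueue<int> q1(256), q2(256);
  constexpr int kRounds = 100000;
  auto echo = std::thread([&] { // forwards every message from q1 to q2
    for (int i = 0; i < kRounds; ++i) {
      while (!q1.front())
        ;
      q2.push(*q1.front());
      q1.pop();
    }
  });
  auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < kRounds; ++i) {
    q1.push(i); // ping
    while (!q2.front())
      ; // wait for pong
    q2.pop();
  }
  auto ns = std::chrono::duration<double, std::nano>(
                std::chrono::steady_clock::now() - start)
                .count();
  echo.join();
  std::cout << "avg RTT: " << ns / kRounds << " ns\n";
}
```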

-The following numbers are for a 2 socket machine with 2 x Intel(R)
-Xeon(R) CPU E5-2620 0 @ 2.00GHz.
-
-| NUMA Node / Core / Hyper-Thread | Throughput (ops/ms) | Latency RTT (ns) |
-| ------------------------------- | ------------------: | ---------------: |
-| #0,#0,#0 & #0,#0,#1             |               63942 |               60 |
-| #0,#0,#0 & #0,#1,#0             |               37739 |              238 |
-| #0,#0,#0 & #1,#0,#0             |               25744 |              768 |
+Benchmark results for an AMD Ryzen 9 3900X 12-Core Processor, with the 2
+threads running on different cores on the same chiplet:
+
+| Queue                        | Throughput (ops/ms) | Latency RTT (ns) |
+| ---------------------------- | ------------------: | ---------------: |
+| SPSCQueue                    |              362723 |              133 |
+| boost::lockfree::spsc        |              209877 |              222 |
+| folly::ProducerConsumerQueue |              148818 |              147 |
## Cited by

SPSCQueue has been cited by the following papers:
42 changes: 27 additions & 15 deletions include/rigtorp/SPSCQueue.h
@@ -48,7 +48,7 @@ template <typename T, typename Allocator = std::allocator<T>> class SPSCQueue {
 public:
   explicit SPSCQueue(const size_t capacity,
                      const Allocator &allocator = Allocator())
-      : capacity_(capacity), allocator_(allocator), writeIdx_(0), readIdx_(0) {
+      : capacity_(capacity), allocator_(allocator) {
     // The queue needs at least one element
     if (capacity_ < 1) {
       capacity_ = 1;
@@ -93,7 +93,7 @@
   SPSCQueue &operator=(const SPSCQueue &) = delete;

   template <typename... Args>
-  void emplace(Args &&... args) noexcept(
+  void emplace(Args &&...args) noexcept(
       std::is_nothrow_constructible<T, Args &&...>::value) {
     static_assert(std::is_constructible<T, Args &&...>::value,
                   "T must be constructible with Args&&...");
@@ -102,14 +102,15 @@
     if (nextWriteIdx == capacity_) {
       nextWriteIdx = 0;
     }
-    while (nextWriteIdx == readIdx_.load(std::memory_order_acquire))
-      ;
+    while (nextWriteIdx == readIdxCache_) {
+      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
+    }
     new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
     writeIdx_.store(nextWriteIdx, std::memory_order_release);
   }

   template <typename... Args>
-  bool try_emplace(Args &&... args) noexcept(
+  bool try_emplace(Args &&...args) noexcept(
       std::is_nothrow_constructible<T, Args &&...>::value) {
     static_assert(std::is_constructible<T, Args &&...>::value,
                   "T must be constructible with Args&&...");
@@ -118,8 +119,11 @@
     if (nextWriteIdx == capacity_) {
       nextWriteIdx = 0;
     }
-    if (nextWriteIdx == readIdx_.load(std::memory_order_acquire)) {
-      return false;
+    if (nextWriteIdx == readIdxCache_) {
+      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
+      if (nextWriteIdx == readIdxCache_) {
+        return false;
+      }
     }
     new (&slots_[writeIdx + kPadding]) T(std::forward<Args>(args)...);
     writeIdx_.store(nextWriteIdx, std::memory_order_release);
@@ -153,8 +157,11 @@
   T *front() noexcept {
     auto const readIdx = readIdx_.load(std::memory_order_relaxed);
-    if (writeIdx_.load(std::memory_order_acquire) == readIdx) {
-      return nullptr;
+    if (readIdx == writeIdxCache_) {
+      writeIdxCache_ = writeIdx_.load(std::memory_order_acquire);
+      if (writeIdxCache_ == readIdx) {
+        return nullptr;
+      }
     }
     return &slots_[readIdx + kPadding];
   }
@@ -205,11 +212,16 @@
   Allocator allocator_;
 #endif

-  // Align to avoid false sharing between writeIdx_ and readIdx_
-  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_;
-  alignas(kCacheLineSize) std::atomic<size_t> readIdx_;
-
-  // Padding to avoid adjacent allocations to share cache line with readIdx_
-  char padding_[kCacheLineSize - sizeof(readIdx_)];
+  // Align to cache line size in order to avoid false sharing.
+  // readIdxCache_ and writeIdxCache_ are used to reduce the amount of cache
+  // coherency traffic.
+  alignas(kCacheLineSize) std::atomic<size_t> writeIdx_ = {0};
+  alignas(kCacheLineSize) size_t readIdxCache_ = 0;
+  alignas(kCacheLineSize) std::atomic<size_t> readIdx_ = {0};
+  alignas(kCacheLineSize) size_t writeIdxCache_ = 0;
+
+  // Padding to avoid adjacent allocations sharing a cache line with
+  // writeIdxCache_.
+  char padding_[kCacheLineSize - sizeof(writeIdxCache_)];
 };
 } // namespace rigtorp
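
A self-contained sketch of the double-checked full test that `try_emplace` performs in the hunk above (names mirror the diff; an illustration, not the complete class):

```cpp
#include <atomic>
#include <cstddef>

// Illustration of the cached-full check. readIdxCache_ is a plain size_t
// because only the writer thread ever touches it.
struct WriterSide {
  std::atomic<size_t> readIdx_{0}; // shared with the reader
  size_t readIdxCache_ = 0;        // writer-private cache of readIdx_

  bool can_write(size_t nextWriteIdx) {
    if (nextWriteIdx == readIdxCache_) {
      // Cache says full: refresh from the shared index and re-check. Only
      // this refresh touches the reader's cache line.
      readIdxCache_ = readIdx_.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCache_) {
        return false; // genuinely full
      }
    }
    return true; // slots up to readIdxCache_ are free to fill
  }
};
```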
Binary file modified: spsc.odg
Binary file removed: spsc.png
