A multithreading class model built around a lock-free circular buffer queue.

This repository contains a solution to the 3-thread consensus problem, implemented as an MRMW (multi-reader/multi-writer) circular buffer, in the context of a multi-threaded producer/consumer test case.

For the sake of simplicity, these docs use a simplified JavaScript-like pseudocode, familiar to anyone coming from a C-like language. For the real thing, refer to the source code.
```js
class Queue(size) {
    data = Array(size)
    in = 0, out = 0
    mask = size - 1

    push(item)   // send the item & return the id of the job
    pop()        // get the next item
}
```
`data` is a plain (non-atomic) array, updated without locking. It is secured by `in` & `out`.

`size` must be a power of 2 (1, 2, 4, 8, 16...). The buffer is indexed through a binary mask (`data[t & mask]`), which keeps every access inside the buffer. At minimum, a single bit.

`in` holds the ID of the next element to be written. It is always ahead of or equal to `out`.

`out` is the ID of the oldest element (the next to be removed). It is never greater than `in`.

Both are atomic variables and are only ever incremented. No boundary check is needed, thanks to `mask` and the way integer overflow wraps around.
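As a rough C++ sketch of this layout (an illustration only, not the repository's code; the slots are made `std::atomic<int>` here purely to keep the sketch free of data races under the C++ memory model, while the real buffer is the plain array described above):

```cpp
#include <atomic>
#include <vector>

struct Queue {
    explicit Queue(unsigned size)          // size must be a power of 2
        : data(size), mask(size - 1) {}    // slots start at 0, meaning "free"

    std::vector<std::atomic<int>> data;    // the circular buffer itself
    std::atomic<unsigned> in{0};           // id of the next element to be written
    std::atomic<unsigned> out{0};          // id of the next element to be removed
    const unsigned mask;                   // size - 1, e.g. binary 0111 for size 8

    unsigned push(int item);               // sketched further below
    int pop();                             // sketched further below
    bool empty() const;                    // sketched further below
    bool full() const;                     // sketched further below
};
```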
Here is the heart of the solution, where the 3-thread consensus is solved.
```js
push(item)
{
    let i
    do {
        i = in                              // get a local copy of the next id
        while (i - out == size) sleep()     // if full, wait for space
    } while ( (data[i & mask]) || (CompareAndSwap(in, i+1, i) != i) )
    // if the seat is lost or the CAS failed, try again

    data[i & mask] = item   // now it is safe to update the buffer using the local i
    return i                // id of the job
}
```
- If the thread is preempted at any point between `i = in` and `CompareAndSwap(in, i+1, i)`, on return the CAS will fail and the loop moves on to the next seat. Without any kind of locking.
- I think the `(data[i & mask]) ||` check should not really be needed, but my computer hangs without it. It also avoids executing the expensive CAS instruction when the seat is clearly taken.
- Here I check whether the content is null, but in the C++ implementation this was replaced by an `isFree` boolean array.
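For reference, here is a minimal C++ sketch of the same `push()` loop, built on the struct sketched above. It assumes `std::atomic` counters, a zero slot meaning "free" (so pushed items must be non-zero), and `std::this_thread::yield()` standing in for `sleep()`; it is not the repository's actual code:

```cpp
#include <thread>   // std::this_thread::yield

unsigned Queue::push(int item) {           // assumes item != 0 (0 marks a free seat)
    unsigned i;
    do {
        i = in.load();                     // local copy of the next id
        while (i - out.load() == data.size())
            std::this_thread::yield();     // full: voluntary preemption (the pseudocode's sleep())
    } while (data[i & mask].load() != 0               // seat not yet released by a consumer
          || !in.compare_exchange_strong(i, i + 1));  // or another producer claimed it first
    data[i & mask].store(item);            // now it is safe to fill the seat using the local i
    return i;                              // id of the job
}
```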
```js
pop()
{
    let o
    do {
        o = out                     // id of the next candidate
        while (o == in) sleep()     // if empty, wait for items
    } while ( !(data[o & mask]) || CompareAndSwap(out, o+1, o) != o )
    // if the candidate is gone or the CAS failed, try again

    o &= mask           // round to fit the buffer
    let r = data[o]     // save the return value
    data[o] = 0         // release the seat
    return r
}
```
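And a matching C++ sketch of `pop()`, under the same assumptions:

```cpp
int Queue::pop() {
    unsigned o;
    do {
        o = out.load();                    // id of the next candidate
        while (o == in.load())
            std::this_thread::yield();     // empty: wait for a producer
    } while (data[o & mask].load() == 0                // candidate not written yet
          || !out.compare_exchange_strong(o, o + 1));  // or another consumer claimed it first
    int r = data[o & mask].load();         // save the return value
    data[o & mask].store(0);               // release the seat
    return r;
}
```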
Both methods have two nested `while()` loops:

- First we get the next seat/candidate with `i = in` and `o = out`.
- Then we check whether the buffer is full or empty, `sleep()`ing until the state changes.
- If the seat/candidate is available, we try to increment the atomic register with `CompareAndSwap(out, o+1, o)` (and likewise `CompareAndSwap(in, i+1, i)` in `push()`).
- If the CAS fails (`!= o`), we go to the next seat.
- The plain store `data[o] = 0` and load `!(data[o & mask])` are naturally atomic.
- The last thing `pop()` does is release the seat.
- The whole flow happens without any kind of lock.

If `in == out` the queue is empty; if `in - out == size` the queue is full. In these cases the queue does not lock, but performs a voluntary preemption by calling the `sleep()` function, as sketched below.
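In terms of the C++ sketch, those two conditions could be written as two small helpers (again illustrative, not the repository's code):

```cpp
bool Queue::empty() const { return in.load() == out.load(); }                 // nothing to pop
bool Queue::full()  const { return in.load() - out.load() == data.size(); }   // no free seat left
```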
Now, with our Queue defined, it's time to put it on fire...

Let's create two groups of threads: one producing and the other consuming data.

The first type of thread is the `producer`; it will push a bunch of numbers into the queue `Q`.
```js
producer()
{
    for(let i=1; i <= N; i++)
        Q.push(i)

    Q.push(-1)      // end of job
    Total += N
    log("Produced:", N)
}
```
Here we push `-1` into the queue to signal the end of the job. This could also be implemented with external flags.
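Building on the earlier Queue sketch, a C++ counterpart of the producer could look like this; the globals `Q`, `Total` and `N` are illustrative stand-ins, not the repository's actual names:

```cpp
#include <atomic>
#include <cstdio>

// Illustrative globals mirroring the pseudocode (names are assumptions).
static Queue Q(64);                      // 64 = the default buffer size mentioned below
static std::atomic<long> Total{0};
static const int N = 2500000;            // 2.5M items per producer

void producer() {
    for (int i = 1; i <= N; ++i)
        Q.push(i);                       // push the payload (never 0, so a 0 slot stays "free")
    Q.push(-1);                          // termination mark for exactly one consumer
    Total += N;
    std::printf("Produced: %d\n", N);
}
```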
Now, the job of the `consumer` is to remove elements from the queue until it receives a termination mark.
```js
consumer()
{
    let value, count = 0
    do {
        value = Q.pop()
        count++
    } while( value != -1 )

    count--             // don't count the -1 termination mark
    Total -= count
    log("Consumed:", count)
}
```
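A matching C++ consumer sketch, which counts the items it removes (which is what the `Consumed:` figures below suggest) and stops on the `-1` mark; again illustrative:

```cpp
void consumer() {
    long count = 0;
    int value;
    do {
        value = Q.pop();
        ++count;
    } while (value != -1);               // stop once a termination mark arrives
    --count;                             // the -1 mark itself is not a payload item
    Total -= count;
    std::printf("Consumed: %ld\n", count);
}
```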
Producers increment `Total` and consumers decrement it. At the end it must be zero, proving that nothing was leaked.

Let's start our threads and check that what comes out of the queue matches what was pushed in.
```js
let producers = [], consumers = []

for(let i=0; i < 4; i++)
{
    producers[i] = Thread(producer)
    consumers[i] = Thread(consumer)
}

wait(producers, consumers)
log("Total: %d", Total)
```
- Here we create 8 threads to move the data through the queue. For the sake of simplicity there is the same number of producers and consumers, but it works equally well in asymmetric configurations.
- I've tested with up to 512 threads on an old Windows XP machine (where I got the best performance).

This is the output I got from the C++ implementation, flowing 10M items:
```
Creating 4 producers & 4 consumers
Flowing 10.000.000 items trough the queue.
Produced: 2.500.000
Consumed: 1.349.413
Produced: 2.500.000
Consumed: 1.359.293
Consumed: 3.876.642
Produced: 2.500.000
Consumed: 3.414.652
Produced: 2.500.000
Total: 0

real    0m0,581s
```
Note that every producer pushed the same number of items (2.5M), but the consumers got different quantities, ranging from 1.3M to 3.8M. This is the expected behaviour.

Then comes the expected `Total: 0`, proving that everything produced was consumed.

Finally, the time taken by the run: 581 ms, a throughput of roughly 17.2 M items/s (10,000,000 items / 0.581 s). Enough for an old Dell M6300 Core Duo.

I ran a series of benchmarks, varying the buffer size and the number of threads. The results follow.
With the default buffer size (64), varying the number of threads:

- The cost per operation with 2 threads is the same as with 512.
Code is released under the GNU GPL 3.0 license and the docs under Creative Commons (CC BY-SA 3.0).