UniQ Multithreading Library

A multithreading class model built around a lock-free circular buffered queue.

A lock-free circular buffered queue

This repository contains a solution to the 3-thread consensus problem, implemented as an MRMW (multi-reader/multi-writer) circular buffer and exercised by a multi-threaded producer/consumer test case.

For the sake of simplicity, these docs use a simplified JavaScript-like pseudocode, familiar to anyone who works with C-like languages. For the real thing, refer to the source code.

The Queue object

class Queue(size) {
  data = Array(size); 
  in = 0, out = 0
  mask = size-1
   
  push(item) // send the item & return the id of the job 
  pop() // get the next item.
}

Properties

data is a plain (non-atomic) array, updated without locking and guarded by in & out.

size must be a power of 2 (1, 2, 4, 8, 16...). The buffer is indexed through a binary mask (data[i & mask]), which keeps every access inside the buffer. At minimum, the mask is a single bit.

in holds the ID of the next element to be inserted. It is always ahead of or equal to out.

out is the ID of the next element to be removed. It is never greater than in.

Both are atomic variables and are only ever incremented.

No boundary check is needed, thanks to the mask and the way unsigned integer overflow wraps around.
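
For example, here is a minimal standalone C++ sketch (illustration only, not code from this repository) of how a power-of-2 mask keeps the slot index inside the buffer even when the counter overflows:

#include <climits>
#include <cstdio>

int main() {
  const unsigned size = 8;        // must be a power of 2
  const unsigned mask = size - 1; // binary ...0111

  // Start near the overflow point of the unsigned counter.
  unsigned counter = UINT_MAX - 2;
  for (int n = 0; n < 6; ++n, ++counter)
    printf("counter=%u -> slot=%u\n", counter, counter & mask);
  // The counter wraps around to 0, but the slots keep advancing
  // 5, 6, 7, 0, 1, 2 without any boundary check.
  return 0;
}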

Methods

Here is the heart of the solution, where we solve the 3-thread consensus.

Inserting data into the queue. The push() operation.

push(item) 
{
  let i
  do {
    i = in // get a local copy of the next id

    while (i - out == size) sleep() // if full, wait for space
  
  } while ( (data[i & mask]) || (CompareAndSwap(in, i+1, i) != i) )
  // if the seat is lost or CAS failed, try again. 

  data[i & mask] = item // now it is safe to update the buffer using the local i
  return i // id of the job 
}
  • If the thread is preempted at any point between i = in and CompareAndSwap(in, i+1, i), the CAS will fail when it resumes and the loop moves on to the next seat, without any kind of locking.

  • I think the (data[i & mask]) || check should not really be needed, but my machine hangs without it. It also avoids issuing the expensive CAS instruction when the seat is still occupied.

  • Here I check whether the slot content is null. In the C++ implementation this was replaced by an isFree boolean array.
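
As an illustration, here is a minimal C++ sketch of the same push() idea, written with std::atomic and following the pseudocode above. It is not the implementation from this repository; SIZE, MASK and the global data/in/out names are assumptions of this example.

#include <atomic>
#include <thread>
#include <cstddef>

constexpr std::size_t SIZE = 64;        // must be a power of 2
constexpr std::size_t MASK = SIZE - 1;

int data[SIZE] = {};                    // plain, non-atomic storage; 0 means "seat is free"
std::atomic<std::size_t> in{0}, out{0}; // only ever incremented

std::size_t push(int item) {
  std::size_t i;
  do {
    i = in.load();                      // local copy of the next id
    while (i - out.load() == SIZE)      // full: wait for space
      std::this_thread::yield();
    // Retry if the seat is still occupied or another producer won the CAS.
  } while (data[i & MASK] != 0 ||
           !in.compare_exchange_weak(i, i + 1));
  data[i & MASK] = item;                // safe: this thread now owns seat i
  return i;                             // id of the job
}

Like the pseudocode, this sketch treats the plain load of data[i & MASK] as naturally atomic (see the Notes section below); the repository's C++ implementation replaces that check with an isFree boolean array, and a production version would also have to choose explicit memory orderings.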

Removing data from the queue. The pop() operation

pop() 
{
  let o
  do {
    o = out // id of the next candidate

    while (o == in) sleep() // if empty, wait for items

  } while ( !(data[o & mask]) || CompareAndSwap(out, o+1, o) != o )
  // if the candidate is gone or CAS failed, try again. 

  o &= mask // reduce to a buffer index
  let r = data[o] // save the return value
  data[o] = 0 // release the seat
  return r
}
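
Continuing the same sketch (again an illustration only, not the repository's code), a matching pop() could look like this:

// Uses the data/in/out/SIZE/MASK globals declared in the push() sketch above.
int pop() {
  std::size_t o;
  do {
    o = out.load();                     // id of the next candidate
    while (o == in.load())              // empty: wait for items
      std::this_thread::yield();
    // Retry if the candidate is gone or another consumer won the CAS.
  } while (data[o & MASK] == 0 ||
           !out.compare_exchange_weak(o, o + 1));
  o &= MASK;                            // reduce to a buffer index
  int r = data[o];                      // save the return value
  data[o] = 0;                          // release the seat
  return r;
}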

Both methods have two nested while() loops:

  • First we take the next seat/candidate with i = in or o = out.

  • Then we check whether the buffer is full or empty, sleep()ing until the state changes.

  • If the seat/candidate is available, we try to increment the atomic counter with CompareAndSwap(in, i+1, i) or CompareAndSwap(out, o+1, o).

  • If the CAS fails (the returned value != i or != o), we move on to the next seat/candidate.

Notes

  • The store data[o] = 0 and the load !(data[o & mask]) are naturally atomic.

  • The last thing pop() does is release the seat.

  • The flow happens without any kind of lock.

States

If in == out, the queue is empty.

If in - out == size, the queue is full.

In these cases the queue does not lock; it yields voluntarily by calling the sleep() function.
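
In terms of the C++ sketch above, both states are plain comparisons of the two counters (isEmpty/isFull are hypothetical helper names, not part of the library):

// Hypothetical helpers over the in/out counters from the sketches above.
bool isEmpty() { return in.load() == out.load(); }
bool isFull()  { return in.load() - out.load() == SIZE; }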

Testing - The producer/consumer pattern

Now, with our Queue defined, it's time to put it to the test...

Let's create two groups of threads: one producing data and another consuming it.

The first type of thread is the producer; it pushes a bunch of numbers into the queue Q.

producer() 
{
  for(let i=1; i <= N; i++)
    Q.push(i) 
    
  Q.push(-1) // end of job

  Total += N
  log("Produced:", N)
}

Here we push -1 into the queue to signal the end of the job. This could also be implemented with external flags.

Now, the consumer's job is to remove elements from the queue until it receives a termination signal.

consumer()
{
  let value, count = 0

  do { 
    value = Q.pop()
    if (value != -1) count++ // count the consumed items
  } while( value != -1 ) 

  Total -= count 
  log("Consumed:", count)
}

Producers increment Total and consumers decrement it. At the end it must be zero, proving that nothing was leaked.
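
For reference, here is a minimal C++ version of these two roles, built on the push()/pop() sketches above (the value of N and the atomic Total are assumptions of this example, mirroring the pseudocode):

// Builds on the push()/pop() sketches above.
#include <cstdio>

constexpr int N = 1000000;              // items per producer (assumed value)
std::atomic<long> Total{0};

void producer() {
  for (int i = 1; i <= N; ++i)
    push(i);
  push(-1);                             // end-of-job marker
  Total += N;
  printf("Produced: %d\n", N);
}

void consumer() {
  long count = 0;
  int value;
  while ((value = pop()) != -1)
    ++count;                            // count the consumed items
  Total -= count;
  printf("Consumed: %ld\n", count);
}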

Running the horses

Let's start our threads and check that what we get out of the queue equals what we pushed in.

let producers = []
  , consumers = []

for(let i=0; i < 4; i++)
{
  producers[i] = Thread(producer)
  consumers[i] = Thread(consumer)
}

wait(producers, consumers)

log("Total: %d", Total)
  • Here we create 8 threads to flow the data. For the sake of simplicity we use the same number of producers and consumers, but it works equally well in asymmetric configurations.
  • I've tested with up to 512 threads on an old Windows XP machine (where I got the best performance).
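
The same harness in the C++ sketch (illustration only, continuing the producer()/consumer() sketch above):

// Builds on the producer()/consumer() sketches above; <thread> is already included.
#include <vector>

int main() {
  std::vector<std::thread> threads;

  // Start 4 producers and 4 consumers.
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back(producer);
    threads.emplace_back(consumer);
  }

  // Wait for all of them to finish.
  for (auto& t : threads)
    t.join();

  printf("Total: %ld\n", Total.load());
  return 0;
}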

This is the output I got from the C++ implementation, flowing 10M items:

Creating 4 producers & 4 consumers
Flowing 10.000.000 items trough the queue.

Produced: 2.500.000
Consumed: 1.349.413
Produced: 2.500.000
Consumed: 1.359.293
Consumed: 3.876.642
Produced: 2.500.000
Consumed: 3.414.652
Produced: 2.500.000

Total: 0

real    0m0,581s

Note that each producer pushed the same number of items (2.5M), but the consumers got different quantities, ranging from 1.3M to 3.8M. This is the expected behaviour.

Then comes our expected Total: 0, proving that everything produced was consumed.

Finally, the time taken by the whole run: 581 ms, a throughput of about 17.2 M items/s (not bad for an old Dell M6300 Core Duo).

I ran a series of benchmarks, varying the buffer size and the number of threads. The results follow.

Benchmarks - Measuring the flow

[Chart: Throughput x Buffer Size]

With default buffer size (64), varying the number of threads:

[Chart: Throughput x Threads]

  • The cost per operation with 2 threads is the same as with 512.

Code released under the GNU GPL 3.0 license and docs under Creative Commons (CC BY-SA 3.0).
