platonic-io/hs-disruptor

hs-disruptor

hs-disruptor is a Haskell port of the LMAX Disruptor, which is a high performance inter-thread messaging library. The developers at LMAX, which operates a financial exchange, reported in 2010 that they could process more than 100,000 transactions per second at less than 1 millisecond latency.

At its core it's just a lock-free concurrent queue, but it also provides building blocks for achieving several useful concurrent programming tasks that typical queues don't (or at least don't make obvious how to do). The extra features include:

  • Multi-cast (many consumers can in parallel process the same event);
  • Batching (both on producer and consumer side);
  • Back-pressure;
  • Sharding for scalability;
  • Dependencies between consumers.

It also performs better than most queues, as we shall see further down.

Example

import Control.Concurrent
import Control.Concurrent.Async
import Disruptor.SP

main :: IO ()
main = do

  -- Create the shared ring buffer.
  let bufferCapacity = 128
  rb <- newRingBuffer bufferCapacity

  -- The producer keeps a counter and produces events that are merely the pretty
  -- printed value as a string of that counter.
  let produce :: Int -> IO (String, Int)
      produce n = return (show n, n + 1)

      -- The counter starts at zero.
      initialProducerState = 0

      -- No back-pressure is applied in this example.
      backPressure :: Int -> IO ()
      backPressure _ = return ()

  producer <- newEventProducer rb produce backPressure initialProducerState

  -- The consumer merely prints the string event to the terminal.
  let consume :: () -> String -> SequenceNumber -> EndOfBatch -> IO ()
      consume () event snr endOfBatch =
        putStrLn (event ++ if endOfBatch then " (end of batch)" else "")

      -- The consumer doesn't need any state in this example.
      initialConsumerState = ()

      -- Which other consumers do we need to wait for before consuming an event?
      dependencies = []

      -- What to do in case there are no events to consume?
      waitStrategy = Sleep 1

  consumer <- newEventConsumer rb consume initialConsumerState dependencies waitStrategy

  -- Tell the ring buffer which the last consumer is, to avoid overwriting
  -- events that haven't been consumed yet.
  setGatingSequences rb [ecSequenceNumber consumer]

  withEventProducer producer $ \ap ->
    withEventConsumer consumer $ \ac -> do
      threadDelay (3 * 1000 * 1000) -- 3 sec
      cancel ap
      cancel ac

You can run the above example with cabal run readme-example.

A couple of things we could change to highlight the features we mentioned in the above section:

  1. Add a second consumer that saves the event to disk. This consumer would be slower than the current one, which logs to the terminal, but we could buffer up events in memory and only actually write when the end of batch flag is set to speed things up;

  2. We could also shard depending on the sequence number, e.g. have two slower consumers that write to disk and have one of them handle even sequence numbers while the other handles odd ones;

  3. The above producer writes one event at a time to the ring buffer, but since we know which sequence number the last consumer is at, we can easily make writes in batches as well;

  4. Currently the producer doesn't apply any back-pressure when the ring buffer is full. In a more realistic example, where the producer would, for example, create events from requests made to an HTTP server, we could use back-pressure to tell the HTTP server to return status code 429 (too many requests);

  5. If we have one consumer that writes to the terminal and another one that concurrently writes to disk, we could add a third consumer that does something with the event only if it has been both logged and stored to disk (i.e. the third consumer depends on both the first and the second).
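
Items 1 and 2 can be combined in a single consumer handler. The following is a minimal sketch, not code from the library: `consumeShard`, `SeqNr` and `IsEndOfBatch` are hypothetical names, plain `Int` and `Bool` stand in for the library's `SequenceNumber` and `EndOfBatch`, and the state-passing shape of the handler is assumed from the type of `consume` in the example above.

```haskell
module Main where

import Control.Monad (foldM_, unless)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Stand-ins for the library's SequenceNumber and EndOfBatch types
-- (assumptions made so that the sketch needs no dependencies).
type SeqNr = Int
type IsEndOfBatch = Bool

-- | Hypothetical handler for one shard of a disk-writing consumer. It buffers
-- the events whose sequence number belongs to its shard, and hands the whole
-- buffer to `flush` when the end of batch flag is set.
consumeShard
  :: Int                  -- ^ Total number of shards.
  -> Int                  -- ^ The shard this consumer is responsible for.
  -> ([String] -> IO ())  -- ^ What to do with a batch, e.g. write it to disk.
  -> [String]             -- ^ Consumer state: buffered events, newest first.
  -> String               -- ^ The event itself.
  -> SeqNr
  -> IsEndOfBatch
  -> IO [String]          -- ^ The new consumer state.
consumeShard shardCount shard flush buffered event snr endOfBatch = do
  let buffered'
        | snr `mod` shardCount == shard = event : buffered
        | otherwise                     = buffered
  if endOfBatch
    then do
      -- Flush even if the last event belongs to another shard, otherwise
      -- our buffered events would linger until some later batch.
      unless (null buffered') (flush (reverse buffered'))
      return []
    else return buffered'

main :: IO ()
main = do
  flushed <- newIORef []
  let flush events = modifyIORef' flushed (++ [events])
      -- One batch of three events; only the last one has the flag set.
      batch = [("a", 0, False), ("b", 1, False), ("c", 2, True)]
      step s (event, snr, eob) = consumeShard 2 0 flush s event snr eob
  foldM_ step [] batch
  readIORef flushed >>= print -- prints [["a","c"]]
```

Here shard 0 of 2 sees all three events, keeps the ones with even sequence numbers, and performs a single flush at the end of the batch. A second consumer created with shard 1 would handle the odd ones.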

How it works

The ring buffer is implemented using a bounded array. It keeps track of a monotonically increasing sequence number and knows the capacity of the array, so it can find out where to write the next value by simply taking the sequence number modulo the capacity. This has several advantages over traditional queues:

  1. We never remove elements when dequeuing, we merely overwrite them once we have gone all the way around the ring. This removes write contention between the producer and the consumer. One could also imagine avoiding garbage collection by only allocating memory the first time around the ring (but we don't do this in Haskell);

  2. Using an array rather than a linked list improves striding due to spatial locality, i.e. neighbouring elements are likely to be loaded into the cache together.
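
The mapping from sequence number to array slot can be sketched as follows. The names `slotIndex` and `slotIndexMasked` are hypothetical and not taken from the library. The masked variant is a common optimisation that is only valid when the capacity is a power of two (as with the 128 used in the example above); whether hs-disruptor uses it is not stated here.

```haskell
module Main where

import Data.Bits ((.&.))

-- | Map a monotonically increasing sequence number to a slot in the array.
slotIndex :: Int -> Int -> Int
slotIndex capacity sequenceNumber = sequenceNumber `mod` capacity

-- | Same result for non-negative sequence numbers, but only when the
-- capacity is a power of two.
slotIndexMasked :: Int -> Int -> Int
slotIndexMasked capacity sequenceNumber = sequenceNumber .&. (capacity - 1)

main :: IO ()
main = do
  let capacity = 4
  -- Sequence numbers 0..9 wrap around the four slots, so the event with
  -- sequence number 4 overwrites the slot that held sequence number 0.
  print (map (slotIndex capacity) [0 .. 9])       -- prints [0,1,2,3,0,1,2,3,0,1]
  print (map (slotIndexMasked capacity) [0 .. 9]) -- prints [0,1,2,3,0,1,2,3,0,1]
```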

The ring buffer also keeps track of up to which sequence number its last consumer has consumed, in order to not overwrite events that haven't been handled yet.

This also means that producers can ask how much capacity a ring buffer has left, and do batched writes. If there's no capacity left, the producer can apply back-pressure upstream as appropriate.
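
As a rough illustration of that bookkeeping, the remaining capacity follows from the sequence number of the last event written and that of the last event consumed by the slowest consumer. This is a sketch under assumptions: `remainingCapacity`, `claimBatch` and `Claim` are hypothetical names, and sequence numbers are taken to start at -1 (nothing written or consumed yet).

```haskell
module Main where

-- | How many slots can still be written without overwriting events that
-- the slowest consumer hasn't handled yet?
remainingCapacity :: Int -> Int -> Int -> Int
remainingCapacity capacity lastProduced lastConsumed =
  capacity - (lastProduced - lastConsumed)

-- | Outcome of a producer asking for room to write a batch.
data Claim = Claimed Int | ApplyBackPressure
  deriving (Eq, Show)

-- | Claim as many slots as are wanted and available, or signal that
-- back-pressure should be applied because the ring buffer is full.
claimBatch :: Int -> Int -> Int -> Int -> Claim
claimBatch capacity lastProduced lastConsumed wanted
  | free <= 0 = ApplyBackPressure
  | otherwise = Claimed (min wanted free)
  where
    free = remainingCapacity capacity lastProduced lastConsumed

main :: IO ()
main = do
  -- Events 0..5 written, 0..1 consumed: four unconsumed events, four free slots.
  print (claimBatch 8 5 1 10)   -- prints Claimed 4
  -- Events 0..7 written, none consumed: the ring buffer is full.
  print (claimBatch 8 7 (-1) 3) -- prints ApplyBackPressure
```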

Consumers need to keep track of which sequence number they have processed, in order to avoid having the ring buffer overwrite unprocessed events, as already mentioned. This also allows consumers to depend on each other.
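
One way to picture the dependency rule: a consumer may only advance as far as the slowest of the consumers it depends on, and never past the producer. A minimal sketch, where `availableUpTo` is a hypothetical name and plain `Int`s stand in for sequence numbers:

```haskell
module Main where

-- | The highest sequence number a consumer is allowed to process, given the
-- last sequence number written by the producer and the last sequence
-- numbers processed by the consumers it depends on.
availableUpTo :: Int -> [Int] -> Int
availableUpTo lastProduced dependencySeqs =
  minimum (lastProduced : dependencySeqs)

main :: IO ()
main = do
  -- Without dependencies, only the producer limits the consumer.
  print (availableUpTo 9 [])     -- prints 9
  -- With two dependencies at 7 and 4, the slower one sets the limit.
  print (availableUpTo 9 [7, 4]) -- prints 4
```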

When a consumer is done processing an event, it asks the ring buffer for the event at its next sequence number. The ring buffer replies either that there are no new events, in which case the consumer applies its wait strategy, or that there are new events, in which case the consumer handles each one in turn. The last one will have the end of batch flag set, so that the consumer can effectively batch the processing.
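
The consumer's side of this exchange can be sketched as a pure function from the consumer's last processed sequence number and the highest available one to the work it should do next. `Poll` and `pollBatch` are hypothetical names used only for illustration:

```haskell
module Main where

-- | What the consumer learns when it asks for new events: either nothing
-- (apply the wait strategy), or the sequence numbers to handle, each paired
-- with its end of batch flag.
data Poll = NoNewEvents | Batch [(Int, Bool)]
  deriving (Eq, Show)

pollBatch :: Int -> Int -> Poll
pollBatch lastProcessed available
  | available <= lastProcessed = NoNewEvents
  | otherwise =
      -- Only the last sequence number in the range gets the flag.
      Batch [ (snr, snr == available)
            | snr <- [lastProcessed + 1 .. available] ]

main :: IO ()
main = do
  print (pollBatch 2 5) -- prints Batch [(3,False),(4,False),(5,True)]
  print (pollBatch 5 5) -- prints NoNewEvents
```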

Performance

hs-disruptor, which hasn't been optimised much yet, is about 2x slower than LMAX's Java version on their single-producer single-consumer benchmark (1P1C) (basically the above example) on a ~2-year-old Linux laptop.

The same benchmark compared to other Haskell libraries:

In the triple-producer single-consumer (3P1C) benchmark, the Java version is 5x slower than in the Java 1P1C case. Our 3P1C version is 4.6x slower than our 1P1C version, and 2.7x slower than the Java 3P1C version.

The same benchmark compared to other Haskell libraries:

For a slightly more "real world" example, we modified the 3P1C test to have three producers that log messages while the consumer writes them to a log file, and compared it to fast-logger. The hs-disruptor benchmark has a throughput of 3:4 that of fast-logger, i.e. it is slower. When we bump it to ten concurrently logging threads, the hs-disruptor benchmark has a throughput of 10:7 that of fast-logger, i.e. it is roughly 1.4x faster.

See the file benchmark.sh for full details about how the benchmarks are run.

As always, take benchmarks with a grain of salt. We've tried to make them as fair with respect to each other and as true to the original Java versions as possible. If you see anything that seems unfair, or if you get very different results when trying to reproduce the numbers, then please file an issue.

Contributing

Improvements of any kind are most welcome! Please consider opening an issue first before embarking on any big changes, though.

See also

Presentations

Writings

License

See the file LICENSE.
