Throughput and Latency in Differential Dataflow

In this post we'll talk through a fairly easy use of differential dataflow that distills down some of the trade-offs that I like exploring. In this case, this is going to be the trade-off between throughput and latency, and how there can be some non-intuitive interplay between the two.

Let's start with a review of differential dataflow from 10,000 feet.

Differential dataflow

So differential dataflow is a computational framework I've been working on for a while, and it has the look and feel of something SQL-ish, something MapReduce-ish, and something a bit more than either of these. The idea is that you write a program in a high-level "declarative" way, and the system will then execute it for you in a distributed fashion, across as many cores and processes as you let it.

For example, imagine you have a pile of pairs of integers, let's call them (src, dst) suggesting that maybe these are edges in a graph. You'd like to get a handle on the distribution of the number of out-going edges from each source, which we can write by counting the number of times each src appears, and then counting the number of times each count appears in the result:

edges
    .map(|(src, dst)| src)
    .count()
    .map(|(src, cnt)| cnt)
    .count()

All that we've done here, starting from a hypothetical collection edges of our (src, dst) edge pairs, is to extract the source field and then count the resulting records, and from the results extract the counts (out-degrees) and count each of them up. The result should be a bunch of pairs (degree, count) telling us how many times each degree is observed in the graph.
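
For completeness, here is a rough sketch of how this fragment sits inside a full program. This is a condensed approximation rather than the actual degrees.rs source, written against the timely and differential dataflow crates; exact method names and type parameters may differ a bit between versions.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::Count;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // Build the dataflow: degree distribution of a collection of (src, dst) edges.
        let mut input = worker.dataflow::<u32, _, _>(|scope| {
            let (input, edges) = scope.new_collection::<(u32, u32), isize>();
            edges
                .map(|(src, _dst)| src)   // keep only the source of each edge
                .count()                  // (src, out-degree)
                .map(|(_src, cnt)| cnt)   // keep only the out-degree
                .count()                  // (out-degree, number of sources with that degree)
                .inspect(|x| println!("observed: {:?}", x));
            input
        });

        // Feed a few edges and let the computation run to completion.
        input.insert((0, 1));
        input.insert((0, 2));
        input.insert((1, 2));
        input.advance_to(1);
        input.flush();
    }).unwrap();
}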

You can go and run this! It is essentially the degrees.rs example in the differential dataflow repository. The program takes as parameters a number of nodes, a number of edges, and another number that we'll leave as zero for now, for pedagogical reasons.

Echidnatron% cargo run --release --example degrees -- 1000000 5000000 0 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 1000000 5000000 0 inspect`
observed: ((1, 33690), (Root, 0), 1)
observed: ((2, 84287), (Root, 0), 1)
observed: ((3, 140266), (Root, 0), 1)
observed: ((4, 175693), (Root, 0), 1)
observed: ((5, 174643), (Root, 0), 1)
observed: ((6, 146679), (Root, 0), 1)
observed: ((7, 104567), (Root, 0), 1)
observed: ((8, 65263), (Root, 0), 1)
observed: ((9, 36276), (Root, 0), 1)
observed: ((10, 18223), (Root, 0), 1)
observed: ((11, 8239), (Root, 0), 1)
observed: ((12, 3390), (Root, 0), 1)
observed: ((13, 1267), (Root, 0), 1)
observed: ((14, 484), (Root, 0), 1)
observed: ((15, 162), (Root, 0), 1)
observed: ((16, 46), (Root, 0), 1)
observed: ((17, 14), (Root, 0), 1)
observed: ((18, 4), (Root, 0), 1)
observed: ((19, 2), (Root, 0), 1)
observed: ((20, 1), (Root, 0), 1)
round 0 finished after Duration { secs: 1, nanos: 120217518 } (loading)
Echidnatron%

About 1.2 seconds, to produce a bunch of records of the form (degree, count) which are pretty plausible. With one million nodes and five million edges we expect on average five out-going edges, and that is roughly what we see.

We've just processed five million edges in a little over a second, which is plausibly decent throughput, and who really knows about the latency. I mean, one second is great for some queries, and pretty horrible for other queries. Maybe let's increase the number of nodes and edges by a factor of ten, which gives us a report that looks like this (minus printing the results to the screen):

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 0        
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 0`
round 0 finished after Duration { secs: 12, nanos: 315594487 } (loading)
Echidnatron%

Now we are doing 50 million edges in twelve-ish seconds, which is just over four million edges per second of throughput, but the latency is now up at twelve seconds, which is not anything we are meant to be impressed by. As your pile of input data gets larger and larger, the amount of time you should expect a computation to take should only increase, and your delight at watching largely the same computation run over and over should decrease proportionately.

Interactivity

Differential dataflow distinguishes itself from other compute platforms by allowing the user to change the inputs, even once the computation is up and running. The system responds as quickly as it can with the corresponding changes in the outputs, exactly the difference between whatever the answer should be and whatever it showed you just before.

That parameter we left as a zero in degrees.rs was the parameter that controlled the number of changes to introduce in each round, where the program is bright enough to just exit if you've indicated zero changes. Putting a larger number there, for example one, will see the program introduce batch after batch of that many changes to the underlying collection.

With relatively few changes, one might reasonably expect that we don't need to take the full twelve seconds each time we update the input. We could, but maybe we should improve our expectations. Let's do that now, performing just one change at a time:

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 1 | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 1`
round 0 finished after Duration { secs: 11, nanos: 943622768 } (loading)
round 1 finished after Duration { secs: 0, nanos: 76203 }
round 2 finished after Duration { secs: 0, nanos: 68693 }
round 3 finished after Duration { secs: 0, nanos: 59410 }
round 4 finished after Duration { secs: 0, nanos: 45207 }
round 5 finished after Duration { secs: 0, nanos: 45090 }
...

I piped the command to head because otherwise it reaches round 20,000 before I can stop the process, and it becomes hard to read the output.

What do we see here, up in the meaningful part of the data? Latencies that look like tens of microseconds, and generally less than one hundred microseconds (in the first 100 rounds, the largest latency was 83 microseconds). We put a single change in, and tens of microseconds later we get the answer back. That is interactive!

Actually, we didn't really see the answer, so let's take a moment to look at that. I'm snipping out the initial report of all the degrees, and just showing the output for the changes.

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 1 inspect | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 1 inspect`
...
round 0 finished after Duration { secs: 12, nanos: 74729211 } (loading)
observed: ((5, 1757099), (Root, 1), -1)
observed: ((5, 1757098), (Root, 1), 1)
observed: ((6, 1459805), (Root, 1), -1)
observed: ((6, 1459807), (Root, 1), 1)
observed: ((7, 1042894), (Root, 1), -1)
observed: ((7, 1042893), (Root, 1), 1)
round 1 finished after Duration { secs: 0, nanos: 134114 }
...

Notice that this took a little longer, which I attribute to actually showing us the output; microseconds are pretty small things. But also notice that with one change to the input (one edge added, one edge removed) we have six changes in the outputs. Three of the degree counts have changed, apparently two nodes of degrees five and seven now have degree six. So while we started with just one change, we've actually processed a bit more than that.

Nonetheless, when not a lot changes in your computation, you shouldn't have to do very much work. As your input data change, you should expect low-latency responses to your standing computations. At least, if your computations were expressed in differential dataflow you should expect this. It's one reason I like to try to express things in differential dataflow.

What about throughput now, though? Let's approximate and say it takes us about 50 microseconds for each of these rounds. That turns into .. calculates .. about 20,000 updates per second. This is a lot less than the four million edges per second we saw when just loading up the computation, so we might actually have something to look into here.
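
Spelled out, the arithmetic is just the reciprocal of the per-round latency (using 50 microseconds as a rough, eyeballed figure):

// Back-of-envelope arithmetic: one input update retired roughly every 50 microseconds.
fn main() {
    let seconds_per_update = 50.0e-6;                                // ~50 µs per round, one update each
    println!("~{} updates/sec", (1.0 / seconds_per_update).round()); // ~20000
}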

Trade-offs

We saw high throughput when we had no particular expectations about latency, and we saw low throughput when we started to perform work that we expected to return quickly. What was the distinction? How do we control the trade-off, and maybe more interesting (to me) should we control the trade-off?

In differential dataflow the trade-off is managed by the amount of work you offer to the system before requesting that it actually come back with some answers. In our degrees.rs example, that third parameter controls how many updates we present before letting the system get to work. The more updates we present, the more efficiently we can retire the batch of them. If we just present one update at a time the system has much less flexibility in how it evaluates the changes.

Crucially, we aren't talking about returning fewer answers, or applying lots of changes simultaneously. We do want to see the same output as if we had applied the input updates one at a time, but we may have the flexibility to work on multiple input updates concurrently.

Let's take a peek at the code in degrees.rs that feeds the computation with changes:

if batch > 0 {
    for wave in 1 .. {
        // Introduce `batch` insertions and deletions, each at its own logical time.
        for round in 0 .. batch {
            input.advance_to(((wave * batch) + round) * peers + index);
            input.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
            input.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
        }

        // Flush the batch, then step the worker until all of its updates have drained.
        input.advance_to((wave + 1) * batch * peers);
        input.flush();

        let timer = ::std::time::Instant::now();
        worker.step_while(|| probe.less_than(input.time()));

        if index == 0 {
            println!("round {} finished after {:?}", wave * batch * peers, timer.elapsed());
        }
    }
}

For non-zero batch sizes, for wave after wave, we introduce batch many insertions and deletions (drawn from two seeded pseudorandom number generators that ensure we only remove elements we've already inserted). After each batch many insertions and deletions, we flush the input and repeatedly step the worker until it can confirm that all updates have drained from the system.

Importantly, as we increase batch we are offering more work. Notice that by batching we aren't changing the computation or the input to the computation: we still go through the exact same sequence of insertions and deletions. We are just waiting longer before announcing "time to work!".

To get a sense for the trade-off, let's re-run degrees.rs with a few different batch sizes, starting at one and working up in powers of ten.

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 1 | head      
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 1`
round 0 finished after Duration { secs: 10, nanos: 878102154 } (loading)
round 1 finished after Duration { secs: 0, nanos: 74635 }
round 2 finished after Duration { secs: 0, nanos: 53723 }
round 3 finished after Duration { secs: 0, nanos: 43759 }
round 4 finished after Duration { secs: 0, nanos: 32573 }
round 5 finished after Duration { secs: 0, nanos: 37064 }

This was the single update experiment, where we actually have two changes introduced into the system and even more produced as we go (resulting in six for the first round, if you recall from above).

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 10 | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 10`
round 0 finished after Duration { secs: 10, nanos: 938721389 } (loading)
round 10 finished after Duration { secs: 0, nanos: 134215 }
round 20 finished after Duration { secs: 0, nanos: 83958 }
round 30 finished after Duration { secs: 0, nanos: 81475 }
round 40 finished after Duration { secs: 0, nanos: 108566 }
round 50 finished after Duration { secs: 0, nanos: 78710 }

Now we are doing ten updates in each batch, and while the latencies have increased (they should, right? we are doing more work!) they have not increased by ten times. We will want to be more careful measuring the throughput, but eyeballing things, the latencies went up by between two and three times, meaning the throughput increased by ten divided by that factor, which is not far off a four-fold increase.

Our latencies are still in the hundred microsecond range, though. Still pretty decent.

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 100 | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 100`
round 0 finished after Duration { secs: 11, nanos: 33951223 } (loading)
round 100 finished after Duration { secs: 0, nanos: 441516 }
round 200 finished after Duration { secs: 0, nanos: 427961 }
round 300 finished after Duration { secs: 0, nanos: 382548 }
round 400 finished after Duration { secs: 0, nanos: 416123 }
round 500 finished after Duration { secs: 0, nanos: 380637 }

We've now gone up by another factor of ten in volume of updates, and again less than a factor of ten increase in latency. Our throughput has increased again, and if we approximate the rounds as doing 100 updates in 400 microseconds, this is 250,000 updates per second.

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 1000 | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 1000`
round 0 finished after Duration { secs: 10, nanos: 636939074 } (loading)
round 1000 finished after Duration { secs: 0, nanos: 2829445 }
round 2000 finished after Duration { secs: 0, nanos: 2748813 }
round 3000 finished after Duration { secs: 0, nanos: 2621052 }
round 4000 finished after Duration { secs: 0, nanos: 2954105 }
round 5000 finished after Duration { secs: 0, nanos: 2437152 }

Another factor of ten and we are now doing batches of one thousand changes in about three milliseconds, corresponding to nearly 400,000 updates per second.

We can vary the batch size a lot, but let's take it all the way up to a crazy level, just to make a point. Let's do batches of 50 million updates, enough to erase the original graph completely.

Echidnatron% cargo run --release --example degrees -- 10000000 50000000 50000000 | head
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
    Running `target/release/examples/degrees 10000000 50000000 50000000`
round 0 finished after Duration { secs: 10, nanos: 716818104 } (loading)
round 50000000 finished after Duration { secs: 122, nanos: 115270708 }
round 100000000 finished after Duration { secs: 89, nanos: 69803469 }
round 150000000 finished after Duration { secs: 91, nanos: 617039787 }
^C
Echidnatron%

Well, each of these rounds takes a while. They settle down to around 90 seconds, which for fifty million updates sure is a lot more than the ten seconds it takes to start things up. Or, the throughput is much lower than the four million updates per second we announced when loading the data; it looks like maybe half a million updates per second (50M / 90s).

As it turns out, the fifty million updates we are doing here are very different from the first fifty million edges we added. Although there are similar numbers of updates, we get to see the change in the output after each of the fifty million updates. When the data are loaded, we only get to see the change in the output after all of the initial fifty million edges are inserted. This is an important distinction that we'll revisit a little later on (ed: next post?).

Visualizing trade-offs

Perhaps those walls of text from the console were helpful, but while they contain lots of data they probably aren't the best way to understand the trade-off between throughput and latency.

Let's try a different approach now, by plotting throughput against latency, where each data point is produced from some setting of the batch size. To keep the experiment simple, once loaded we will run the computation for an additional ten seconds, recording the number of rounds we are able to process. With the batch size, this number will tell us (i) how many rounds we can process in ten seconds (latency) and (ii) how many updates we can process in ten seconds (throughput).
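
To make the bookkeeping explicit, here is the sort of helper that turns those two numbers into the quantities tabulated below. It is a hypothetical sketch, not code from degrees.rs:

// Hypothetical helper, not part of degrees.rs: given a batch size and the number
// of rounds completed in a fixed measurement window, recover latency and throughput.
fn latency_and_throughput(batch: u64, rounds: u64, window_secs: f64) -> (f64, f64) {
    let latency = window_secs / rounds as f64;              // average seconds per round
    let throughput = (batch * rounds) as f64 / window_secs; // updates per second
    (latency, throughput)
}

fn main() {
    // For example: batches of 1,000 with roughly 2,200 rounds completed in ten seconds.
    println!("{:?}", latency_and_throughput(1000, 2200, 10.0));
}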

batch size    latency (s)    throughput (elts/s)
1             0.000022360    44723.61
10            0.000096040    104122.98
100           0.000642212    155711.79
1000          0.004543951    220072.78
10000         0.030170409    331450.60
100000        0.209353117    477661.86
1000000       1.654989484    604233.44

As the batch size increases by a factor of ten, the latency increases by somewhat less than a factor of ten, and whatever increase the latency avoids shows up as a gain in throughput.

Let's plot these numbers. There are a few ways to look at it, but I like the throughput on the x-axis and the latency on the y-axis. From my point of view, the amount of change in the input is out of our control; the world changes, and we get to see how rapidly we can keep up with it as it does. For each offered load, we can look up the position on the x-axis and see the steady-state latency we would get.

[figure lt-large: latency vs. throughput for the 10 million node, 50 million edge graph]

Of course, this line doesn't go up and to the right forever. To prove this point let's take a much smaller example, a graph with ten nodes and fifty edges.

batch size    latency (s)     throughput (elts/s)
1             0.000015030     66531.79
10            0.000025207     396714.95
100           0.000080650     1239930.37
1000          0.000646346     1547159.74
10000         0.007631104     1310426.40
100000        0.088007205     1136270.59
1000000       0.908435692     1100793.38
10000000      7.3448221245    1361503.35

In this computation the throughput plateaus with reasonably small batches of about one thousand elements. At this point, we've gotten all of the locality benefits we hope to extract, and larger batches just mean more memory we have to shuffle through. Differential dataflow also sorts per-key updates, and with this many updates we have many for each key, and sorting is generally super-linear.

Anyhow, let's plot these numbers, and see how different they look.

[figure lt-small: latency vs. throughput for the ten node, fifty edge graph]

Yes, these numbers look different. Although they are all better in absolute terms, because a small graph is easier to update than a large graph, they demonstrate that the attainable throughput will asymptote. Once we get near 1.6 million updates per second, there just doesn't seem to be a batch size that works for us. Really what this means is that there should be a vertical asymptote thereabouts, in that once we have more than 1.6 million updates per second, things just go sideways. Or, vertical in this orientation.

Evaluating trade-offs

One thing I like about thinking of the latency-throughput trade-off is that it helps me to better understand the implications of some possible optimizations, or a change in the system configuration.

For example, if we add more workers, do we expect the latency (and throughput) to improve, or just the throughput? Often this sort of scaling is evaluated in fixed "weak" and "strong" scaling experiments, where the load is either increased proportional to the workers, or not increased at all. The results reveal whether the system handles more load, or even handles the same load but faster. These are interesting for sure, but I think we can learn even more by looking at the change in the latency-throughput curves.

Let's look at the latency-throughput curves for multiple workers on our 10 million node, 50 million edge computation. The solid line with circles is one worker, the dashed line with squares is two workers.

[figure lt-compare: latency vs. throughput for one worker (circles) and two workers (squares)]

Well, what do you think? The two worker line looks pretty similar to the single worker line, with each measurement a bit to the right and up a little; for the same batch size two workers yield at most twice the throughput and a bit more latency, which I think makes some sense because we have twice as many resources. So what do you wager the "speed-up" is for latency at a given offered load? Probably not quite 2x, is what I'm guessing most folks would assume; you couldn't have made things more than twice as fast, and you probably didn't even manage that.

Let me pull out a few data points for comparison. Both experiments produced a reading of approximately 220,000 elements per second (it is the fourth circle and third square). The latencies are:

               latency         throughput (elts/s)
one worker     0.004543951s    220072.78
two workers    0.000906783s    220559.92

Servicing the same offered load of about 220k elements per second, the two worker set-up provides a five-fold improvement in latency. Let that sink in for a bit.

What is going on is a "virtuous cycle". Imagine you are feeding the workers at a rate that stabilizes at batch sizes of 1,000 for a single worker. If you hand 1,000 updates to two workers, they will finish faster than the single worker; not twice as fast necessarily, but fast enough that by the time they are ready to go again there are fewer than 1,000 input elements to process. This will take even less time for the two of them to process, and so the third time around there is even less work available. Where does it stabilize? Apparently, from the latency-throughput curve, at a 5x reduction in latency.
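
If it helps to see that feedback written down, here is a toy model of the cycle. The cost function is completely made up (a fixed per-batch cost plus a per-update cost), so the numbers are not measurements of differential dataflow; only the shape of the fixed-point iteration matters:

// Toy model of the feedback loop: at an offered load of `rate` updates/sec, a batch
// of size b takes `service(b)` seconds, and the next batch contains whatever arrived
// in the meantime. The service function is invented purely for illustration.
fn main() {
    let rate = 220_000.0_f64;                   // offered load, updates per second
    let service = |b: f64| 1.0e-4 + 2.0e-9 * b; // hypothetical fixed cost + per-update cost
    let mut batch = 1_000.0_f64;                // start from a large backlog
    for _ in 0..20 {
        batch = rate * service(batch);          // arrivals during one service period
    }
    println!("steady-state batch ~{:.0}, latency ~{:.1} microseconds",
             batch, 1.0e6 * service(batch));
}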

Another way to look at it is that the latency-throughput function has a steeper derivative than the pictures might make you think (note that the y-axis is in log-scale). If we force twice as much load on the system, the response time can get much much worse. Similarly, introducing a second worker that picks up half of the slack can in principle recover as much as you would lose going in the other direction.

One conclusion from this is that while latency is a great metric to think about, it doesn't capture the rate at which we do computation, just how good we are at ordering the computation we do so that we can confirm results as soon as possible. By adding a second core we haven't made any computation run five times faster, we've just given the system the breathing room to respond five times as often.

Addendum: Scaling (2017-07-29)

I sneaked onto a computer with many cores, and came back with some more scaling information for latency-throughput trade-offs. The goal here was to see whether the trend seen above on my laptop, of cores increasing the responsiveness of the system more dramatically than one might expect, continues, and for how long it continues.

So we do the same computation, 10 million node, 50 million edge degree computation and maintenance, increasing the number of workers from 1 up through 16, at each power of two. We take a reading with each power of ten batch size, producing:

[figure scaling: latency vs. throughput for 1, 2, 4, 8, and 16 workers]

The curves are unlabeled, but they are probably the ones you think they are. From left to right the worker counts increase, generally increasing the throughput of the system at the expense of the minimal achievable latency.

I find this plot a bit more striking than the one for my laptop. Two threads really does look like you are getting twice the throughput, four threads looks like maybe four times the throughput, and .. then .. the scaling seems to break down a bit, doesn't it. I think there is a good explanation for that (not exactly a system issue; more of a program issue) and I'll try and explain that in future addenderata.

For the moment though, we see many situations where increasing the core count can substantially improve the latency for a fixed throughput. Taking 400,000 elements per second as a target, which for one core requires a quarter of a second, we can achieve

                 latency         throughput (elts/s)
one worker       0.244437870s    409101.91
two workers      0.004326536s    462263.54
four workers     0.000701516s    570193.62
eight workers    0.000182272s    438904.32

Or we could aim for around one million elements per second, which one worker doesn't manage, and get readings like

                   latency         throughput (elts/s)
two workers        2.179154755s    917787.04
four workers       0.043897407s    911215.55
eight workers      0.007832616s    1021370.17
sixteen workers    0.001645368s    972427.02

From all of this we can somewhat conclude that adding resources has the ability to dramatically improve the responsiveness of your differential dataflow computation. We shouldn't conclude things about other systems, as this could just be because differential dataflow slows down a lot to give high throughput and has lots of ground to regain.

There remains the mystery of why we can't seem to get past 2.4 million updates per second, which I think I'll be able to explain. I have a theory, and once I get some data it should make a good story!

Addendum: Scaling (2017-07-29)

I've got another picture for you, which gets at the secret behind why the scaling seemed to break down. It is roughly the same experiment as above, with a minor change. See if you can spot it:

cargo run --release --example degrees -- 10000000 500000000 <batch> -w <workers>

When we use this command line instead of the ones up above, we get latency-throughput curves that look like this:

[figure scaling2: latency vs. throughput for the same worker counts, with ten times as many edges]

Notice that the x-axis, throughput, goes out much farther than up above. We max out at 12.5 million elements per second. The change itself is pretty small (the graph now has 500 million edges rather than 50 million), but it greatly affects how easy it is to compute the answer.

Addendum: Open-loop measurements (2017-08-14)

I was recently reading a slide deck from Azul (presented here; go watch it) which makes an excellent point, one I hadn't clearly understood: when taking latency measurements, there is a huge difference between closed-loop (where the test harness waits for responses before making new requests) and open-loop (where the test harness does not wait).

The self-regulation of a closed-loop harness can cause it to take fewer measurements in periods of poor latency.

The example Azul gave, which made this super clear for me, is: imagine a system that for the first half of every second responds to requests in one millisecond. For the second half of every second it enqueues the request, doing something like garbage collection or the like.

  1. A closed-loop harness, which issues one request (or a batch) and then awaits the response before issuing the next, will see some 500 measurements of "one millisecond" and one measurement of "500 milliseconds". If you plot that distribution, it looks great. There is a tail, but hey, it's reality, and real measurements have tails.

  2. An open-loop harness, which issues requests independent of the state of the responses to its requests, will see some 500 measurements of "one millisecond" and some 500 measurements of "up to half a second". If you plot that distribution, it looks pretty terrible. What used to be the "tail" up above is now "half of the responses". Barf. The previous reality was better.

The difference is that the closed-loop harness slows down testing when the system slows down. That's really important to notice. It could be that for your system closed-loop testing is all you need (perhaps you have interactive sessions with few clients?), but it seems like a great point to make clear.
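
Here is a tiny simulation of that hypothetical service, just to make the difference concrete; the numbers come from the thought experiment above, not from any real system:

// Toy simulation: a service that answers in one millisecond during the first half
// of each second, and queues requests during the second half.
fn respond_at(request_ns: u64) -> u64 {
    let within_second = request_ns % 1_000_000_000;
    if within_second < 500_000_000 {
        request_ns + 1_000_000                                  // served promptly: 1ms
    } else {
        request_ns - within_second + 1_000_000_000 + 1_000_000  // queued until the next second
    }
}

fn main() {
    // Open-loop: one request every millisecond for one second, regardless of responses.
    let open: Vec<u64> = (0..1000u64)
        .map(|i| { let t = i * 1_000_000; respond_at(t) - t })
        .collect();

    // Closed-loop: issue the next request only once the previous response arrives.
    let (mut closed, mut now) = (Vec::new(), 0u64);
    while now < 1_000_000_000 {
        let done = respond_at(now);
        closed.push(done - now);
        now = done;
    }

    let slow = |v: &Vec<u64>| v.iter().filter(|&&ns| ns > 10_000_000).count();
    println!("open-loop:   {} samples, {} of them over 10ms", open.len(), slow(&open));
    println!("closed-loop: {} samples, {} of them over 10ms", closed.len(), slow(&closed));
}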

All those measurements up above are closed-loop testing. The average latency numbers are "elapsed time / requests complete". Writing that out, it seems like it was a horrible lie from the beginning. (Historical note: in the Naiad paper we actually did an open-loop harness for our "real-time streaming" experiment. So, not total suck when it matters.)

Making things right

Let's look at the difference between open-loop testing and closed-loop testing. We have a bunch of measurements from up above, and we are going to re-do a bunch of them with a different test harness. And, as you will see, we will need to change a few other things too.

How do you write an open-loop harness? Timely dataflow workers are generally written as a loop where you repeatedly do dataflow work, and at any point can stuff some more inputs into the dataflow. Our workers are going to plan out when they should introduce each request, and if they fall behind they can introduce larger batches of inputs with the logical times recorded.

The code I wrote expects a ns_per_request, the number of nanoseconds to wait between each request, and whenever it gets a chance it plays requests out to catch up with "real time".

while request_counter * ns_per_request < elapsed_ns {
    input.advance_to(request_counter * ns_per_request);
    input.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
    input.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
    request_counter += 1;
}

Although we may introduce records a bit later than they arrived, we do know when they arrived and we will ultimately use that time to measure latency.

At the same time, we are watching our timely computation to see how promptly results get emitted. The timestamp our computation uses is "nanoseconds from start of computation", and we are going to watch for the gaps between when times were submitted and when they are marked as complete by timely.

// Determine completed ns.
let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0].inner);

// Determine real elapsed ns.
let elapsed = timer.elapsed();
let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);

// Any un-recorded measurements that are now complete should be recorded.
while (measurements.len() as u64) * ns_per_request < acknowledged_ns {
    let requested_at = (measurements.len() as u64) * ns_per_request;
    measurements.push(elapsed_ns - requested_at);
}

This should capture all of the gaps between the logical moment in the experiment where we had the ability to produce the result, and the real moment in the experiment where we noticed that its consequences were complete.

This is not only "open-loop", in that we keep feeding inputs without awaiting completion, but it is also generally more fair than what we were doing for our closed-loop measurements. Previously, we imagined the simultaneous arrival of a batch of requests, and measured the time to retire them. There was no discussion of how long the record hung out in buffers awaiting the opportunity to get submitted as a batch, which is something we are now tracking.

How does it perform? I have the code produce the median, 99th percentile, and maximum latencies over a ten second test.
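
For reference, reducing the recorded latencies to those three statistics is just the usual sort-and-index computation; this is a hypothetical sketch of mine, not the harness code:

// Hypothetical sketch (not the harness code): summarize recorded latencies in nanoseconds.
fn summarize(mut latencies: Vec<u64>) -> (u64, u64, u64) {
    assert!(!latencies.is_empty());
    latencies.sort();
    let median = latencies[latencies.len() / 2];
    let p99 = latencies[(latencies.len() * 99) / 100];
    let max = *latencies.last().unwrap();
    (median, p99, max)
}

fn main() {
    let (median, p99, max) = summarize(vec![100, 200, 300, 400, 50_000]);
    println!("median {}ns, 99th {}ns, max {}ns", median, p99, max);
}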

throughput    median           99th               max
100,000/s     459,076ns        26,465,622ns       94,155,622ns
200,000/s     14,744,855ns     163,095,860ns      237,240,860ns
300,000/s     95,300,159ns     304,404,372ns      1,274,797,030ns
400,000/s     331,634,458ns    1,723,336,728ns    2,302,040,085ns
500,000/s     882,740,273ns    2,791,137,348ns    2,875,759,348ns

Here throughput really means "offered load". The distinction is that, as we only run the experiment for a fixed duration and seem to have latencies crawling up towards that duration, we may not have actually measured a stable configuration. That is on me, and the experimental rig should get better.

How do these numbers compare with those from before? Let me copy and paste the corresponding measurements we had from the batch-sizing experiments:

batch size    latency (s)    throughput (elts/s)
1             0.000022360    44723.61
10            0.000096040    104122.98
100           0.000642212    155711.79
1000          0.004543951    220072.78
10000         0.030170409    331450.60
100000        0.209353117    477661.86
1000000       1.654989484    604233.44

It is hard to make direct comparisons, because we held different things constant in each case, but I think it is safe to say that the open-loop measurements are "worse" than the closed loop ones. Worse in the sense that they suggest a less-good system, but better in the sense that it was a better experiment, and it seems that not everything fell apart when going from closed to open loop testing.

Things fall apart

Let's try out two threads. Each of them is going to play out some target rate of requests, and record the rate at which they get retired. I've rigged the code so that they hit disjoint but interleaved times with their requests. I'll report the maximum of the two workers' reported medians, 99th percentiles, and maximums. I'll also halve each worker's target rate, so that the "system" as a whole sees offered loads corresponding to the single-worker experiments.

throughput    median           99th               max
100,000/s     236,489,514ns    812,808,412ns      5,375,793,773ns
200,000/s     401,989,270ns    8,242,452,539ns    9,399,414,985ns

Ok, let's stop here. The median latency is up in the hundreds of milliseconds, which is pretty much forever in timely dataflow terms. For the second row, the maximum measurement is roughly the duration of the experiment, which should warn us about something. Partly, that my measurement only captures completed measurements, which biases it in the "faster" direction. Also that not a lot really got done in that second row.

What the hell is going on?

Things get put back together

After a touch of sleuthing around, this turns out to be a pretty fundamental defect in how my implementation of timely dataflow did its progress tracking. There are a bunch of details, but the short version is that there was a lot of code that looked like:

for (time, diff) in updates {
    frontier.update(time, diff);
}

where frontier is some clever type that tracks the lower envelope of all times in play at the moment. It is like a fancy min for partial orders, which maintains all of the elements in the set that are not greater than some other element.

The problem was, it maintains this lower envelope after each call to update. This means that if updates is ever quite large, and if the updates each change the frontier (say, for example: describing the consumption of a message at each time in sequence), then this thing is going to end up taking quadratic time. One way that updates might get large is if one worker heads off to do some work while the other worker starts slamming it with messages, which only causes it to take more time the next go round.


Now, this is wrong. The updating shouldn't have to take quadratic time, because we only need the invariant to be correct after we have applied all of the updates. So, I went and touched some code that hadn't been touched in years, and switched all of the interfaces from single-element updates to batch iterator updates.
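
To give a flavor of the change (and only a flavor: this is a drastically simplified sketch with totally ordered times, not the actual timely dataflow code), the interface goes from one update per call to a batch of updates per call, with the envelope re-established once at the end:

use std::collections::HashMap;

// Drastically simplified: times are u64s (totally ordered), and the "frontier"
// is just the least time with outstanding work. None of this is actual timely code.
struct Frontier {
    counts: HashMap<u64, i64>, // time -> net count of outstanding work
    frontier: Option<u64>,     // cached lower envelope
}

impl Frontier {
    fn new() -> Self {
        Frontier { counts: HashMap::new(), frontier: None }
    }

    // The old shape: one update at a time, re-establishing the envelope after each.
    fn update(&mut self, time: u64, diff: i64) {
        *self.counts.entry(time).or_insert(0) += diff;
        self.rebuild(); // potentially expensive, and done for every single update
    }

    // The new shape: a whole batch of updates, with the envelope rebuilt once.
    fn update_iter<I: IntoIterator<Item = (u64, i64)>>(&mut self, updates: I) {
        for (time, diff) in updates {
            *self.counts.entry(time).or_insert(0) += diff;
        }
        self.rebuild();
    }

    fn rebuild(&mut self) {
        self.counts.retain(|_, count| *count != 0);
        self.frontier = self.counts.keys().min().cloned();
    }
}

fn main() {
    let mut frontier = Frontier::new();
    frontier.update(0, 1);                                    // old style: one at a time
    frontier.update_iter((1..10_000u64).map(|t| (t, 1)));     // new style: one big batch
    frontier.update_iter((0..9_999u64).map(|t| (t, -1)));     // retire almost everything
    println!("frontier now at: {:?}", frontier.frontier);     // Some(9999)
}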

We are now measuring some new code. This correction has the potential to improve even the single-threaded case, as whenever we dip off for one of those several hundred millisecond breaks in the 99th percentile tail, the next thing we are going to do is drop in several thousand records. They get batched somewhat, but this could actually improve the behavior of the system even for one worker:

throughput    median           99th               max
100,000/s     278,569ns        26,223,434ns       97,659,698ns
200,000/s     8,157,470ns      229,676,978ns      309,386,978ns
300,000/s     54,992,987ns     241,406,483ns      1,186,357,171ns
400,000/s     203,661,730ns    566,640,573ns      1,560,094,012ns
500,000/s     561,085,336ns    2,601,315,288ns    2,687,729,288ns

It does. At least, the median latencies are all down. The 99th and max aren't much changed, but I think they are mostly artifacts of the state compaction logic, rather than overhead in progress tracking.

What about for multiple workers? That seems to have been fixed as well. Holding the offered load fixed as we increase the workers, we see general reductions in the median latency, with a bit of a mixed bag up at the 99th percentile and beyond.

throughput    median          99th               max
100,000/s     214,541ns       24,753,896ns       64,039,274ns
200,000/s     686,818ns       82,600,775ns       143,860,775ns
300,000/s     4,604,735ns     889,050,353ns      988,980,359ns
400,000/s     22,654,736ns    2,020,938,633ns    2,108,488,633ns
500,000/s     94,732,829ns    1,342,960,963ns    1,429,652,963ns

I think these numbers are pretty interesting, especially near the median latency where we observe a 10x drop from doubling the workers. The tail latencies are less amazing, and seem like a great thing to try and rein in. I have some theories about where the time goes (or rather, what blocks the completion), and it is neat to have a better measurement framework in which to evaluate potential changes.