Live PageRanking

Today we are going to work through a cute program, one that I hope will remind you that timely dataflow does some pretty cool things, and that it can be surprisingly uncomplicated at the same time.

We are going to build a computation that computes PageRank, but against a graph that you can continually change. The PageRank computation will run, and you can stick your head in at any time and issue changes to the graph, and the computation will pick up what you've put down and course-correct to head towards the new correct result. If you do the exercise at the end, you can even change the reset distribution PageRank uses, live!

Better still, our implementation will be wholly incremental. As the computation starts to stabilize it accelerates, and if you apply small changes to converged (or mostly converged) computations, they re-converge quickly, rather than starting over from scratch.

I'm sure lots of folks have told you that their pagerank algorithm "converges quickly", and perhaps you've been hurt before. This isn't going to be a super-fast implementation; if you want to see PageRank go fast on timely, you can check out these posts with Malte Schwarzkopf. However, we are going to see roughly single-digit millisecond times to re-converge after single edge changes, in case that helps put this in any context. Depending on your rate of edge churn, you could have live, or nearly live, PageRanks for your graph.

Pageranking Background

I'm going to borrow pretty heavily from A uniform approach to accelerated PageRank computation, something I did over ten years back (wow). This is a fine paper to read if you are interested in PageRank, or similar ranking schemes.

There are many explanations of PageRank, but perhaps you just have a directed graph, perhaps you imagine that directed links confer some vote of authority from the source to the destination (e.g. should you link to this post, you are vouching for it), and perhaps you fancy some ranking of the pages by these conferred authorities. PageRank does this by solving a system of equations for "equilibrium rank".

One equivalent interpretation of PageRank is a version of the "random surfer model", in which each page acts as a source of hypothetical web browsers who like clicking on random links. Round after round, each surfer follows a random link leaving their current page with some probability (usually 0.85), and gets bored and wanders away otherwise (probability 0.15). To keep the system moving, there is some assumed inflow of surfers arriving at each page each round (alternately, but equivalently, when surfers get bored they wander to a random page rather than outdoors).

We could implement this by actually recording a number for each page, the number of hypothetical surfers there right now, and explicitly simulate the process: in each round, some fraction (0.85) of the current resident surfers are distributed among outgoing links, the remaining fraction (0.15) are discarded to the wind, and a fixed amount are introduced as new surfers. If we actually perform this simulation, we would find that the number of surfers at each page stabilizes. In fact, after not too long we find that in each round we are sending exactly the same numbers of surfers from most, if not all, pages to their neighbors. Soon thereafter it becomes all!
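
To make that concrete, here is a sketch of the explicit simulation in plain Rust, no timely involved; the three-node graph and the 1_000 surfers injected per page are made up for illustration.

fn main() {
    // adjacency lists: edges[src] lists the destinations of src's outgoing links.
    let edges: Vec<Vec<usize>> = vec![vec![1], vec![2], vec![1]];
    let mut ranks: Vec<i64> = vec![0; edges.len()];

    for _round in 0..50 {
        // every page receives a fixed inflow of new surfers each round.
        let mut next: Vec<i64> = vec![1_000; edges.len()];
        for src in 0..edges.len() {
            if !edges[src].is_empty() {
                // 85% of the resident surfers follow a random outgoing link ...
                let moving = ranks[src] * 85 / 100;
                let share = moving / edges[src].len() as i64;
                for &dst in &edges[src] {
                    next[dst] += share;
                }
            }
            // ... and the rest (plus any rounding) wander off.
        }
        ranks = next;
    }

    // after enough rounds the counts stop changing from round to round.
    println!("{:?}", ranks);
}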

Incrementalization

If we keep sending the same numbers around, that seems like a bit of a waste. Why not send the change in the number of surfers? Like, if your page gets ten more surfers, your distribution of surfers to your outgoing links can change by at most ten (actually, at most 8.5); just figure out which outgoing links experience the change, and tell them what that change is. Should the counts stop changing, you can stop sending updates.

This is the algorithm we are going to implement.
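
Before we do, here is a tiny sketch of the idea in plain Rust (the function and numbers are illustrative only): if a page's count moves from old_rank to new_rank, each outgoing link only needs to hear about the difference in its share.

// Illustrative only: the change each of `degree` outgoing links needs to hear
// about when a page's surfer count moves from `old_rank` to `new_rank`.
fn per_edge_change(old_rank: i64, new_rank: i64, degree: i64) -> i64 {
    let old_share = (old_rank * 85 / 100) / degree;
    let new_share = (new_rank * 85 / 100) / degree;
    new_share - old_share
}

fn main() {
    // ten more surfers at a page with five outgoing links: each neighbor hears
    // about a small change ...
    assert_eq!(per_edge_change(100, 110, 5), 1);
    // ... and if the count doesn't change, nothing needs to be sent at all.
    assert_eq!(per_edge_change(100, 100, 5), 0);
}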

It turns out this algorithm is also super-amenable to incremental changes in the edge graph, by the same reasoning. If there is a change in the edge graph, this changes the distribution of surfers out of some pages. We can just determine the changes and circulate them, and (having installed the new edges) the system begins to converge to the correct limit for the updated graph.

There is a bunch of math for all this in that paper up there, if you are curious (it's really not so bad, either). Or you can just trust me and come along for the ride.

A Timely Implementation

We are going to implement this computation in timely dataflow. If you are not familiar with timely dataflow, great! This example is actually designed to show off several of the moving parts at the same time, without (I hope) getting in a horrible tangle.

Note: The code is intentionally simplified, so it isn't going to go lightning fast so much as be relatively simple and clear. Again, if you want to see PageRank go fast on timely, you can check out these posts with Malte Schwarzkopf.

Timely dataflow is a computational framework where you write programs that act on collections of timestamped datastreams. What does this mean? Rather than imperatively dictate what must happen in which order, you describe computation that can respond to newly arrived timestamped data.

For example, we are going to need to respond to a stream of edge changes in a graph. In timely dataflow we might model that as a stream of ((usize, usize), i64) data, representing an edge (the source and destination) as well as a signed integer for how the edge changed (+1 for "arrived" and -1 for "departed").
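
In Rust terms, with some purely expository type aliases (the real code just uses the tuples directly), edge changes look like so:

// Expository aliases only; the actual code uses the tuple types directly.
type Node = usize;
type EdgeChange = ((Node, Node), i64);   // ((source, destination), +1 or -1)

fn main() {
    let arrived: EdgeChange = ((3, 7), 1);    // the edge from node 3 to node 7 shows up,
    let departed: EdgeChange = ((3, 7), -1);  // and later goes away again.
    println!("{:?} {:?}", arrived, departed);
}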

These changes are presented to our computation as a stream, but where the data have timestamps. Each batch of edge changes has a logical "time" that is free to mean many things; in our case it will correspond with "round of applying that rule up there about surfers". If some edge changes arrive with timestamp 100, that means "as part of the 100th iteration, apply these edge changes". You can also not have edge changes for some round, which will allow the algorithm to do a round of computation in peace; this is how computations on static datasets usually go.

Timely dataflow is pretty good about keeping track of times at which there are changes. If you want to do 100 rounds of computation and then apply some edge changes, timely will circulate changes from time 0 until there are none, and if it isn't round 100 yet it will fast-forward to that round. If you have some more changes for round 200, timely will do the (relatively few) rounds of computation to re-stabilize, and then fast-forward to round 200.

Why would you do this rather than hand-write some code? Timely implementations can be distributed across any number of workers, which gives you a pretty quick scaling story. Also, by writing your computation as dataflow, it composes much better; imagine we've actually got as input a stream of un-parsed html files, and our output stream of rank changes actually wants to trigger further downstream computation.

Some Boilerplate

Let's start with some timely dataflow boilerplate, before getting into the details of how we structure the computation itself. This should give a clearer picture for what the program might look like, and what we get to use when writing the program.

Eliding many extern crate and use statements (the full program is available as examples/pagerank.rs), the skeleton of a timely dataflow program often looks like so:

fn main() {

    // hand control over to timely.
    timely::execute_from_args(std::env::args(), |worker| {

        // create a new input handle.
        let mut input = InputHandle::new();

        // create a new timely dataflow.
        worker.dataflow(|scope| {

            // create an edge stream from our input.
            let edge_stream = input.to_stream(scope);

            // NOTE: we will write lots more *here*.
            unimplemented!();

        });

        // send some edge "changes".
        input.send(((0, 1), 1));
        input.send(((1, 2), 1));
        input.send(((2, 1), 1));

        // advance time and change more.
        input.advance_to(100);
        input.send(((2, 1), -1));
    }).unwrap();
}

We pretty quickly hand control over to timely, which asks us to describe what each worker in the computation should do. Timely is a data-parallel dataflow system, and will happily spin up as many workers as you ask it to. However, each worker is supposed to be doing the same thing (though with different data).

Within the worker context, we create a new input handle, think about building a dataflow computation (but stop early), and then imagine sending some edge changes (recall 1 is addition and -1 is subtraction), advance the time of the input a little, and then send some more changes.

If we were to look into the computation (and we imagine that unimplemented!() didn't cause the program to explode), we would see three pieces of data moving along the edge_stream stream with timestamp zero. We would also see, just a bit later, one piece of data with timestamp one hundred. If we stuck around and sent more data, at increasing times, we would see these data with their timestamps as well.

This is all great and, other than the unimplemented!(), timely dataflow would excel at running this on multiple computers and doing the nearly nothing it does. So let's try and build out the example.

Iteration

Our intended computation takes a stream of edge changes, and probably a stream of rank changes. But, we don't have a stream of rank changes. Yet!

Our plan is to describe how we would take a stream of edge changes and rank changes, and produce an output stream of new rank changes. As in, how we would perform one step of our surfer-redistribution rule up above. With this computation figured out, we are just going to connect the output stream of changes back to the source of rank changes, delayed by one round of computation.

This may sound like a bit of a cheat, but it is actually the right thing to do. Timely dataflow allows you to construct cyclic dataflows, as long as you promise to advance the timestamp of records that return to the head of the cycle. Often we see this in iterative contexts, where the timestamp is a "round" indicator, much like i in for i in 0 .. n.

Let's imagine in the context of the dataflow call up above, we just want to define a stream of changes to ranks, bump it into the stream of edges, and then connect it back. If we ignore the details of the bump operator, this looks like:

        // create a new timely dataflow.
        worker.dataflow(|scope| {

            // create an edge stream from our input.
            let edge_stream = input.to_stream(scope);

            // create a new feedback stream, which will be changes to ranks.
            let (handle, rank_stream) = scope.loop_variable(usize::max_value(), 1);

            // bump the two together and connect back.
            edge_stream.bump(&rank_stream)
                       .connect_loop(handle);
        });

All that we've done here is define a new loop variable, which comes as a pair of handle and stream: the handle is eventually used to define the data, and the stream is the immediately usable stream that only later becomes defined. Having integrated edge_stream and rank_stream somehow, via bump, we connect the results back and have a cyclic dataflow.

The usize::max_value() and 1 parameters to loop_variable indicate the upper bound on the timestamp (e.g., could have been 100) and the amount by which the timestamp should increment each time around the loop.

Note: Timely dataflow will keep moving data around for as long as data exists. While you can go and build really cool cycles, bear in mind that if you continue to circulate data forever, your computation may never settle down. That could be fine, but bear it in mind! If you want a cyclic dataflow to settle down, you'll either need to set a lower upper bound on the number of rounds, or get your dataflow to quiesce by not sending any more data.

Bumping Together

So now we've come to the hard part. We need to figure out what to do with a stream of timestamped edge changes, and a stream of timestamped rank changes. We can put off the complicated logic for just a moment, and build the skeleton of the bump dataflow operator first.

The edge changes flowing around will have type ((usize, usize), i64) and the rank changes will have type (usize, i64). The first usize field in each of these indicates the source node. The i64 in each indicates the amount of change.

Because we may be running with multiple workers, we'll want to shuffle each stream so that changes to the same source arrive at the same worker. To shuffle data between workers we could use the exchange operator. It doesn't change the logical contents of the stream, but it does move the records between workers based on a supplied function (from record to integer; we would just pick out the first field). Another way to exchange data is, as part of building a new operator we can specify how its inputs must be shuffled for the operator to be correct. We are going to do that instead.
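
For reference, that first route might look like the following sketch, placed inside the same dataflow closure (untested, and note that this Exchange is the operator trait rather than the Exchange pact we use below):

// A sketch of the alternative: pre-shuffle each stream by source node with the
// `exchange` operator, so that a subsequent operator needn't shuffle anything.
use timely::dataflow::operators::Exchange;

let edges_by_src = edge_stream.exchange(|x| ((x.0).0) as u64);
let ranks_by_src = rank_stream.exchange(|x| (x.0) as u64);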

Our plan is to build a binary operator, taking both edge_stream and rank_stream as input. I'm going to blat down the skeleton of such an operator now, using one of timely dataflow's generic binary operators, binary_frontier.

worker.dataflow(|scope| {

    // create a new input, into which we can push edge changes.
    let edge_stream = input.to_stream(scope);

    // create a new feedback stream, which will be changes to ranks.
    let (handle, rank_stream) = scope.loop_variable(usize::max_value(), 1);

    // bring edges and ranks together!
    let changes = edge_stream.binary_frontier(
        &rank_stream,
        Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
        Exchange::new(|x: &(usize, i64)| x.0 as u64),
        "PageRank",
        |_capability| {
            // TODO: Operator logic goes here
        }
    );

    changes
        .connect_loop(handle);
});

All we've done here is replace bump from up above with this new beastly binary_frontier call. However, the call is really just (i) taking two stream arguments, edge_stream and rank_stream, (ii) taking two shuffling instructions, the Exchanges, (iii) taking a tasteful and descriptive name, "PageRank", and (iv) some closure thing that we haven't written yet.

I could imagine much of this makes some sense, other than the "closure thing that we haven't written yet". Everything else is just explaining to timely which streams we want to connect as input, how their data should be shuffled before presenting it to the operator, and how the operator should promote itself if anyone ever asks (we won't).

The closure is where the good stuff happens. That is where we actually get to drive the operator around, and .. figure out what to do with all of these edge updates and rank updates. Will we survive, or will we be overwhelmed with complexity?

Operator Initialization

Let's now describe what the operator should do. Actually, first let's talk about what this closure, the one we are supposed to write that takes |_capability|, should do. This closure is where we get to initialize the logic and state of the operator, and potentially acquire a capability to send data (_capability). We need to return another closure, one that describes how the operator actually behaves.

Let's map out what our initialization closure looks like, as it isn't too complicated.

    |_capability| {

        // where we stash out-of-order data.
        let mut edge_stash = HashMap::new();
        let mut rank_stash = HashMap::new();

        // lists of edges, ranks, and changes.
        let mut edges = Vec::new();
        let mut ranks = Vec::new();
        let mut delta = Vec::new();

        move |input1, input2, output| {
            // TODO: actual operator logic here.
        }
    }

These five declarations are for the internal state of our operator. This is just Rust declaration syntax, there's nothing magical here. When we eventually use these in the closure below, Rust is bright enough to transfer their ownership to the returned closure. We don't have to create a new type called BumpOperatorState with five member fields and a run method or anything like that.

Apparently we are going to stash some things, and keep lists of other things. This isn't very clear yet, in part because type inference in Rust means we don't have to specify the types if Rust can sort them out from how we use them. In an attempt to be helpful, let me explain:

The _stash hash maps are going to be for storing received inputs, respectively edge changes and rank changes, until it is time to process them. We could receive such changes out of order, or one iteration could bleed into the next, that sort of thing. Instead, we will stash all changes first, keyed by timestamp, and then extract the changes when it is their round.

The three lists represent edges, ranks, and some temporary storage, respectively: edges[node] contains a list of destinations from node, ranks[node] is the current number of surfers hanging out at node, and delta is a temporary buffer we use to accumulate output changes in ranks before sending them (so that we can cancel some and avoid sending them).
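
Spelled out with explicit types, purely for exposition (the real code lets Rust infer them), that state looks roughly like so:

// Written out for exposition; the stashes are keyed by the time of each batch.
let mut edge_stash = HashMap::new();   // time -> Vec<((usize, usize), i64)>
let mut rank_stash = HashMap::new();   // time -> Vec<(usize, i64)>

let mut edges: Vec<Vec<(usize, i64)>> = Vec::new();  // edges[node]: (destination, count) pairs
let mut ranks: Vec<i64> = Vec::new();                // ranks[node]: surfers currently at node
let mut delta: Vec<(usize, i64)> = Vec::new();       // pending (destination, change) outputs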

Notice that we didn't use _capability. If you are familiar with Rust, the leading _ means "don't warn me about unused variables". It turns out this operator will not need the ability to send data before receiving any, and this is the only reason one needs the capability.

The last thing to do is explain what happens when we need to run the operator.

Operator Behavior

The closure we need to specify now takes three arguments: handles to the two inputs, and a handle to the output. This is how the operator can read from its input queues, and write to its output queue if it is so inclined.

Our program structure is going to be pretty typical: we read from our input queues but stash the results until we are sure we want to process them. Timely dataflow delivers timestamped data as it arrives, not "in timestamp order" or anything. We'll want to be careful to make sure that we only work with the changes for one round at a time.

Having stashed whatever is read on each input, our operator then checks out each of the stashes and asks if there are any times that neither input can any longer produce. This involves looking at the frontier() of each input (and why we used binary_frontier), which describes which times we should still expect to see. If neither frontier could result in some stashed time, we should process it!

    move |input1, input2, output| {

        // hold on to edge changes until it is time.
        input1.for_each(|time, data| {
            edge_stash.entry(time).or_insert(vec![]).extend(data.drain(..));
        });

        // hold on to rank changes until it is time.
        input2.for_each(|time, data| {
            rank_stash.entry(time).or_insert(vec![]).extend(data.drain(..));
        });

        // capture the frontiers of both inputs, to judge the round.
        let frontiers = &[input1.frontier(), input2.frontier()];

        // consider each stashed time for edge changes.
        for (time, edge_changes) in edge_stash.iter_mut() {
            if frontiers.iter().all(|f| !f.less_equal(time)) {
                // TODO: apply `edge_changes`.                
            }
        }
        // discard empty edge stashes
        edge_stash.retain(|_time, stash| stash.len() > 0);

        // consider each stashed time for rank changes.
        for (time, rank_changes) in rank_stash.iter_mut() {
            if frontiers.iter().all(|f| !f.less_equal(time)) {
                // TODO: apply `rank_changes`.
            }
        }
        // discard empty rank stashes
        rank_stash.retain(|_time, stash| stash.len() > 0);
    }

Perhaps not the star of the code above, the two calls to retain are very important. This is where we formally decline the ability to use time any more. Once we let it go, it's gone, and the system is bright enough to see (and enforce) this.

What about the final two TODO items? Here is the code for edge_changes, which uses two helper methods I wrote, compact and allocate, which (i) sort and cancel diffs and (ii) distribute rank among neighbors, respectively.

    // TODO: apply `edge_changes`.                
    let mut session = output.session(&time);

    compact(edge_changes);

    for ((src, dst), diff) in edge_changes.drain(..) {

        // 0. ensure enough state allocated, init 1K surfers.
        while edges.len() <= src { edges.push(Vec::new()); }
        while ranks.len() <= src { ranks.push(1_000); }

        // 1. subtract previous distribution.
        allocate(ranks[src], &edges[src][..], &mut delta);
        for x in delta.iter_mut() { x.1 *= -1; }

        // 2. update edges.
        edges[src].push((dst, diff));
        compact(&mut edges[src]);

        // 3. re-distribute allocations.
        allocate(ranks[src], &edges[src][..], &mut delta);

        // 4. compact down and send cumulative changes.
        compact(&mut delta);
        for (dst, diff) in delta.drain(..) {
            session.give((dst, diff));
        }
    }

The rough idea is that we are pushing a lot of (usize, i64) changes into delta, corresponding to changes in outgoing rank. We do this by subtracting the old allocation and adding the new allocation. It's not very elegant, but it is relatively concise.
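
We don't show compact itself here, but a plausible sketch of what such a helper could look like (not necessarily the version in examples/pagerank.rs) sorts by key, sums the diffs of equal keys, and drops anything that cancels to zero:

// A plausible sketch of `compact`: sort updates by key, sum the diffs of
// equal keys, and discard anything that cancels to zero.
fn compact<K: Ord + Copy>(updates: &mut Vec<(K, i64)>) {
    updates.sort_by_key(|x| x.0);
    let mut result: Vec<(K, i64)> = Vec::with_capacity(updates.len());
    for (key, diff) in updates.drain(..) {
        if let Some(last) = result.last_mut() {
            if last.0 == key {
                last.1 += diff;
                continue;
            }
        }
        result.push((key, diff));
    }
    result.retain(|x| x.1 != 0);
    *updates = result;
}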

The only timely methods here are output.session(&time) and session.give((dst, diff)). These are the two parts to sending data in a timely dataflow operator: we need to exercise the capability to send data at a time, and then we feed the resulting session with whatever data we want to send. In this case, it is whatever the cumulative changes were between the previous distribution of rank and the subsequent distribution.

There is similar code for rank_changes. The only substantive difference is that instead of

        // 2. update edges.
        edges[src].push((dst, diff));
        compact(&mut edges[src]);

we have

        // 2. update ranks.
        ranks[src] += diff;

In both cases, we determine the previous distribution of rank and the new distribution of rank. We cancel these against each other as appropriate, and then transmit any changes as output. Those changes are what will come back around next iteration, and lead to more work.
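
Assembled from the fragments above, the rank_changes block would look roughly like so (a sketch; only step 2 differs from the edge_changes code, so this may not match examples/pagerank.rs line for line):

    // TODO: apply `rank_changes`.
    let mut session = output.session(&time);

    compact(rank_changes);

    for (src, diff) in rank_changes.drain(..) {

        // 0. ensure enough state allocated, init 1K surfers.
        while edges.len() <= src { edges.push(Vec::new()); }
        while ranks.len() <= src { ranks.push(1_000); }

        // 1. subtract previous distribution.
        allocate(ranks[src], &edges[src][..], &mut delta);
        for x in delta.iter_mut() { x.1 *= -1; }

        // 2. update ranks.
        ranks[src] += diff;

        // 3. re-distribute allocations.
        allocate(ranks[src], &edges[src][..], &mut delta);

        // 4. compact down and send cumulative changes.
        compact(&mut delta);
        for (dst, diff) in delta.drain(..) {
            session.give((dst, diff));
        }
    }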

That's all of the dataflow construction. The program is ready to go.

Execution

Our harness above feeds in three edges and then removes one. I've added some reporting that tells us of the changed ranks, (i) how many changes, (ii) the sum of their absolute values, and (iii) the maximum absolute value. It also reports how long it took to get to that round. This could be pretty dull for such a small graph, but let's check.

Echidnatron% cargo run --release --example pagerank
    Finished release [optimized] target(s) in 0.0 secs
    Running `target/release/examples/pagerank`
Duration { secs: 0, nanos: 166961 }:    (Root, 1)      2    2499    1666
Duration { secs: 0, nanos: 220348 }:    (Root, 2)      2    2082    1388
Duration { secs: 0, nanos: 228283 }:    (Root, 3)      2    1736    1157
Duration { secs: 0, nanos: 234816 }:    (Root, 4)      2    1446    964
Duration { secs: 0, nanos: 241467 }:    (Root, 5)      2    1205    804
Duration { secs: 0, nanos: 247821 }:    (Root, 6)      2    1004    670
Duration { secs: 0, nanos: 254024 }:    (Root, 7)      2    837     558
Duration { secs: 0, nanos: 260351 }:    (Root, 8)      2    698     465
Duration { secs: 0, nanos: 266878 }:    (Root, 9)      2    581     387
...
Duration { secs: 0, nanos: 720926 }:    (Root, 47)     1    1       1
Duration { secs: 0, nanos: 727150 }:    (Root, 48)     1    1       1
Duration { secs: 0, nanos: 736664 }:    (Root, 101)    1    6890    6890
Duration { secs: 0, nanos: 742801 }:    (Root, 102)    1    5742    5742
Echidnatron%

What we see here is a pretty standard reduction in the amount of error (about a factor of 0.85) each iteration, until the 48th iteration, at which point the error finally vanishes.

The only work remaining in the system is that edge change at round 100, and timely correctly reports to the operator that there will be nothing else received before it. We pick up that change, removing the (2, 1) edge, and in just two iterations of changes all is settled again: node two retracts the rank it had been sending to node one, node one passes the resulting change along to node two, and node two, no longer connected to node one, has nowhere further to send it.

To get a sense of scale, 1_000 new surfers are introduced at each page each round and a 0.15 fraction of all surfers retire each round. This means that there are 3 x 1,000 / (0.15) = 20,000 surfers in play in stable state. The sum and max tell us when this error is relatively small, which it is around iteration 48, but the (relatively) catastrophic change at round 100 introduces a serious shake-up.

Moar Execution

There is a pretty gross performance bug (or three) in the operator implementation above, so we aren't going to do epic scale graphs. Instead, let's look at a computation that loads up 10m edges on 1m nodes and runs that until it stabilizes.

Echidnatron% cargo run --release --example pagerank -- 1000000 10000000
    Finished release [optimized] target(s) in 0.0 secs
    Running `target/release/examples/pagerank 1000000 10000000`
Duration { secs: 12, nanos: 209272888 }:    (Root, 1)    999951    833440369    2730
Duration { secs: 13, nanos: 726563426 }:    (Root, 2)    999951    693998940    2704
Duration { secs: 15, nanos: 255796509 }:    (Root, 3)    999951    578298549    2150
Duration { secs: 16, nanos: 708072918 }:    (Root, 4)    999951    481904094    1812
Duration { secs: 18, nanos: 153717372 }:    (Root, 5)    999951    401564212    1516
Duration { secs: 19, nanos: 615845563 }:    (Root, 6)    999951    334615539    1258
Duration { secs: 21, nanos: 054239582 }:    (Root, 7)    999951    278834174    1058
Duration { secs: 22, nanos: 469149840 }:    (Root, 8)    999951    232344944    874
Duration { secs: 23, nanos: 889478274 }:    (Root, 9)    999951    193627090    734
...
Duration { secs: 50, nanos: 949865016 }:    (Root, 30)   975941    4210777      20
Duration { secs: 51, nanos: 619382262 }:    (Root, 31)   956152    3503803      18
Duration { secs: 52, nanos: 203779878 }:    (Root, 32)   927219    2924928      16
...
Duration { secs: 56, nanos: 638983100 }:    (Root, 90)   65        72           2
Duration { secs: 56, nanos: 639020015 }:    (Root, 91)   32        36           2
Duration { secs: 56, nanos: 639047710 }:    (Root, 92)   4         5            2
Echidnatron%

We see a pretty steady decline from a very large amount of error, down to relatively small error by round 30 (I judge 20 to be relatively small compared to 1,000). Pretty soon thereafter, we are stable.

What about shaking things up again? Let's take a stable computation like this and start making some changes. Like, let's add and remove random edges once every ten iterations. The computation starts as before, identical other than the timings, but once we finish with iteration 92 we leap ahead to the first of the changes (in round 1000 apparently).

...
Duration { secs: 56, nanos: 278149758 }:    (Root, 90)      65      72      2
Duration { secs: 56, nanos: 278175255 }:    (Root, 91)      32      36      2
Duration { secs: 56, nanos: 278184477 }:    (Root, 92)      4       5       2
Duration { secs: 56, nanos: 278218595 }:    (Root, 1001)    18      2357    781
Duration { secs: 56, nanos: 278325405 }:    (Root, 1002)    163     2004    55
Duration { secs: 56, nanos: 278791626 }:    (Root, 1003)    768     1624    7
Duration { secs: 56, nanos: 279336169 }:    (Root, 1004)    889     992     2
Duration { secs: 56, nanos: 279733224 }:    (Root, 1005)    653     724     2
Duration { secs: 56, nanos: 280085938 }:    (Root, 1006)    575     636     2
Duration { secs: 56, nanos: 280370364 }:    (Root, 1007)    461     511     2
Duration { secs: 56, nanos: 280568053 }:    (Root, 1008)    320     356     2
Duration { secs: 56, nanos: 280743687 }:    (Root, 1009)    282     312     2
Duration { secs: 56, nanos: 280885367 }:    (Root, 1010)    214     239     2
Duration { secs: 56, nanos: 281016424 }:    (Root, 1011)    205     1748    454
Duration { secs: 56, nanos: 281233057 }:    (Root, 1012)    340     1451    28
Duration { secs: 56, nanos: 281745549 }:    (Root, 1013)    843     1144    8
...

As you can see, the errors spike up as a small, focused error with a large maximum value (relative to 1_000), but quickly smear out across many distinct nodes with a much smaller maximum value. Before we can completely stabilize, the graph changes again, and the process repeats.

Much later on, in terms of rounds but not seconds (0.372s), we finish (after 100 such updates):

...
Duration { secs: 56, nanos: 654068593 }:    (Root, 1998)    12      15      2
Duration { secs: 56, nanos: 654091300 }:    (Root, 1999)    27      29      2
Duration { secs: 56, nanos: 654113077 }:    (Root, 2000)    27      30      2
Echidnatron%

This leads us to our final experiment: starting up the computation as before, but immediately changing edges every iteration and not stopping. Well, not for 1,000 rounds anyhow.

Duration { secs: 12, nanos: 748735475 }:    (Root, 1)       999951  833440369       2730
Duration { secs: 14, nanos: 342988000 }:    (Root, 2)       999951  693998937       2704
Duration { secs: 15, nanos: 855482183 }:    (Root, 3)       999951  578298527       2150
Duration { secs: 17, nanos: 371157902 }:    (Root, 4)       999951  481904063       1812
Duration { secs: 18, nanos: 873135659 }:    (Root, 5)       999951  401564091       1516
Duration { secs: 20, nanos: 430297118 }:    (Root, 6)       999951  334615429       1258
Duration { secs: 21, nanos: 881825876 }:    (Root, 7)       999951  278833873       1058
Duration { secs: 23, nanos: 326636522 }:    (Root, 8)       999951  232346169       874
Duration { secs: 24, nanos: 816912132 }:    (Root, 9)       999951  193627491       734
...

The computation starts as before, except with slight changes in each round after the first. However, the computation doesn't stabilize in the same way as above:

...
Duration { secs: 57, nanos: 374040069 }:    (Root, 90)      5990    11088   700
Duration { secs: 57, nanos: 377692710 }:    (Root, 91)      6183    10643   414
Duration { secs: 57, nanos: 381386013 }:    (Root, 92)      6226    10310   500
Duration { secs: 57, nanos: 384849706 }:    (Root, 93)      5847    9848    523
Duration { secs: 57, nanos: 388373823 }:    (Root, 94)      5974    9285    362
Duration { secs: 57, nanos: 391942045 }:    (Root, 95)      6014    9809    624
Duration { secs: 57, nanos: 395320921 }:    (Root, 96)      5711    10258   826
Duration { secs: 57, nanos: 398527141 }:    (Root, 97)      5379    9579    361
Duration { secs: 57, nanos: 401901920 }:    (Root, 98)      5668    10267   1019
Duration { secs: 57, nanos: 405310015 }:    (Root, 99)      5750    11051   710
...

Notice that these are about the same times as above (faster, actually), but while the count and sum decrease dramatically, the maximum doesn't really. This is because each time we make a change we are causing a local disturbance. That disturbance causes real error around that node. Each page in this graph has rank roughly 6,000, and adding an edge to a page with ten other edges delivers some 600 new surfers to a page. This happens each round, so the error vector, while small on average, has relatively large maximum errors.

It would take just a few rounds, and fractions of a second, to sort out the changes and arrive at a stable state. We see this as the computation winds down, after 1,000 updates:

...
Duration { secs: 72, nanos: 870500941 }:    (Root, 1000)    7005    10768   531
Duration { secs: 72, nanos: 875108222 }:    (Root, 1001)    6999    9299    45
Duration { secs: 72, nanos: 879345685 }:    (Root, 1002)    6456    7598    6
Duration { secs: 72, nanos: 883359688 }:    (Root, 1003)    6104    6770    3
Duration { secs: 72, nanos: 886763122 }:    (Root, 1004)    5221    5789    3
Duration { secs: 72, nanos: 889600198 }:    (Root, 1005)    4356    4827    3
...
Duration { secs: 72, nanos: 901214500 }:    (Root, 1031)    12      13      2
Echidnatron%

It takes another thirty rounds and thirty milliseconds to settle the remaining few thousand errors, even though the maximum drops almost immediately. In this case, we can think of thirty milliseconds as the "exposure": how long you'd need to wait to get a 100% solid answer.

Backwards Error Analysis

For interested persons, there is actually a pretty decent interpretation of the error at any point. It turns out that at all times, the ranks we currently have are actually the correct answer for the slightly changed input problem. The current ranks are the correct answer to the computation where the initial conditions (the 1_000 surfers at each page) are tweaked by the outstanding changes in rank for each page.

When we tracked the number of changes, their sum, and their maximum, this was to get a sense for how much we would have to change our source distribution of surfers to be "correct". When these numbers are small, like 6 just above, it means that we have found the solution to a version of the same problem where everyone's initial surfer counts lie in [994, 1006], which isn't what we asked for but is pretty close.

For folks more familiar with the "1/n" calibration (where we ensure all ranks sum to one, using floating point values), an error of 6 would correspond to a change to the reset distribution of

0.15 * (6 / 1000) / n

which works out to 0.0009 / n, just less than a tenth of a percent of a page's typical 1/n rank. You can crank up the accuracy by changing the 1_000 to a 1_000_000, with the downside that changes persist for longer.

Conclusions

You can totally write a live pageranking program in timely dataflow using not all that much code (it's about 200 lines, including comments and measurement). I wouldn't want to sell you on the performance of this implementation (there are plenty of ways to spruce it up), but the functionality of maintaining a live iterative system that you can tweak as you like is there.

PageRank is a bit special: as a linear system it is relatively easy to update the computation and still be sure that we are still converging to a well-defined limit. Not all iterative systems have that property, including notable examples like label propagation for connected components (where you either need to totally restart, or use something like differential dataflow).

Exercise: Our computation didn't have an input stream of rank changes, because it wasn't clear what they would mean. But they do have a meaning! The initial stream of rank changes is the set of changes to the a priori ranks for each page. If you have some external reason that guardian.co.uk is meritorious, at least more so than xxx.hot-llama-action.vt, you can supply these as initial changes. Moreover, you can live-update these as well. Try it!
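
One way to start on that exercise, sketched but untested: add a second input handle whose (node, change) records are merged into the rank cycle with timely's concat operator before they reach the PageRank operator.

// An untested sketch of the exercise: a second input of a priori rank changes,
// concatenated with the feedback stream before the PageRank operator sees it.
let mut prior_input = InputHandle::new();

worker.dataflow(|scope| {

    let edge_stream = input.to_stream(scope);
    let prior_stream = prior_input.to_stream(scope);

    let (handle, rank_stream) = scope.loop_variable(usize::max_value(), 1);

    // externally supplied rank changes join the circulating rank changes.
    let ranks_in = prior_stream.concat(&rank_stream);

    edge_stream.binary_frontier(
        &ranks_in,
        Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
        Exchange::new(|x: &(usize, i64)| x.0 as u64),
        "PageRank",
        |_capability| { /* the same operator logic as before */ unimplemented!() }
    )
    .connect_loop(handle);
});

// nudge the a priori rank of node 17 upward, at any round you like.
prior_input.send((17, 500));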

Addendum: Bugzes! (2017-09-06)

It turns out there were bugs! A few were just silly errors that demonstrated that I couldn't handle the round-off error when distributing an integral number of things between an integral number of other things. But, there was a serious business error that I thought I would explain.

Ok, recall that the way the PageRanking code works is by associating a rank with each page, stored in ranks[page]. The intent at all times is that ranks[page] should be (i) scaled down just a bit (a factor of 5/6 in the code), and then (ii) distributed uniformly among outgoing edges. Whenever ranks[page] changes, or the outgoing edges change, we undo whatever distribution of rank we had previously, re-do the new distribution, and ship any resulting differences.

Conceptually, this is still correct. All the math is still right. However, there is an annoying problem with neither integers nor floating point numbers actually being real numbers. This annoying problem meant that our computation might not converge in some contexts.

Imagine ranks[page] changes by +1, which means there is now one more unit of rank we could distribute. Or it might get absorbed at the node. It all depends on how ranks[page] and ranks[page] + 1 get distributed. Let's take a peek at the code and see what happens:

// this method allocates some rank between elements of `edges`.
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
    if !edges.is_empty() {
        assert!(rank >= 0);
        assert!(edges.iter().all(|x| x.1 > 0));

        let distribute = (rank * 5) / 6;
        let degree = edges.len() as i64;
        let share = distribute / degree;
        for i in 0 .. edges.len() {
            if (i as i64) < (distribute % (edges.len() as i64)) {
                send.push((edges[i].0, edges[i].1 * (share + 1)));
            }
            else {
                send.push((edges[i].0, edges[i].1 * share));
            }
        }
    }
}

This method does what I think one wants: distribute rank uniformly among the positive entries of edges (which we imagine have count one despite being i64). We first figure out how much rank we will actually distribute versus stash locally (distribute), and we hand out roughly distribute / degree to each neighbor. We bump this up for a few nodes so that the total is actually distribute, to make sure we aren't losing mass.

By distributing exactly distribute, we are sure that if someone changes rank by diff, then we distribute changes of at most diff, and probably more like 5 * diff / 6. Of course, when diff is small (like +/-1) we probably end up distributing exactly diff in changes.

Why would this converge? What causes small changes to stop circulating?

Ok, so they don't have to, it turns out. You could have a cycle, for example, in which every node has rank = 1, and so plans to distribute .. zero. But, if a single +1 happens to change the rank, then it will now send a +1 as an output. If each node in the cycle has an initial rank of 1 (or is otherwise poised to "not absorb" the next unit of rank it gets) each node will pass along the increment. Eventually, though, the increment circulates around and around and hits a node that wants to absorb its next unit of rank (it can't circulate forever without being absorbed, because each time it moves it increments a rank).

This sounds like it converges, so what is the problem? No problem, if all the changes are positive. Or if all the changes are negative. If there are positive and negative changes, problem.

Imagine that cycle again, but with a +1 change and a -1 change, both going around the cycle. Each +1 produces a new +1 as output, and each -1 produces a -1 as output, retracting the change. Nothing is ever absorbed, and the cycle continues forever. Or, forever-ish.
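
Here is a toy version of that failure in plain Rust (illustrative only): on a cycle every node has out-degree one, so whatever change a node computes to its distributed amount is forwarded, in full, to its single successor.

// A toy version of the non-convergence above (illustrative, not the timely code).
// On a cycle every node has out-degree one, so the change in the amount a node
// distributes goes, in full, to its single successor.
fn forwarded_change(rank: i64, diff: i64) -> i64 {
    (5 * (rank + diff)) / 6 - (5 * rank) / 6
}

fn main() {
    // every node starts with rank 1, and distributes (5 * 1) / 6 = 0.
    assert_eq!(forwarded_change(1, 1), 1);   // a +1 arrives: it is forwarded in full,
    assert_eq!(forwarded_change(2, -1), -1); // then a -1 arrives: also forwarded in full.
    // nothing is ever absorbed, so the +1/-1 pair chases itself around forever.
}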

Stashing Updates

One of the main features of the algorithm we implemented, called "Update Iteration" in the paper, is that we don't have to propagate changes. Each node can just sit on some received changes if it has a mind to. In the linked paper, these were for good reasons, like "I'll go faster if I wait". We are going to hold updates back because "we might not converge otherwise", which is a special case of "go faster".

There was previously a line in the code that indicated what we should do when we receive a rank update diff for a page src. It was pretty casual, and said we should just fold in the changes:

        // 2. update ranks.
        ranks[src] += diff;

Ok, we aren't going to do that any more. Instead, we are going to maintain a stash of outstanding differences, diffs, and add diff to the corresponding entry. If the entry is now large enough, where I chose a threshold so that src would be certain to absorb at least some of diff, only then do we include the change and determine output differences.

        // 2. update ranks.
        diffs[src] += diff;
        if diffs[src].abs() >= 6 {
            ranks[src] += diffs[src];
            diffs[src] = 0;
        }

Now, you might say "we may converge, but not to the right answer!", and that is .. true. We can always get the correct answer out, by only circulating positive updates for a while, then only circulating negative updates, but we can also not worry too much about it, because the amount we are holding back is within rounding error. The reason holding back is safe is that we only need to hold a change back when (with integer division) 5 * (ranks[src] + diff) / 6 equals 5 * ranks[src] / 6 + diff, meaning none of diff would be absorbed at src; whenever that isn't true we can send the changes along and be assured that the total error decreases. Hence the threshold 6, by the way: once the stashed difference reaches 6 in magnitude, at least one unit must be absorbed. We could also just directly test that rounding condition, if we had a mind to.
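
If we did want to test that rounding condition directly, the update might look like the following sketch (same variable names as before, untested):

        // 2. update ranks (variant: test the rounding condition directly).
        diffs[src] += diff;
        let change = (5 * (ranks[src] + diffs[src])) / 6 - (5 * ranks[src]) / 6;
        if change != diffs[src] {
            // some of the difference would be absorbed at `src`, so apply it.
            ranks[src] += diffs[src];
            diffs[src] = 0;
        }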

Performance

This change "speeds up" the computation a bit, especially once we've started running and make relatively few changes each round. If you recall from above, it took about 15 seconds (from 57.405 to 72.870) to go from round 100 to round 1000. These are the rounds where we make sparse updates (one change per round), and the only work in each round is the distribution of relatively small and relatively few updates. With the change above, the small updates are held back until they grow big enough, leading to much less computation (and a different answer).

It now takes under two seconds to move from round 100 to round 1000.

Duration { secs: 59, nanos: 698451552 }:        (Root, 97)      3283    6936    361
Duration { secs: 59, nanos: 700766979 }:        (Root, 98)      3611    7785    1016
Duration { secs: 59, nanos: 703259610 }:        (Root, 99)      3921    8629    707
Duration { secs: 59, nanos: 705412769 }:        (Root, 100)     3383    8396    662
...
Duration { secs: 61, nanos: 420612280 }:        (Root, 999)     2467    5814    500
Duration { secs: 61, nanos: 422341008 }:        (Root, 1000)    2716    5770    529
Duration { secs: 61, nanos: 423996542 }:        (Root, 1001)    2325    3969    45
Duration { secs: 61, nanos: 425271051 }:        (Root, 1002)    1894    2356    7
Duration { secs: 61, nanos: 426159112 }:        (Root, 1003)    1368    1379    2
Duration { secs: 61, nanos: 426756059 }:        (Root, 1004)    857     860     2
Duration { secs: 61, nanos: 427136115 }:        (Root, 1005)    537     540     2
Duration { secs: 61, nanos: 427399284 }:        (Root, 1006)    341     346     2
Duration { secs: 61, nanos: 427546367 }:        (Root, 1007)    182     187     2
Duration { secs: 61, nanos: 427634718 }:        (Root, 1008)    105     106     2
Duration { secs: 61, nanos: 427694704 }:        (Root, 1009)    74      75      2
Duration { secs: 61, nanos: 427732237 }:        (Root, 1010)    42      45      3
Duration { secs: 61, nanos: 427755162 }:        (Root, 1011)    21      21      1

We do still end in a state where if we played forward all of the positive updates, and then all of the negative updates, we would still reach the same limit as before. We are just more circumspect while executing, to make sure we don't end up in some defective behavior.

Plus, holding back updates leads to improved performance. We have an order of magnitude higher throughput, and we only sacrifice accuracy transiently. Aren't we clever!