Address several comments on the design.
This clarifies the consumer behavior with respect to positioning in the
`step` partitions, renames `step` to `input-step`, explains how input
steps are triggered and gives rationale for the design, explains how
the algorithm can know if output has been produced, and describes an
optional `output-step` Kafka topic.

Thanks to @ryzhyk for raising related issues in #7 and #3.
blp committed Jul 25, 2023
1 parent d457e25 commit 8529146
Showing 2 changed files with 144 additions and 50 deletions.
163 changes: 128 additions & 35 deletions README.md
@@ -109,7 +109,7 @@ snapshot of the following between two steps of the computation:
2. **State** inside the circuit across all the workers.
3. **Output** produced by the circuit in a previous step but not yet
acknowledged by its destination.

We need the input, output, and state captured at the same step, or to
be able to recompute them. There are multiple ways to achieve this,
as summarized in [Fault Tolerance Approaches](approaches.md). We
@@ -126,7 +126,7 @@ from that point, discarding any duplicate output that this produces.

The following sections describe some details for input, output, and
state.

## Input and output

We adopt Apache Kafka for input and output. Kafka is already widely
@@ -142,22 +142,34 @@ each with `W` partitions[^1]:
writes to these topics. Worker `w` reads from partition `w` in each
of these topics.

* `output[0]` through `output[K-1]`. Worker `w` writes to partition
`w` in each of these topics. Whatever is reading output from DBSP
reads from these topics. The producer tags each event, in a [Kafka
header](kafka.md#events), with the sequence number of the step that
produced it.
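
To make the tagging concrete, here is a producer-side sketch using the
Rust `rdkafka` crate (0.29-style headers API). The `step-seq` header key
and the `output[k]` topic-naming scheme are illustrative assumptions,
not part of the design.

```rust
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{BaseProducer, BaseRecord};

/// Enqueue one output event for worker `w`, tagged with the sequence
/// number of the step that produced it. Delivery still requires the
/// producer to be polled or flushed elsewhere.
fn produce_output(
    producer: &BaseProducer,
    k: usize,
    w: i32,
    seq: i64,
    payload: &[u8],
) -> Result<(), KafkaError> {
    let topic = format!("output[{k}]"); // hypothetical naming scheme
    let seq_bytes = seq.to_le_bytes();
    producer
        .send(
            BaseRecord::<(), [u8]>::to(&topic)
                .partition(w)
                .payload(payload)
                .headers(OwnedHeaders::new().insert(Header {
                    key: "step-seq",
                    value: Some(&seq_bytes[..]),
                })),
        )
        .map_err(|(e, _)| e)
}
```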

We also need to keep track of how the input was divided up into
batches for the computation. We use another Kafka topic with `W`
partitions for this:

* `input-step`. Each event is an `H`-element array of event offsets.
If partition `w` with event offset `seq` holds such an array
`offsets[]`, then worker `w` included `input[h]` up to `offsets[h]`
in its computations for sequence number `seq`.

There is a required invariant: if `e` is the least number of events
in any partition in `output`, then every partition in `input-step`
has at least `e` events.

To a lesser extent, we also need to keep track of which step sequence
number corresponds to which output. Tagging `output[*]` with a Kafka
header as described above may be sufficient. If necessary, we can add
another Kafka topic with `W` partitions:

* `output-step`: Each event is a `K`-element array of event offsets.
If partition `w` with event offset `seq` holds such an array
`offsets[]`, then worker `w` produced `output[k]` up to `offsets[k]`
as output for the step with sequence number `seq`.
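
Both step topics carry the same kind of payload: a fixed-length array
of event offsets. The design does not fix a wire format; the sketch
below assumes little-endian `i64`s, with the array length (`H` or `K`)
known from configuration.

```rust
/// Encode an `input-step`/`output-step` event: one `i64` event offset
/// per input (or output) topic, in little-endian byte order.
fn encode_step_event(offsets: &[i64]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(offsets.len() * 8);
    for &off in offsets {
        buf.extend_from_slice(&off.to_le_bytes());
    }
    buf
}

/// Decode a step event; `n` is the expected array length (`H` or `K`).
/// Returns `None` for a malformed payload.
fn decode_step_event(payload: &[u8], n: usize) -> Option<Vec<i64>> {
    if payload.len() != n * 8 {
        return None;
    }
    Some(
        payload
            .chunks_exact(8)
            .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
            .collect(),
    )
}
```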

[^1]: Section [Scale in and out](#scale-in-and-out) generalizes to the
case where the number of Kafka partitions differs from the number
@@ -178,6 +190,50 @@ the whole computation. We could instead use a distributed database
that includes fault tolerance of its own. FoundationDB is a good
candidate.

## Triggering DBSP steps

DBSP steps work on batches of input data. The workers must agree when
to trigger a step. Two approaches come to mind:

* **Centralized trigger**, in which a coordinator tells each host to
trigger a step and how much data to include in the step. To trigger
a step, the coordinator produces an event to each partition in
`input-step`, and each host consumes the partitions for its own
workers.

With this approach, it is conceptually simple to trigger a step
based on properties that span all the workers, such as "`input[0]`
received at least 1000 events total across all its partitions".

This approach does not seem well suited for Kafka, because Kafka
does not appear to support monitoring a partition for new data
without reading the data from that partition. Thus, a coordinator
that wanted to evaluate a property like the one above would have to
  consume all of the data from all of the partitions in `input[0]`, which
would be a bottleneck. (Kafka would allow a coordinator to
periodically poll all the partitions for new data without consuming
  it, so this could be made to work, at the cost of polling overhead
  and latency.)

* **Distributed trigger**, in which each host waits until it is
independently ready to trigger a step. When a host is ready, it
produces an event into the partitions for its workers in
`input-step`. Each host also waits for other hosts to produce
events into their partitions and, as soon as any one of them does,
it produces an event into the partitions for its own workers. Thus,
a step begins as soon as any one of the hosts is ready.

  In this approach, it is hard to trigger a step based on a property that spans
all the workers. To do so, one could introduce extra communication
among the hosts or with a coordinator.

This approach should work well with Kafka. All the hosts need to
consume all of the partitions in `input-step`, but this should not
increase network traffic very much because their events should be
small arrays of event offsets.

We adopt the distributed trigger approach for the reasons described
above.
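
A sketch of one round of the distributed trigger, again over `rdkafka`.
The readiness test (`locally_ready`) is a placeholder, `encode_step_event`
is the sketch above, and a real implementation must keep any `input-step`
event consumed here so that step 1 of the algorithm below can use it.

```rust
use std::time::Duration;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::producer::{BaseProducer, BaseRecord};

/// Placeholder for this host's local readiness condition, e.g. "enough
/// input buffered for our workers" or "a timeout expired".
fn locally_ready() -> bool {
    true
}

/// One trigger round for the host that owns partitions `workers`.
/// `consumer` is subscribed to every partition of `input-step`;
/// `offsets` is the `H`-element array this host would log for the step.
fn trigger_step(
    consumer: &BaseConsumer,
    producer: &BaseProducer,
    workers: &[i32],
    offsets: &[i64],
) {
    // Wait until we are ready ourselves, or until any other host has
    // produced an `input-step` event (which obliges us to join the step).
    loop {
        if locally_ready() {
            break;
        }
        if let Some(Ok(_event)) = consumer.poll(Duration::from_millis(100)) {
            break; // another host triggered; keep `_event` for the replay logic
        }
    }
    // Produce our own `input-step` events so the other hosts can proceed.
    // Delivery requires the producer to be polled or flushed elsewhere.
    let payload = encode_step_event(offsets);
    for &w in workers {
        producer
            .send(BaseRecord::<(), [u8]>::to("input-step").partition(w).payload(&payload))
            .map_err(|(e, _)| e)
            .expect("failed to enqueue input-step event");
    }
}
```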

## Algorithm

On a given host, take the following variables:
@@ -186,9 +242,13 @@ On a given host, take the following variables:
* `dbsp`, the `DBSPHandle` for the host's multithreaded circuit runtime.
* `input_handles`, the vector of `H` `CollectionHandle` input handles.
* `output_handles`, the vector of `K` `OutputHandle` output handles.
* `output_tail[*]`, a `K`-element vector. Each element `output_tail[k]`
  is a map from each worker `w` in `workers` to the sequence number of
  the last step that appears in partition `w` of `output[k]`. (These
  can be obtained by reading the sequence-number header of the last
  event in each partition of `output[k]`.)
* `consumer`, a Kafka consumer, subscribed to partitions `workers` in
`input[*]` and to all partitions in `input-step`.
* `producer`, a Kafka producer for the same broker as `consumer`.
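
For reference, these variables can be collected into a per-host struct.
This is only a sketch; the DBSP handle types are left generic rather
than tied to a particular version of the `dbsp` crate.

```rust
use std::collections::HashMap;
use rdkafka::consumer::BaseConsumer;
use rdkafka::producer::BaseProducer;

/// Per-host state for the algorithm below. `Dbsp`, `In`, and `Out`
/// stand in for the types named in the text (`DBSPHandle`,
/// `CollectionHandle`, `OutputHandle`).
struct HostState<Dbsp, In, Out> {
    workers: Vec<i32>,        // the partitions this host's workers own
    dbsp: Dbsp,               // multithreaded circuit runtime
    input_handles: Vec<In>,   // H input handles
    output_handles: Vec<Out>, // K output handles
    // output_tail[k][w]: sequence number of the last step present in
    // partition `w` of `output[k]`, from its last event's step header.
    output_tail: Vec<HashMap<i32, i64>>,
    consumer: BaseConsumer,   // subscribed to input[*] and input-step
    producer: BaseProducer,   // same broker as `consumer`
}
```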

On each worker:
@@ -198,14 +258,28 @@ On each worker:
naturally be the same across all workers. See [Coordinated
commit](rocksdb.md#coordinated-commit) for details for two ways to
do this.

- Open a transaction on the state database (if one isn't already
open).

Then on the host, for `seq` in `start_seq..`:

- Seek `consumer` in every partition of `input-step` to `start_seq`.
  This must be either the offset of an event that exists or the offset
  just past the end of the partition. If it is later than that, or so
  early that Kafka has already expired it, that's a fatal error.

- Seek `consumer` in every subscribed partition of `input[*]` to the
position that corresponds to `start_seq`. (To find this position,
we actually need to read the event with event offset `start_seq - 1`
in `input-step`.)
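
A sketch of this restart positioning using `rdkafka`'s `seek`. The
`input[h]` topic names and the convention that a logged offset is the
first unconsumed one are assumptions; detecting the too-late or
already-expired fatal cases (e.g. against `fetch_watermarks`) is
omitted.

```rust
use std::collections::HashMap;
use std::time::Duration;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;
use rdkafka::Offset;

/// Position `consumer` to resume at step `start_seq`. `input_offsets`
/// maps each local worker `w` to the `H`-element array read from the
/// `input-step` event at offset `start_seq - 1` in partition `w`.
fn seek_for_restart(
    consumer: &BaseConsumer,
    n_partitions: i32,
    start_seq: i64,
    input_offsets: &HashMap<i32, Vec<i64>>,
) -> Result<(), KafkaError> {
    let timeout = Duration::from_secs(5);
    // Every partition of `input-step` goes to event offset `start_seq`.
    for p in 0..n_partitions {
        consumer.seek("input-step", p, Offset::Offset(start_seq), timeout)?;
    }
    // Each subscribed input partition goes to the offset recorded for
    // the previous step (taken here as the first unconsumed offset).
    for (&w, offsets) in input_offsets {
        for (h, &off) in offsets.iter().enumerate() {
            let topic = format!("input[{h}]"); // hypothetical naming scheme
            consumer.seek(&topic, w, Offset::Offset(off), timeout)?;
        }
    }
    Ok(())
}
```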

Then on the host, for `seq` in `start_seq..`, repeat the following.
At the beginning of each iteration, `consumer` is positioned at event
offset `seq` in every partition of `input-step` and at the offset
corresponding to `seq` in the subscribed partitions in `input[*]`.

1. Put data into the circuit for this step. First, attempt to read
one event from each partition in `workers` from `input-step`, to obtain
the input step size for `seq`:

* If we get an event for one or more of the workers, then we're
replaying the log after restarting. In this case, there will
@@ -218,27 +292,35 @@ once per program run.
once per program run.

For `h` in `0..H`, for `w` in `workers`, read as much data from
`input[h]` in partition `w` as `input-step` said (or nothing, if
we filled it in as empty), and feed it to input handle `h` for
worker `w`.

     Afterward, our consumer in `input-step` is at offset `seq + 1` in
     the partitions in `workers`, and at offset `seq` in the rest.

* Otherwise, if we didn't get any events, we're running normally
instead of replaying. Block until we read one event from any
partition in `input-step` or until we can read any number of
events from partitions `workers` in `input[*]`.

If we read any events from `input-step` in any partition in
`workers`, that's a fatal error. It means that some other
process is running as one of our workers.

For `h` in `0..H`, for `w` in `workers`, feed what we got (if
anything) from `input[h]` in partition `w` to input handle `h`
for worker `w`.


     For `w` in `workers`, write to `input-step` what we just fed to
     the circuit. (We need to write this immediately, because the
     other hosts cannot continue past step 7 before they read it.)

     Afterward, our consumer in `input-step` is at offset `seq + 1`
     in the partitions in `workers` and in at least one other
     partition, and at offset `seq` in any others.

This step can mostly be parallelized into the individual workers,
but some care is needed for the blocking part in the "normal" path.
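
The replay-versus-normal decision can be sketched as follows, with the
Kafka reads abstracted into closures since their details depend on the
consumer setup. All names here are illustrative.

```rust
use std::collections::HashMap;

/// Determine the input step for sequence number `seq` on this host.
/// Returns, per local worker, the `H`-element array of input offsets.
fn step_inputs(
    workers: &[i32],
    prev_offsets: &HashMap<i32, Vec<i64>>, // offsets logged for step seq - 1
    mut try_read_step: impl FnMut(i32) -> Option<Vec<i64>>, // non-blocking input-step read
    mut block_for_new_input: impl FnMut() -> HashMap<i32, Vec<i64>>, // blocking input[*] read
) -> HashMap<i32, Vec<i64>> {
    let mut steps = HashMap::new();
    for &w in workers {
        if let Some(offsets) = try_read_step(w) {
            steps.insert(w, offsets); // replaying: the log dictates the step
        }
    }
    if !steps.is_empty() {
        // Replay: a worker with no logged event gets an empty step,
        // i.e. the same offsets as the previous step (no new input).
        // This can happen at most once per program run.
        for &w in workers {
            steps.entry(w).or_insert_with(|| prev_offsets[&w].clone());
        }
    } else {
        // Normal operation: block for fresh input; the caller must then
        // write these arrays to `input-step` immediately, since the
        // other hosts cannot pass step 7 until they read them.
        steps = block_for_new_input();
    }
    steps
}
```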

@@ -250,17 +332,23 @@ Then on the host, for `seq` in `start_seq..`:

6. For `k` in `0..K`, for `w` in `workers`, write data from output
handle `k` in worker `w` to partition `w` of `output[k]`, unless
`seq <= output_tail[k][w]`[^2]. If we decide that `output-step`
should exist, also write an event to partition `w` of
`output-step`.

This can be run in parallel across the individual workers.
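
The dedup test is a single comparison against `output_tail`; a sketch:

```rust
use std::collections::HashMap;

/// True if partition `w` of `output[k]` does not yet contain the output
/// of step `seq`. An absent entry means the partition is empty.
fn should_write_output(
    output_tail: &[HashMap<i32, i64>],
    k: usize,
    w: i32,
    seq: i64,
) -> bool {
    match output_tail[k].get(&w) {
        Some(&tail) => seq > tail, // seq <= tail: produced in a previous run
        None => true,
    }
}
```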

7. Our consumer in `input-step` is at offset `seq + 1` in some
   partitions and may be at offset `seq` in others. In each partition
   where it is still at offset `seq`, if any, read one event, blocking
   as necessary. Afterward, the consumer is at offset `seq + 1` in
   every partition.

Also block until our own write or writes to `input-step` commit.

This is necessary because we need all of `input-step` to commit
before any of `output[*]`. Otherwise, we could violate
`input-step`'s invariant.
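
A sketch of this barrier over the shared consumer. Any `input[*]`
events that arrive while waiting must be buffered for the next
iteration, and blocking until our own writes commit corresponds to
flushing the producer (or committing the Kafka transaction); both are
elided here.

```rust
use std::collections::HashSet;
use std::time::Duration;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::Message;

/// Block until every partition of `input-step` has an event for the
/// current step. `pending` holds the partitions still at offset `seq`.
fn await_step_barrier(consumer: &BaseConsumer, pending: &mut HashSet<i32>) {
    while !pending.is_empty() {
        if let Some(Ok(event)) = consumer.poll(Duration::from_millis(100)) {
            if event.topic() == "input-step" {
                pending.remove(&event.partition());
            }
            // Events from other topics must be buffered by the caller.
        }
    }
}
```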

8. Commit the Kafka transaction.

@@ -270,6 +358,11 @@ Then on the host, for `seq` in `start_seq..`:
database implementation can still roll back to the most recently
coordinated commit.)

[^2]: If `seq <= output_tail[k][w]`, then this output was already
produced in a previous run. DBSP is designed to produce deterministic
output, so in this case we could read the old output and compare it
against the new and issue a warning or a fatal error if it differs.

# Scale in and out

The computational resources in our distributed system are hosts and
@@ -283,7 +376,7 @@ scale-in/out is how to distribute and re-distribute data among the
workers for efficient processing. The following sections describe
strategies for DBSP input and output and for the exchange operator.

### Input and output

We have described distributed DBSP such that the number of workers `W`
equals the number of partitions `P` in each Kafka topic for input and
31 changes: 16 additions & 15 deletions approaches.md
@@ -25,14 +25,14 @@ handles. We use the following Kafka topics, each with `W` partitions:
in each of these topics. Whatever is reading output from DBSP reads
from these topics.

* `input-step`. Each event is an `H`-element array of event offsets.
If `offsets[]` is the array in the event in partition `w` with event
offset `seq`, then worker `w` included `input[h]` up to `offsets[h]`
in its computations for sequence number `seq`.

There is a required invariant: if `e` is the least number of events
in any partition in `output`, then every partition in `input-step`
has at least `e` events.

On a given host, take the following variables:

@@ -41,14 +41,14 @@ On a given host, take the following variables:
* `input_handles`, the vector of `H` `CollectionHandle` input handles.
* `output_handles`, the vector of `K` `OutputHandle` output handles.
* `consumer`, a Kafka consumer, subscribed to partitions `workers` in
`input[*]` and to all partitions in `input-step`, all initially
positioned at the start of the partition.
* `producer`, a Kafka producer for the same broker as `consumer`.

For `seq` in `0..`:

1. Put data into the circuit for this step. First, attempt to read
one event from each partition in `workers` from `input-step`:

* If we get an event for one or more of the workers, then we're
replaying the log after restarting. In this case, there will
@@ -67,20 +67,20 @@ For `seq` in `0..`:

* Otherwise, if we didn't get any events, we're running normally
instead of replaying. Block until we read one event from any
partition in `input-step` or until we can read any number of events
from partitions `workers` in `input[*]`.

If we read any events from `input-step` in any partition in `workers`,
that's a fatal error. It means that some other process is
running as one of our workers.

For `h` in `0..H`, for `w` in `workers`, feed what we got (if
anything) from `input[h]` in partition `w` to input handle `h`
for worker `w`.

For `w` in `workers`, write to `input-step` what we just fed to
the circuit. (We need to commit this write immediately, so that
the other hosts can read it before they commit their `output`
changes. Otherwise, we could violate the invariant.)

This step can mostly be parallelized into the individual workers,
@@ -94,12 +94,13 @@ For `seq` in `0..`:
handle `k` in worker `w` to partition `w` of `output[k]`, unless
that output was already produced in a previous run.

7. Read one event from `input-step` for each partition where we didn't
already read one in step 1 above, blocking as necessary. Also
block until our own write or writes to `input-step` commit.

This is necessary because we need all of `input-step` to commit
before any of `output[*]`. Otherwise, we could violate the
invariant.

8. Commit the Kafka transaction.

