Architecture
This document aims to serve as a high-level illustration of the major
components of congregation, and a helpful guide to anyone looking
to contribute to the project. To that end, each of the following core
modules will be described in more detail below:
- lang
- dag
- comp
- part
- codegen
- dispatch
- net
- assemble
lang (congregation/lang/*)
lang is an SQL-like frontend which is the user-facing component of congregation, and contains all the relational functions that a user is able to use when building workflows. From a user's perspective, each function in the lang module represents a single SQL query which takes some input data, performs an operation on it, and produces some output. From a developer's perspective, however, each query returns a Node object, which is then passed to other queries in order to construct a DAG object, which will be discussed in the next section. For documentation on the various queries available, refer to the Queries and Examples sections.
dag (congregation/dag/*)
This module is invoked by the frontend in order to construct a directed acyclic graph (DAG), which is ingested by the compiler. It contains both the high-level Dag class and the various Node classes that can be chained together to form a Dag object. For example, consider the following workflow:
from congregation import create_column, lang
a = create_column("a", "INTEGER")
b = create_column("b", "INTEGER")
c = create_column("c", "INTEGER")
d = create_column("d", "INTEGER")
e = create_column("e", "INTEGER")
f = create_column("f", "INTEGER")
g = create_column("g", "INTEGER")
h = create_column("h", "INTEGER")
i = create_column("i", "INTEGER")
rel_one = lang.create("in1", [a, b, c], {1})
rel_two = lang.create("in2", [d, e, f], {2})
rel_three = lang.create("in3", [g, h, i], {3})
cc = lang.concat([rel_one, rel_two, rel_three], "cc")
agg = lang.aggregate(cc, "agg", ["b"], "a", "sum")
mult = lang.multiply(agg, "mult", "a", [7])
lang.collect(mult, {1, 2, 3})
Here we have 3 input datasets rel_one, rel_two, and rel_three, owned by parties 1, 2, and 3, respectively. First, the three datasets are vertically concatenated. Then, a sum aggregation over column a, keyed on column b, is applied, followed by a multiplication of the values in column a by a scalar (7) before the result is returned to the input parties. From this sequence of operations, congregation would construct a Dag object equivalent to the following:
[Dag diagram for the workflow above]
Each node in this Dag represents an operation to be applied to an input relation, and each edge corresponds to some output relation. For congregation (and, more generally, relational) workflows, a relation is just a two-dimensional array of values, like a CSV file. In the above image, each node can be thought of as receiving an input relation and performing some operation on it, which produces some output relation. Thus, the edges in the image are labeled according to the names given to the relations in the workflow code.
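For intuition, here is a minimal sketch of how such a structure might look in Python. This is illustrative only; the class names and fields below are assumptions, not congregation's actual Node and Dag definitions:

# Minimal sketch of a DAG of operator nodes -- illustrative only, not
# congregation's actual Node/Dag classes.
class Node:
    def __init__(self, out_rel_name: str, operation: str):
        self.out_rel_name = out_rel_name  # name of the relation this node produces
        self.operation = operation        # e.g. "create", "concat", "aggregate"
        self.parents = set()
        self.children = set()

class Dag:
    def __init__(self, roots: set):
        self.roots = roots  # the create (input) nodes

    def top_sort(self):
        # reverse post-order DFS yields a valid topological ordering
        visited, post_order = set(), []

        def visit(node):
            if node in visited:
                return
            visited.add(node)
            for child in node.children:
                visit(child)
            post_order.append(node)

        for root in self.roots:
            visit(root)
        return list(reversed(post_order))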
comp (congregation/comp/*)
Congregation determines which optimizations are possible for a given workflow by reasoning about the structure of its corresponding Dag. The comp module is responsible both for executing these optimizations and for inserting various internal operator nodes that are used by the part and codegen modules later on.
In the example above, it would be possible for each input party to perform a local aggregation over their own data before submitting it to MPC. This could result in a considerable speedup, since there would be much less computation done under MPC if any of the input datasets were large. The first phase of the comp module (named PushDown, at comp/push_down.py) would identify this possibility and restructure the Dag object accordingly, yielding the following:
[Dag diagram after the PushDown phase]
In the above image, we see that the aggregation which was previously singular has now
been split across the concat node into two distinct aggregations -- one computed locally
before any MPC has been done, and one after the input datasets have been combined. Similar
optimizations are performed for other operators where appropriate during this phase.
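To see why this rewrite is sound, note that summing per-party partial sums gives the same answer as summing the concatenated data. The following plain-Python check (illustrative only, with made-up values and no MPC involved) demonstrates this:

# Illustrative check: pushing a sum aggregation below a concat
# preserves the result. Values are made up for demonstration.
in1 = [("k1", 1), ("k2", 2)]   # party 1's rows as (key, value) pairs
in2 = [("k1", 3), ("k2", 4)]   # party 2's rows
in3 = [("k1", 5), ("k2", 6)]   # party 3's rows

def sum_by_key(rows):
    out = {}
    for key, val in rows:
        out[key] = out.get(key, 0) + val
    return out

# Aggregating the full concatenation (what the original Dag computes) ...
full = sum_by_key(in1 + in2 + in3)

# ... equals aggregating each party's local pre-aggregates (the rewritten Dag).
local_sums = [sum_by_key(rows) for rows in (in1, in2, in3)]
pushed = sum_by_key([(k, v) for d in local_sums for k, v in d.items()])

assert full == pushed  # {'k1': 9, 'k2': 12}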
After the PushDown phase, in which congregation sought to push the upper MPC boundary downwards within the Dag, congregation will try to push the lower MPC boundary upwards using similar logic. In the example above, the output relation of the aggregate_mpc node is passed to a multiply operator. Since the output of a multiplication is reversible (i.e., if you know both the output and the number it was multiplied by, you can infer the input), congregation will rewrite the trust relations for that node so that it will be computed outside of MPC by each compute party individually.
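A two-line example (illustrative only) shows why keeping this multiplication under MPC would add no secrecy:

secret = 12
revealed = secret * 7           # the output each party learns anyway
assert revealed // 7 == secret  # the output plus the public scalar recovers the input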
There are several compiler phases related to inserting internal nodes into the Dag. The first two such phases, InsertCloseOps and InsertOpenOps, just insert Close and Open nodes into the Dag to serve as signposts for the part module, which is responsible for splitting the Dag into discrete local and MPC workflows. The next two phases, InsertReadOps and InsertStoreOps, just create operators for file I/O operations. Using the same workflow as an example, these four phases together would yield the following:
[Dag diagram with Close, Open, Read, and Store nodes inserted]
This completed Dag is then passed to the part module to be partitioned, which will be described in the next section. The state of the input Dag across the compiler phases is included below for reference:
[Dag states across the compiler phases]
part (congregation/part/*)
The part module is responsible for transforming the compiled Dag into a list of discrete Dags that each represent some local or MPC computation. The partitioning algorithm currently used is really simple -- it just traverses the Dag, looks for Open and Close nodes, and returns all the Dags that result from cutting the Dag at those points. So, the final Dag from the graph above would result in the following partitions:
[Partition diagram: three Dags cut at the Open and Close nodes]
Note that each of the three Dags in the above diagram represents some work that will be done by a single compute party.
Now each partition represents a single job, which is part of the larger workflow. More specifically, part would generate the following list for the above workflow:
partition = [
(local_0, "python"),
(local_1, "python"),
(local_2, "python"),
(mpc_0, "jiff"),
(local_3, "python")
]
Next, the codegen module will iterate over this list and transform each entry into actual files that can be dispatched by the compute parties.
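A rough sketch of that iteration is shown below. The class and method names here are assumptions for illustration, not congregation's exact API:

# Hypothetical driver: map each (dag, backend) pair to a code generator.
# PythonCodegen / JiffCodegen and their signatures are assumed names.
def generate_all(partition, config):
    for dag, backend in partition:
        if backend == "python":
            PythonCodegen(config, dag).generate()
        elif backend == "jiff":
            JiffCodegen(config, dag).generate()
        else:
            raise ValueError(f"no codegen for backend: {backend}")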
codegen (congregation/codegen/*)
The codegen module's responsibility is to generate code for specific (Dag, Backend) pairs. This module is designed to be pluggable, so that new backends can be easily added. To write a new backend, one would simply add another directory to the codegen module as follows:
|- codegen
   - codegen.py
   |- jiff
   |- python
   |- <your new backend here>
The new backend class could then be defined there as a subclass of Codegen. Once the methods from the Codegen class were filled out in that subclass, the new backend would be ready to use with the congregation system.
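For illustration, the skeleton of a new backend might look like the following. The import path is inferred from the directory layout above, and the overridden method name is a placeholder; the real interface is whatever the Codegen base class defines:

# Hypothetical skeleton for a new backend; the method name below is a
# placeholder -- consult the Codegen base class in codegen/codegen.py
# for the methods that must actually be implemented.
from congregation.codegen.codegen import Codegen  # path assumed from the layout above

class MyBackendCodegen(Codegen):
    def generate(self):
        # translate the partition's Dag into source files for the new backend
        ...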
dispatch (congregation/dispatch/*)
The dispatch module is responsible for taking Job objects and running the code that they refer to. Job objects are just simple wrappers that point to a directory of code, store information about which PIDs should run it, and hold any backend-specific metadata that might be needed for a particular job. For networked jobs (e.g. Jiff jobs), the dispatch module runs synchronization protocols (using functionality provided by the net module) to ensure that parties wait to launch until they're all ready, or until a Jiff server is available (if a compute party is also running the Jiff server). Developers are also free to write custom protocols using the building blocks provided by the net module.
As with the codegen module, each compute backend requires its own dedicated Dispatcher class. Adding a new dispatcher would be identical to the codegen example above:
|- dispatch
   - dispatcher.py
   |- jiff.py
   |- python.py
   |- <your new dispatcher here>
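A skeleton for a new dispatcher might look like this (again, the method name is a placeholder for whatever the Dispatcher base class actually requires):

# Hypothetical skeleton for a new dispatcher; see dispatch/dispatcher.py
# for the real interface.
from congregation.dispatch.dispatcher import Dispatcher  # path assumed from the layout above

class MyBackendDispatcher(Dispatcher):
    def dispatch(self, job):
        # synchronize with the other parties, then launch the code
        # that this Job object points to
        ...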
net (congregation/net/*)
The net module interfaces with the various Dispatcher types via the Peer class (net.peer.Peer). This class contains methods for sending message types that serve as building blocks for more complicated protocols. Existing message types are defined at net/messages/*, and this list can be easily extended by a developer for needs that might arise with a new compute backend. Note, though, that any new message types added must be accompanied by a specific handler method for each (defined at net/handler.py).
assemble (congregation/assemble/*)
In addition to providing a single entrypoint for launching congregation workflows (Assemble.generate_and_dispatch()), the assemble module is written to separate the various protocol steps into individual pieces, allowing a developer to call each step individually while debugging a particular workflow or testing changes. Looking at the methods themselves, each module described above is referenced accordingly:
* compile()
* partition()
* generate_code()
* generate_jobs()
* setup_peer()
* dispatch_jobs()
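For example, a debugging session might drive these steps one at a time instead of calling Assemble.generate_and_dispatch(). The exact signatures and return values below are assumptions; only the method names come from the list above:

# Hypothetical sketch of stepping through the protocol manually; the
# constructor arguments, signatures, and return values are assumptions.
from congregation import Assemble  # import path assumed

a = Assemble()
dag = ...                    # the Dag built by a workflow's lang queries
dag = a.compile(dag)         # run the comp phases over the Dag
parts = a.partition(dag)     # split into (dag, backend) pairs
a.generate_code(parts)       # emit code for each partition
jobs = a.generate_jobs(parts)
peer = a.setup_peer()        # set up networking between parties
a.dispatch_jobs(jobs)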