Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Edge Group Apply API #358

Merged
merged 55 commits into from
Feb 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
d47ba0e
add rtfd
VoVAllen Jan 7, 2019
a47f881
rrr
VoVAllen Jan 7, 2019
ab67eee
update
VoVAllen Jan 7, 2019
f0c108e
change env
VoVAllen Jan 7, 2019
b0ec4ec
temp fix
VoVAllen Jan 7, 2019
b7fe6a7
update
VoVAllen Jan 7, 2019
50fed85
fix
VoVAllen Jan 7, 2019
c5a7726
fix
VoVAllen Jan 7, 2019
c9c4e10
add
VoVAllen Jan 8, 2019
8b853d9
conf
VoVAllen Jan 8, 2019
5e84d38
Move file_pattern from Makefile to conf.py
VoVAllen Jan 8, 2019
932c165
remove yml
VoVAllen Jan 8, 2019
7f77957
fix
VoVAllen Jan 8, 2019
7ec49b6
fix
VoVAllen Jan 8, 2019
6ba2908
fix
VoVAllen Jan 8, 2019
f57cfad
fix
VoVAllen Jan 8, 2019
8617945
remove yml
VoVAllen Jan 8, 2019
9c30122
remove yml
VoVAllen Jan 8, 2019
3fcbca7
add doc docker
VoVAllen Jan 9, 2019
580ca67
add dgl install script
VoVAllen Jan 9, 2019
e450806
change name
VoVAllen Jan 9, 2019
29c3ebf
change dockerfile
VoVAllen Jan 9, 2019
0529572
fix
Jan 9, 2019
9e5fb09
name
VoVAllen Jan 9, 2019
524301d
add
VoVAllen Jan 9, 2019
98a5aa7
Merge branch 'master' of https://github.com/VoVAllen/dgl
VoVAllen Jan 9, 2019
71e411d
fix
Jan 9, 2019
8d787b1
fix
Jan 10, 2019
05f849c
fix
Jan 10, 2019
696bd58
fix
Jan 10, 2019
a6d2664
fix docker
Jan 12, 2019
4dc5377
merge upstream master
VoVAllen Jan 14, 2019
8dd9562
delete sphinx.py for doc-build backend
VoVAllen Jan 14, 2019
5b457f0
Add softmax to test backend
VoVAllen Jan 16, 2019
6c9109c
Add group apply function and tests
VoVAllen Jan 16, 2019
006b2b7
Merge branch 'group-apply' of https://github.com/VoVAllen/dgl into gr…
VoVAllen Jan 16, 2019
af2adc1
Delete unnecessary file
VoVAllen Jan 16, 2019
5f008bf
Update comments and test
VoVAllen Jan 17, 2019
b3dcafb
Merge branch 'master' into group-apply
VoVAllen Jan 17, 2019
6b6b830
Fix lint
VoVAllen Jan 17, 2019
10735ab
Merge remote-tracking branch 'origin/group-apply' into group-apply
VoVAllen Jan 17, 2019
8e9c9f2
remove unused bucketing code
lingfanyu Jan 19, 2019
82aa117
group apply edge bucketing code
lingfanyu Jan 19, 2019
3463a21
gen degree bucket schedule for group apply edge
lingfanyu Jan 19, 2019
0d53cae
schedule and graph code
lingfanyu Jan 19, 2019
3997a4b
fix compiling
lingfanyu Jan 19, 2019
ccf8e91
fix
lingfanyu Jan 19, 2019
4a714bf
fix lint
lingfanyu Jan 19, 2019
f824142
naming
lingfanyu Jan 19, 2019
725bf81
harder test case
lingfanyu Jan 19, 2019
b4de7a8
fix comments
lingfanyu Jan 19, 2019
ff69f92
Merge branch 'master' into group-apply
lingfanyu Feb 2, 2019
371fff3
more comments
lingfanyu Feb 2, 2019
70966fe
tweak function name
lingfanyu Feb 2, 2019
bd4f7d5
Merge branch 'master' into group-apply
VoVAllen Feb 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dgl/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define DGL_GRAPH_H_

#include <vector>
#include <string>
#include <cstdint>
#include <utility>
#include <tuple>
Expand Down
22 changes: 21 additions & 1 deletion include/dgl/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace sched {
* \note If there are multiple messages going into the same destination vertex, then
* there will be multiple copies of the destination vertex in vids
* \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
* degrees: of degrees for each bucket
* degrees: degrees for each bucket
* nids: destination node ids
* nid_section: number of nodes in each bucket (used to split nids)
* mids: message ids
Expand All @@ -32,6 +32,26 @@ namespace sched {
std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids,
const IdArray& recv_ids);

/*!
* \brief Generate degree bucketing schedule for group_apply edge
* \param uids One end vertex of edge by which edges are grouped
* \param vids The other end vertex of edge
* \param eids Edge ids
 * \note This function always generates a group_apply schedule based on degrees of
* nodes in uids. Therefore, if group_apply by source nodes, then uids
* should be source. If group_apply by destination nodes, then uids
* should be destination.
* \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
* degrees: degrees for each bucket
* new_uids: uids reordered by degree bucket
* new_vids: vids reordered by degree bucket
 *         new_eids: eids reordered by degree bucket
* sections: number of edges in each degree bucket (used to partition
* new_uids, new_vids, and new_eids)
*/
std::vector<IdArray> GroupEdgeByNodeDegree(const IdArray& uids,
const IdArray& vids, const IdArray& eids);

} // namespace sched

} // namespace dgl
Expand Down
84 changes: 84 additions & 0 deletions python/dgl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,90 @@ def apply_edges(self, func="default", edges=ALL, inplace=False):
inplace=inplace)
Runtime.run(prog)

def group_apply_edges(self, group_by, func, edges=ALL, inplace=False):
    """Group edges by their source or destination node and apply ``func``
    on each group of edges to update their features.

    Parameters
    ----------
    group_by : str
        Specify how to group edges. Expected to be either 'src' or 'dst'
    func : callable
        Apply function on the edge. The function should be
        an :mod:`Edge UDF <dgl.udf>`. The input of `Edge UDF` should
        be (bucket_size, degrees, *feature_shape), and
        return the dict with values of the same shapes.
    edges : valid edges type, optional
        Edges on which to group and apply ``func``. See :func:`send` for valid
        edges type. Default is all the edges.
    inplace: bool, optional
        If True, update will be done in place, but autograd will break.

    Notes
    -----
    On multigraphs, if :math:`u` and :math:`v` are specified, then all the edges
    between :math:`u` and :math:`v` will be updated.

    Examples
    --------

    .. note:: Here we use pytorch syntax for demo. The general idea applies
        to other frameworks with minor syntax change (e.g. replace
        ``torch.tensor`` with ``mxnet.ndarray``).

    >>> import torch as th

    >>> g = dgl.DGLGraph()
    >>> g.add_nodes(4)
    >>> g.add_edges(0, [1, 2, 3])
    >>> g.add_edges(1, [2, 3])
    >>> g.add_edges(2, [2, 3])
    >>> g.edata['feat'] = th.randn((g.number_of_edges(), 1))

    >>> # Softmax over the out edges of each node
    >>> # Second dimension of edges.data is the degree dimension
    >>> def softmax_feat(edges): return {'norm_feat': th.softmax(edges.data['feat'], dim=1)}
    >>> g.group_apply_edges(func=softmax_feat, group_by='src') # Group edges by source node.
    >>> u, v, eid = g.out_edges(1, form='all')
    >>> in_feat = g.edata['feat'][eid]
    >>> out_feat = g.edata['norm_feat'][eid]
    >>> print(out_feat - th.softmax(in_feat, 0))
    tensor([[0.],
            [0.]])

    See Also
    --------
    apply_edges
    """
    assert func is not None

    if group_by not in ('src', 'dst'):
        raise DGLError("Group_by should be either src or dst")

    # Resolve the edge specification into (src, dst, edge-id) index triples.
    if is_all(edges):
        src, dst, _ = self._graph.edges()
        edge_ids = utils.toindex(slice(0, self.number_of_edges()))
    elif isinstance(edges, tuple):
        src, dst = map(utils.toindex, edges)
        # Rewrite src/dst to handle edge broadcasting and multigraph.
        src, dst, edge_ids = self._graph.edge_ids(src, dst)
    else:
        edge_ids = utils.toindex(edges)
        src, dst, _ = self._graph.find_edges(edge_ids)

    with ir.prog() as prog:
        scheduler.schedule_group_apply_edge(graph=self,
                                            u=src,
                                            v=dst,
                                            eid=edge_ids,
                                            apply_func=func,
                                            group_by=group_by,
                                            inplace=inplace)
        Runtime.run(prog)


def send(self, edges=ALL, message_func="default"):
"""Send messages along the given edges.

Expand Down
187 changes: 151 additions & 36 deletions python/dgl/runtime/degree_bucketing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from __future__ import absolute_import

from .._ffi.function import _init_api
from ..base import is_all
from .. import backend as F
from ..udf import NodeBatch
from ..udf import NodeBatch, EdgeBatch
from .. import utils

from . import ir
Expand Down Expand Up @@ -98,41 +97,9 @@ def _degree_bucketing_schedule(mids, dsts, v):
"""
buckets = _CAPI_DGLDegreeBucketing(mids.todgltensor(), dsts.todgltensor(),
v.todgltensor())
return _process_buckets(buckets)
return _process_node_buckets(buckets)

def _degree_bucketing_for_edges(dsts):
"""Return the bucketing by degree scheduling for destination nodes of
messages

Parameters
----------
dsts: utils.Index
destination node for each message
"""

buckets = _CAPI_DGLDegreeBucketingForEdges(dsts.todgltensor())
return _process_buckets(buckets)

def _degree_bucketing_for_graph(graph, v):
"""Return the bucketing by degree scheduling given graph index and optional
dst nodes

Parameters:
-----------
graph: GraphIndex
DGLGraph Index (update all case) or message graph index (recv cases)
v: utils.Index
Destination nodes (recv cases)
"""

if is_all(v):
buckets = _CAPI_DGLDegreeBucketingForFullGraph(graph._handle)
else:
buckets = _CAPI_DGLDegreeBucketingForRecvNodes(graph._handle,
v.todgltensor())
return _process_buckets(buckets)

def _process_buckets(buckets):
def _process_node_buckets(buckets):
"""read bucketing auxiliary data

Returns
Expand Down Expand Up @@ -189,4 +156,152 @@ def _reshaped_getter(key):
return reduce_udf(nbatch)
return _rfunc_wrapper

def gen_group_apply_edge_schedule(
        graph,
        apply_func,
        u, v, eid,
        group_by,
        var_nf,
        var_ef,
        var_out):
    """Create degree bucketing schedule for group_apply_edge

    Edges will be grouped by either its source node or destination node
    specified by 'group_by', and will be divided into buckets in which
    'group_by' nodes have the same degree. The apply_func UDF will be applied
    to each bucket. The per-bucket result will be merged according to the
    *unique-ascending order* of the edge ids.

    Parameters
    ----------
    graph : DGLGraph
        DGLGraph to use
    apply_func: callable
        The edge_apply_func UDF
    u: utils.Index
        Source nodes of edges to apply
    v: utils.Index
        Destination nodes of edges to apply
    eid: utils.Index
        Edges to apply
    group_by: str
        If "src", group by u. If "dst", group by v
    var_nf : var.FEAT_DICT
        The variable for node feature frame.
    var_ef : var.FEAT_DICT
        The variable for edge frame.
    var_out : var.FEAT_DICT
        The variable for output feature dicts.
    """
    # The bucketing C API groups by the degree of its FIRST argument, so the
    # endpoints are swapped when grouping by destination, and the returned
    # id lists are unpacked in the matching swapped order.
    if group_by == "src":
        buckets = _degree_bucketing_for_edge_grouping(u, v, eid)
        degs, uids, vids, eids = buckets
    elif group_by == "dst":
        buckets = _degree_bucketing_for_edge_grouping(v, u, eid)
        degs, vids, uids, eids = buckets
    else:
        # NOTE(review): DGLError is not among this module's visible imports;
        # if it is indeed not imported, this raise would itself fail with
        # NameError. Callers validate group_by before scheduling, so this
        # branch is normally unreachable — confirm the import.
        raise DGLError("group_apply_edge must be grouped by either src or dst")

    idx_list = []
    fd_list = []
    for deg, u_bkt, v_bkt, eid_bkt in zip(degs, uids, vids, eids):
        # create per-bkt efunc that reshapes features so the UDF sees a
        # (bucket_size, degree, ...) view of each feature tensor
        _efunc = var.FUNC(_create_per_bkt_efunc(graph, apply_func, deg,
                                                u_bkt, v_bkt, eid_bkt))
        # vars
        var_u = var.IDX(u_bkt)
        var_v = var.IDX(v_bkt)
        var_eid = var.IDX(eid_bkt)
        # apply edge UDF on each bucket
        fdsrc = ir.READ_ROW(var_nf, var_u)
        fddst = ir.READ_ROW(var_nf, var_v)
        fdedge = ir.READ_ROW(var_ef, var_eid)
        fdedge = ir.EDGE_UDF(_efunc, fdsrc, fdedge, fddst, ret=fdedge)  # reuse var
        # save for merge
        idx_list.append(var_eid)
        fd_list.append(fdedge)

    # merge buckets according to the ascending order of the edge ids.
    all_idx = F.cat([idx.data.tousertensor() for idx in idx_list], dim=0)
    _, order = F.sort_1d(all_idx)
    var_order = var.IDX(utils.toindex(order))
    ir.MERGE_ROW(var_order, fd_list, ret=var_out)

def _degree_bucketing_for_edge_grouping(uids, vids, eids):
    """Bucket edges by the degree of their grouping endpoint for
    group_apply_edge.

    Parameters
    ----------
    uids: utils.Index
        node id of one end of eids, based on which edges are grouped
    vids: utils.Index
        node id of the other end of eids
    eids: utils.Index
        edge id for each edge

    Returns
    -------
    The processed bucket data; see ``_process_edge_buckets``.
    """
    handles = [index.todgltensor() for index in (uids, vids, eids)]
    raw_buckets = _CAPI_DGLGroupEdgeByNodeDegree(*handles)
    return _process_edge_buckets(raw_buckets)

def _process_edge_buckets(buckets):
    """Unpack the bucketing auxiliary data returned by the C API for
    group_apply_edge buckets.

    Returns
    -------
    degrees: numpy.ndarray
        A list of degree for each bucket
    uids: list of utils.Index
        A list of node id buckets, nodes in each bucket have the same degree
    vids: list of utils.Index
        A list of node id buckets
    eids: list of utils.Index
        A list of edge id buckets
    """
    # get back results
    degrees = buckets(0).asnumpy()
    # XXX: convert directly from ndarray to python list?
    sections = buckets(4).asnumpy().tolist()

    def _split_to_index(raw):
        # Split a flat id tensor into per-bucket chunks and wrap each
        # chunk as a utils.Index.
        parts = F.split(utils.toindex(raw).tousertensor(), sections, 0)
        return map(utils.toindex, parts)

    uids = _split_to_index(buckets(1))
    vids = _split_to_index(buckets(2))
    eids = _split_to_index(buckets(3))
    return degrees, uids, vids, eids

def _create_per_bkt_efunc(graph, apply_func, deg, u, v, eid):
    """Internal function to generate the per degree bucket edge UDF.

    The wrapper lazily reshapes every feature tensor from
    ``(batch_size * deg, ...)`` to ``(batch_size, deg, ...)`` before calling
    the user's ``apply_func``, then flattens each returned tensor back.
    """
    batch_size = len(u) // deg

    def _efunc_wrapper(src_data, edge_data, dst_data):
        def _bucket_view(data):
            # Build a lazy dict whose values expose the degree dimension.
            def _get(key):
                feat = data[key]
                return F.reshape(feat, (batch_size, deg) + F.shape(feat)[1:])
            return utils.LazyDict(_get, data.keys())

        def _flatten(feat):
            # Collapse (batch_size, deg, ...) back to (batch_size * deg, ...).
            return F.reshape(feat, (batch_size * deg,) + F.shape(feat)[2:])

        ebatch = EdgeBatch(graph, (u, v, eid),
                           _bucket_view(src_data),
                           _bucket_view(edge_data),
                           _bucket_view(dst_data))
        return {key: _flatten(feat)
                for key, feat in apply_func(ebatch).items()}
    return _efunc_wrapper

_init_api("dgl.runtime.degree_bucketing")
Loading