-
Notifications
You must be signed in to change notification settings - Fork 312
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add MG neighborhood sampling to pylibcugraph & cugraph APIs (#2118)
Closes #2108 when merged. Requires both #2088 and #2156 to be merged before, the former because this uses MGGraph, and the later because of the C implementation of neighborhood sampling. Authors: - https://github.com/betochimas - Joseph Nke (https://github.com/jnke2016) - Rick Ratzel (https://github.com/rlratzel) Approvers: - Don Acosta (https://github.com/acostadon) - Rick Ratzel (https://github.com/rlratzel) - Joseph Nke (https://github.com/jnke2016) - Chuck Hastings (https://github.com/ChuckHastings) - Jordan Jacobelli (https://github.com/Ethyling) URL: #2118
- Loading branch information
1 parent
87a170a
commit 38be932
Showing
17 changed files
with
765 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
0 1 1.0 | ||
0 2 1.0 | ||
0 3 1.0 | ||
0 4 1.0 | ||
1 5 1.0 | ||
2 5 1.0 | ||
3 5 1.0 | ||
4 5 1.0 | ||
5 6 1.0 | ||
5 7 1.0 | ||
5 8 1.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# Copyright (c) 2022, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. |
187 changes: 187 additions & 0 deletions
187
python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
# Copyright (c) 2022, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import numpy | ||
from dask.distributed import wait, default_client | ||
|
||
import dask_cudf | ||
import cudf | ||
|
||
from pylibcugraph.experimental import (MGGraph, | ||
ResourceHandle, | ||
GraphProperties, | ||
uniform_neighborhood_sampling, | ||
) | ||
from cugraph.dask.common.input_utils import get_distributed_data | ||
from cugraph.comms import comms as Comms | ||
|
||
|
||
def call_nbr_sampling(sID, | ||
data, | ||
src_col_name, | ||
dst_col_name, | ||
num_edges, | ||
do_expensive_check, | ||
start_list, | ||
info_list, | ||
h_fan_out, | ||
with_replacement): | ||
|
||
# Preparation for graph creation | ||
handle = Comms.get_handle(sID) | ||
handle = ResourceHandle(handle.getHandle()) | ||
graph_properties = GraphProperties(is_symmetric=False, is_multigraph=False) | ||
srcs = data[0][src_col_name] | ||
dsts = data[0][dst_col_name] | ||
weights = None | ||
if "value" in data[0].columns: | ||
weights = data[0]['value'] | ||
|
||
store_transposed = False | ||
|
||
mg = MGGraph(handle, | ||
graph_properties, | ||
srcs, | ||
dsts, | ||
weights, | ||
store_transposed, | ||
num_edges, | ||
do_expensive_check) | ||
|
||
ret_val = uniform_neighborhood_sampling(handle, | ||
mg, | ||
start_list, | ||
info_list, | ||
h_fan_out, | ||
with_replacement, | ||
do_expensive_check) | ||
return ret_val | ||
|
||
|
||
def convert_to_cudf(cp_arrays): | ||
""" | ||
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper | ||
""" | ||
cupy_sources, cupy_destinations, cupy_labels, cupy_indices = cp_arrays | ||
# cupy_sources, cupy_destinations, cupy_labels, cupy_indices, | ||
# cupy_counts = cp_arrays | ||
df = cudf.DataFrame() | ||
df["sources"] = cupy_sources | ||
df["destinations"] = cupy_destinations | ||
df["labels"] = cupy_labels | ||
df["indices"] = cupy_indices | ||
# df["counts"] = cupy_counts | ||
return df | ||
|
||
|
||
def EXPERIMENTAL__uniform_neighborhood(input_graph, | ||
start_info_list, | ||
fanout_vals, | ||
with_replacement=True): | ||
""" | ||
Does neighborhood sampling, which samples nodes from a graph based on the | ||
current node's neighbors, with a corresponding fanout value at each hop. | ||
Parameters | ||
---------- | ||
input_graph : cugraph.DiGraph | ||
cuGraph graph, which contains connectivity information as dask cudf | ||
edge list dataframe | ||
start_info_list : tuple of list or cudf.Series | ||
Tuple of a list of starting vertices for sampling, along with a | ||
corresponding list of label for reorganizing results after sending | ||
the input to different callers. | ||
fanout_vals : list | ||
List of branching out (fan-out) degrees per starting vertex for each | ||
hop level. | ||
with_replacement: bool, optional (default=True) | ||
Flag to specify if the random sampling is done with replacement | ||
Returns | ||
------- | ||
result : dask_cudf.DataFrame | ||
GPU data frame containing two dask_cudf.Series | ||
ddf['sources']: dask_cudf.Series | ||
Contains the source vertices from the sampling result | ||
ddf['destinations']: dask_cudf.Series | ||
Contains the destination vertices from the sampling result | ||
ddf['labels']: dask_cudf.Series | ||
Contains the start labels from the sampling result | ||
ddf['indices']: dask_cudf.Series | ||
Contains the indices from the sampling result for path | ||
reconstruction | ||
""" | ||
# Initialize dask client | ||
client = default_client() | ||
# Important for handling renumbering | ||
input_graph.compute_renumber_edge_list(transposed=False) | ||
|
||
start_list, info_list = start_info_list | ||
|
||
if isinstance(start_list, list): | ||
start_list = cudf.Series(start_list) | ||
if isinstance(info_list, list): | ||
info_list = cudf.Series(info_list) | ||
# fanout_vals must be a host array! | ||
# FIXME: ensure other sequence types (eg. cudf Series) can be handled. | ||
if isinstance(fanout_vals, list): | ||
fanout_vals = numpy.asarray(fanout_vals, dtype="int32") | ||
else: | ||
raise TypeError("fanout_vals must be a list, " | ||
f"got: {type(fanout_vals)}") | ||
|
||
ddf = input_graph.edgelist.edgelist_df | ||
num_edges = len(ddf) | ||
data = get_distributed_data(ddf) | ||
|
||
src_col_name = input_graph.renumber_map.renumbered_src_col_name | ||
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name | ||
|
||
# start_list uses "external" vertex IDs, but since the graph has been | ||
# renumbered, the start vertex IDs must also be renumbered. | ||
start_list = input_graph.lookup_internal_vertex_id(start_list).compute() | ||
do_expensive_check = True | ||
|
||
result = [client.submit(call_nbr_sampling, | ||
Comms.get_session_id(), | ||
wf[1], | ||
src_col_name, | ||
dst_col_name, | ||
num_edges, | ||
do_expensive_check, | ||
start_list, | ||
info_list, | ||
fanout_vals, | ||
with_replacement, | ||
workers=[wf[0]]) | ||
for idx, wf in enumerate(data.worker_to_parts.items())] | ||
|
||
wait(result) | ||
|
||
cudf_result = [client.submit(convert_to_cudf, | ||
cp_arrays) | ||
for cp_arrays in result] | ||
|
||
wait(cudf_result) | ||
|
||
ddf = dask_cudf.from_delayed(cudf_result) | ||
if input_graph.renumbered: | ||
ddf = input_graph.unrenumber(ddf, "sources") | ||
ddf = input_graph.unrenumber(ddf, "destinations") | ||
|
||
return ddf |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Copyright (c) 2022, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from cugraph.utilities.api_tools import experimental_warning_wrapper | ||
|
||
from cugraph.dask.sampling.neighborhood_sampling import \ | ||
EXPERIMENTAL__uniform_neighborhood | ||
uniform_neighborhood_sampling = \ | ||
experimental_warning_wrapper(EXPERIMENTAL__uniform_neighborhood) |
133 changes: 133 additions & 0 deletions
133
python/cugraph/cugraph/tests/dask/test_mg_neighborhood_sampling.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
# Copyright (c) 2022, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import gc | ||
import pytest | ||
import cugraph.dask as dcg | ||
import cugraph | ||
import dask_cudf | ||
import cudf | ||
from cugraph.dask.common.mg_utils import is_single_gpu | ||
from cugraph.tests import utils | ||
|
||
|
||
# ============================================================================= | ||
# Test helpers | ||
# ============================================================================= | ||
def setup_function(): | ||
gc.collect() | ||
|
||
|
||
# datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"karate.csv" | ||
datasets = utils.DATASETS_SMALL | ||
fixture_params = utils.genFixtureParamsProduct((datasets, "graph_file")) | ||
|
||
|
||
def _get_param_args(param_name, param_values): | ||
""" | ||
Returns a tuple of (<param_name>, <pytest.param list>) which can be applied | ||
as the args to pytest.mark.parametrize(). The pytest.param list also | ||
contains param id string formed from the param name and values. | ||
""" | ||
return (param_name, | ||
[pytest.param(v, id=f"{param_name}={v}") for v in param_values]) | ||
|
||
|
||
@pytest.mark.skipif( | ||
is_single_gpu(), reason="skipping MG testing on Single GPU system" | ||
) | ||
def test_mg_neighborhood_sampling_simple(dask_client): | ||
|
||
from cugraph.experimental.dask import uniform_neighborhood_sampling | ||
|
||
df = cudf.DataFrame({"src": cudf.Series([0, 1, 1, 2, 2, 2, 3, 4], | ||
dtype="int32"), | ||
"dst": cudf.Series([1, 3, 4, 0, 1, 3, 5, 5], | ||
dtype="int32"), | ||
"value": cudf.Series([0.1, 2.1, 1.1, 5.1, 3.1, | ||
4.1, 7.2, 3.2], | ||
dtype="float32"), | ||
}) | ||
ddf = dask_cudf.from_cudf(df, npartitions=2) | ||
|
||
G = cugraph.Graph(directed=True) | ||
G.from_dask_cudf_edgelist(ddf, "src", "dst", "value") | ||
|
||
# TODO: Incomplete, include more testing for tree graph as well as | ||
# for larger graphs | ||
start_list = cudf.Series([0, 1], dtype="int32") | ||
info_list = cudf.Series([0, 0], dtype="int32") | ||
fanout_vals = [1, 1] | ||
with_replacement = True | ||
result_nbr = uniform_neighborhood_sampling(G, | ||
(start_list, info_list), | ||
fanout_vals, | ||
with_replacement) | ||
result_nbr = result_nbr.compute() | ||
|
||
# Since the validity of results have (probably) been tested at botht he C++ | ||
# and C layers, simply test that the python interface and conversions were | ||
# done correctly. | ||
assert result_nbr['sources'].dtype == "int32" | ||
assert result_nbr['destinations'].dtype == "int32" | ||
assert result_nbr['labels'].dtype == "int32" | ||
assert result_nbr['indices'].dtype == "int32" | ||
|
||
# ALl labels should be 0 or 1 | ||
assert result_nbr['labels'].isin([0, 1]).all() | ||
|
||
|
||
@pytest.mark.skipif( | ||
is_single_gpu(), reason="skipping MG testing on Single GPU system" | ||
) | ||
def test_mg_neighborhood_sampling_tree(dask_client): | ||
|
||
from cugraph.experimental.dask import uniform_neighborhood_sampling | ||
|
||
input_data_path = (utils.RAPIDS_DATASET_ROOT_DIR_PATH / | ||
"small_tree.csv").as_posix() | ||
chunksize = dcg.get_chunksize(input_data_path) | ||
|
||
ddf = dask_cudf.read_csv( | ||
input_data_path, | ||
chunksize=chunksize, | ||
delimiter=" ", | ||
names=["src", "dst", "value"], | ||
dtype=["int32", "int32", "float32"], | ||
) | ||
|
||
G = cugraph.Graph(directed=True) | ||
G.from_dask_cudf_edgelist(ddf, "src", "dst", "value") | ||
|
||
# TODO: Incomplete, include more testing for tree graph as well as | ||
# for larger graphs | ||
start_list = cudf.Series([0, 0], dtype="int32") | ||
info_list = cudf.Series([0, 0], dtype="int32") | ||
fanout_vals = [4, 1, 3] | ||
with_replacement = True | ||
result_nbr = uniform_neighborhood_sampling(G, | ||
(start_list, info_list), | ||
fanout_vals, | ||
with_replacement) | ||
result_nbr = result_nbr.compute() | ||
|
||
# Since the validity of results have (probably) been tested at botht he C++ | ||
# and C layers, simply test that the python interface and conversions were | ||
# done correctly. | ||
assert result_nbr['sources'].dtype == "int32" | ||
assert result_nbr['destinations'].dtype == "int32" | ||
assert result_nbr['labels'].dtype == "int32" | ||
assert result_nbr['indices'].dtype == "int32" | ||
|
||
# All labels should be 0 | ||
assert (result_nbr['labels'] == 0).all() |
Oops, something went wrong.