
Commit

Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.08_refactor-edge-bc

jnke2016 committed Jul 20, 2023
2 parents 184bdcf + 145a92b commit feb9c50
Showing 24 changed files with 896 additions and 31 deletions.
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -4,10 +4,10 @@ channels:
- rapidsai
- rapidsai-nightly
- dask/label/dev
- conda-forge
- nvidia
- pytorch
- dglteam/label/cu118
- conda-forge
- nvidia
dependencies:
- aiohttp
- c-compiler
4 changes: 2 additions & 2 deletions dependencies.yaml
@@ -181,10 +181,10 @@ channels:
- rapidsai
- rapidsai-nightly
- dask/label/dev
- conda-forge
- nvidia
- pytorch
- dglteam/label/cu118
- conda-forge
- nvidia
dependencies:
checks:
common:
4 changes: 2 additions & 2 deletions python/cugraph-dgl/conda/cugraph_dgl_dev_cuda-118.yaml
@@ -4,10 +4,10 @@ channels:
- rapidsai
- rapidsai-nightly
- dask/label/dev
- conda-forge
- nvidia
- pytorch
- dglteam/label/cu118
- conda-forge
- nvidia
dependencies:
- cugraph==23.8.*
- dgl>=1.1.0.cu*
@@ -52,9 +52,11 @@ def _get_tensor_ls_from_sampled_df(df):
batch_indices = torch.searchsorted(batch_id_tensor, batch_indices)

split_d = {}

for column in ["sources", "destinations", "edge_id", "hop_id"]:
tensor = cast_to_tensor(df[column])
split_d[column] = torch.tensor_split(tensor, batch_indices.cpu())
if column in df.columns:
tensor = cast_to_tensor(df[column])
split_d[column] = torch.tensor_split(tensor, batch_indices.cpu())

result_tensor_ls = []
for i, hop_id_tensor in enumerate(split_d["hop_id"]):
@@ -66,7 +68,11 @@ def _get_tensor_ls_from_sampled_df(df):
hop_indices = torch.searchsorted(hop_id_tensor, hop_indices)
s = torch.tensor_split(split_d["sources"][i], hop_indices.cpu())
d = torch.tensor_split(split_d["destinations"][i], hop_indices.cpu())
eid = torch.tensor_split(split_d["edge_id"][i], hop_indices.cpu())
if "edge_id" in split_d:
eid = torch.tensor_split(split_d["edge_id"][i], hop_indices.cpu())
else:
eid = [None] * len(s)

result_tensor_ls.append((x, y, z) for x, y, z in zip(s, d, eid))

return result_tensor_ls
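The guard added above lets the splitter skip columns, such as edge_id, that the sampler did not produce. A minimal, self-contained sketch of the same searchsorted/tensor_split pattern is shown below; it assumes a flat cuDF frame sorted by a batch_id column, and the helper names are illustrative rather than the module's actual API.

import cudf
import torch
from torch.utils.dlpack import from_dlpack


def cast_to_tensor(ser):
    # Zero-copy hand-off from cuDF to PyTorch via DLPack.
    return from_dlpack(ser.to_dlpack())


def split_columns_by_batch(df):
    # df is assumed sorted by batch_id: locate the start of each batch,
    # then split every column that is actually present at those boundaries.
    batch_ids = cast_to_tensor(df["batch_id"])
    starts = torch.searchsorted(batch_ids, batch_ids.unique())  # first entry is 0
    split_d = {}
    for column in ["sources", "destinations", "edge_id", "hop_id"]:
        if column in df.columns:  # edge_id may have been dropped upstream
            tensor = cast_to_tensor(df[column])
            split_d[column] = torch.tensor_split(tensor, starts.cpu()[1:])
    return split_d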
@@ -125,15 +131,16 @@ def _create_homogeneous_sampled_graphs_from_tensors_perhop(
def create_homogeneous_dgl_block_from_tensors_ls(
src_ids: torch.Tensor,
dst_ids: torch.Tensor,
edge_ids: torch.Tensor,
edge_ids: Optional[torch.Tensor],
seed_nodes: Optional[torch.Tensor],
total_number_of_nodes: int,
):
sampled_graph = dgl.graph(
(src_ids, dst_ids),
num_nodes=total_number_of_nodes,
)
sampled_graph.edata[dgl.EID] = edge_ids
if edge_ids is not None:
sampled_graph.edata[dgl.EID] = edge_ids
# TODO: Check if unique is needed
if seed_nodes is None:
seed_nodes = dst_ids.unique()
@@ -144,7 +151,8 @@ def create_homogeneous_dgl_block_from_tensors_ls(
src_nodes=src_ids.unique(),
include_dst_in_src=True,
)
block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
if edge_ids is not None:
block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
return block


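Putting the two guarded assignments together, a sketch of the per-hop block construction with optional edge ids could look like the following; it assumes a recent DGL release and mirrors the pattern in the diff rather than reproducing the real function.

import dgl
import torch


def make_block(src_ids, dst_ids, edge_ids=None, seed_nodes=None, total_number_of_nodes=None):
    # Build the per-hop graph; attach edge ids only when the sampler produced them.
    sampled_graph = dgl.graph((src_ids, dst_ids), num_nodes=total_number_of_nodes)
    if edge_ids is not None:
        sampled_graph.edata[dgl.EID] = edge_ids
    if seed_nodes is None:
        seed_nodes = dst_ids.unique()
    block = dgl.to_block(
        sampled_graph,
        dst_nodes=seed_nodes,
        src_nodes=src_ids.unique(),
        include_dst_in_src=True,
    )
    if edge_ids is not None:
        block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    return block


# Tiny usage example with made-up ids and no edge ids.
blk = make_block(torch.tensor([0, 1, 2]), torch.tensor([1, 2, 3]), total_number_of_nodes=4)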
66 changes: 65 additions & 1 deletion python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -184,6 +184,9 @@ def _call_plc_uniform_neighbor_sample(
with_edge_properties,
random_state=None,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
):
st_x = st_x[0]
start_list_x = st_x[start_col_name]
@@ -209,6 +212,9 @@ def _call_plc_uniform_neighbor_sample(
with_edge_properties=with_edge_properties,
batch_id_list=batch_id_list_x,
random_state=random_state,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
return_hops=return_hops,
)
return convert_to_cudf(
cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets
@@ -227,6 +233,7 @@ def _call_plc_uniform_neighbor_sample_legacy(
with_edge_properties,
random_state=None,
return_offsets=False,
return_hops=True,
):
start_list_x = st_x[start_col_name]
batch_id_list_x = st_x[batch_col_name] if batch_col_name in st_x else None
@@ -242,6 +249,7 @@ def _call_plc_uniform_neighbor_sample_legacy(
with_edge_properties=with_edge_properties,
batch_id_list=batch_id_list_x,
random_state=random_state,
return_hops=return_hops,
)
return convert_to_cudf(
cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets
@@ -262,6 +270,7 @@ def _mg_call_plc_uniform_neighbor_sample_legacy(
with_edge_properties,
random_state,
return_offsets=False,
return_hops=True,
):
result = [
client.submit(
@@ -281,6 +290,7 @@ def _mg_call_plc_uniform_neighbor_sample_legacy(
allow_other_workers=False,
pure=False,
return_offsets=return_offsets,
return_hops=return_hops,
)
for i, w in enumerate(Comms.get_workers())
]
@@ -327,6 +337,9 @@ def _mg_call_plc_uniform_neighbor_sample(
with_edge_properties,
random_state,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
):
n_workers = None
if keep_batches_together:
@@ -354,6 +367,9 @@ def _mg_call_plc_uniform_neighbor_sample(
# FIXME accept and properly transmute a numpy/cupy random state.
random_state=hash((random_state, w)),
return_offsets=return_offsets,
return_hops=return_hops,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
allow_other_workers=False,
pure=False,
)
@@ -408,6 +424,7 @@ def _uniform_neighbor_sample_legacy(
label_to_output_comm_rank: bool = None,
random_state: int = None,
return_offsets: bool = False,
return_hops: bool = False,
_multiple_clients: bool = False,
) -> Union[dask_cudf.DataFrame, Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]]:
warnings.warn(
@@ -508,6 +525,7 @@ def _uniform_neighbor_sample_legacy(
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
return_hops=return_hops,
)
finally:
lock.release()
@@ -530,6 +548,7 @@ def _uniform_neighbor_sample_legacy(
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
return_hops=return_hops,
)

if return_offsets:
@@ -564,6 +583,9 @@ def uniform_neighbor_sample(
max_batch_id=None,
random_state: int = None,
return_offsets: bool = False,
return_hops: bool = True,
prior_sources_behavior: str = None,
deduplicate_sources: bool = False,
_multiple_clients: bool = False,
) -> Union[dask_cudf.DataFrame, Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]]:
"""
@@ -630,6 +652,24 @@ def uniform_neighbor_sample(
dataframes, one with sampling results and one with
batch ids and their start offsets per rank.
return_hops: bool, optional (default=True)
Whether to return the sampling results with hop ids
corresponding to the hop where the edge appeared.
Defaults to True.
prior_sources_behavior: str (Optional)
Options are "carryover", and "exclude".
Default will leave the source list as-is.
Carryover will carry over sources from previous hops to the
current hop.
Exclude will exclude sources from previous hops from reappearing
as sources in future hops.
deduplicate_sources: bool, optional (default=False)
Whether to first deduplicate the list of possible sources
from the previous hop's destinations before performing the
next hop.
_multiple_clients: bool, optional (default=False)
Internal flag to ensure sampling works with multiple Dask clients.
Set to True to prevent hangs in a multi-client environment.
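A hedged usage sketch of the new options follows; the edge list and column names are placeholders, and it assumes a running Dask cluster with cugraph Comms already initialized.

import cudf
import dask_cudf
import cugraph
import cugraph.dask as dask_cugraph

# Placeholder edge list; a real workflow would load a partitioned dataset.
edges = cudf.DataFrame({"src": [0, 1, 2, 3], "dst": [1, 2, 3, 0]})
ddf = dask_cudf.from_cudf(edges, npartitions=2)

G = cugraph.Graph(directed=False)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

start_list = cudf.Series([0, 1], dtype="int64")

result = dask_cugraph.uniform_neighbor_sample(
    G,
    start_list,
    fanout_vals=[2, 2],
    with_replacement=False,
    with_edge_properties=True,          # hop_id / edge_id columns in the result
    return_hops=True,                   # keep the hop_id column
    prior_sources_behavior="exclude",   # or "carryover"; None leaves sources as-is
    deduplicate_sources=True,           # drop duplicate sources before each hop
)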
@@ -690,6 +730,14 @@ def uniform_neighbor_sample(
or label_list is not None
or label_to_output_comm_rank is not None
):
if prior_sources_behavior or deduplicate_sources:
raise ValueError(
"unique sources, carry_over_sources, and deduplicate_sources"
" are not supported with batch_id_list, label_list, and"
" label_to_output_comm_rank. Consider using with_batch_ids"
" and keep_batches_together instead."
)

return uniform_neighbor_sample_legacy(
input_graph,
start_list,
Expand All @@ -701,6 +749,7 @@ def uniform_neighbor_sample(
label_to_output_comm_rank=label_to_output_comm_rank,
random_state=random_state,
return_offsets=return_offsets,
return_hops=return_hops,
_multiple_clients=_multiple_clients,
)

Expand All @@ -719,7 +768,16 @@ def uniform_neighbor_sample(
raise ValueError("expected 1d input for start list without batch ids")

start_list = start_list.to_frame()
start_list[batch_id_n] = cudf.Series(cp.zeros(len(start_list), dtype="int32"))
if isinstance(start_list, dask_cudf.DataFrame):
start_list = start_list.map_partitions(
lambda df: df.assign(
**{batch_id_n: cudf.Series(cp.zeros(len(df), dtype="int32"))}
)
).persist()
else:
start_list = start_list.reset_index(drop=True).assign(
**{batch_id_n: cudf.Series(cp.zeros(len(start_list), dtype="int32"))}
)

if keep_batches_together and min_batch_id is None:
raise ValueError(
@@ -795,6 +853,9 @@ def uniform_neighbor_sample(
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
return_hops=return_hops,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
)
finally:
lock.release()
Expand All @@ -818,6 +879,9 @@ def uniform_neighbor_sample(
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
return_hops=return_hops,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
)

if return_offsets:
2 changes: 2 additions & 0 deletions python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
@@ -51,6 +51,8 @@ def _write_samples_to_parquet(
raise ValueError("Invalid value of partition_info")

max_batch_id = offsets.batch_id.max()
results.dropna(axis=1, how="all", inplace=True)
results["hop_id"] = results["hop_id"].astype("uint8")

for p in range(0, len(offsets), batches_per_partition):
offsets_p = offsets.iloc[p : p + batches_per_partition]
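The two added lines drop columns that are entirely null (for example an edge_id column the sampler never filled) and store hop_id in a compact dtype before the Parquet write. A small illustration of the same cuDF calls on made-up data:

import cudf

results = cudf.DataFrame(
    {
        "sources": [0, 1, 2],
        "destinations": [1, 2, 0],
        "edge_id": cudf.Series([None, None, None], dtype="int64"),  # never populated
        "hop_id": [0, 0, 1],
    }
)

results.dropna(axis=1, how="all", inplace=True)         # removes the all-null edge_id column
results["hop_id"] = results["hop_id"].astype("uint8")   # compact dtype for Parquet
print(results.columns.tolist())  # ['sources', 'destinations', 'hop_id']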
34 changes: 31 additions & 3 deletions python/cugraph/cugraph/link_prediction/jaccard.py
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -20,7 +20,7 @@
)


def jaccard(input_graph, vertex_pair=None):
def jaccard(input_graph, vertex_pair=None, do_expensive_check=True):
"""
Compute the Jaccard similarity between each pair of vertices connected by
an edge, or between arbitrary pairs of vertices specified by the user.
@@ -36,6 +36,10 @@ def jaccard(input_graph, vertex_pair=None):
of cugraph.jaccard is different from the behavior of
networkx.jaccard_coefficient.
This algorithm doesn't currently support datasets whose vertices are not
(re)numbered from 0 to V-1, where V is the total number of vertices, as any
gap in the numbering creates isolated vertices.
cugraph.jaccard, in the absence of a specified vertex pair list, will
use the edges of the graph to construct a vertex pair list and will
return the jaccard coefficient for those vertex pairs.
@@ -80,6 +84,10 @@ def jaccard(input_graph, vertex_pair=None):
current implementation computes the jaccard coefficient for all
adjacent vertices in the graph.
do_expensive_check: bool (default=True)
When set to True, check if the vertices in the graph are (re)numbered
from 0 to V-1 where V is the total number of vertices.
Returns
-------
df : cudf.DataFrame
@@ -104,6 +112,22 @@ def jaccard(input_graph, vertex_pair=None):
>>> df = cugraph.jaccard(G)
"""
if do_expensive_check:
if not input_graph.renumbered:
input_df = input_graph.edgelist.edgelist_df[["src", "dst"]]
max_vertex = input_df.max().max()
expected_nodes = cudf.Series(range(0, max_vertex + 1, 1)).astype(
input_df.dtypes[0]
)
nodes = (
cudf.concat([input_df["src"], input_df["dst"]])
.unique()
.sort_values()
.reset_index(drop=True)
)
if not expected_nodes.equals(nodes):
raise ValueError("Unrenumbered vertices are not supported.")

if input_graph.is_directed():
raise ValueError("Input must be an undirected Graph.")
if type(vertex_pair) == cudf.DataFrame:
@@ -120,10 +144,14 @@ def jaccard(input_graph, vertex_pair=None):
return df


def jaccard_coefficient(G, ebunch=None):
def jaccard_coefficient(G, ebunch=None, do_expensive_check=True):
"""
For NetworkX Compatibility. See `jaccard`
NOTE: This algorithm doesn't currently support datasets whose vertices are
not (re)numbered from 0 to V-1, where V is the total number of vertices, as
any gap in the numbering creates isolated vertices.
Parameters
----------
graph : cugraph.Graph
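For reference, a sketch of what the new expensive check looks for, on an invented edge list with a gap in the vertex numbering (vertex 3 is missing); the data and variable names are for illustration only.

import cudf
import cugraph

edges = cudf.DataFrame({"src": [0, 1, 2, 4], "dst": [1, 2, 4, 0]})  # vertex 3 is missing

# The same test the guard performs: vertex ids must be exactly 0..V-1.
max_vertex = edges.max().max()
expected = cudf.Series(range(0, max_vertex + 1)).astype(edges.dtypes[0])
nodes = (
    cudf.concat([edges["src"], edges["dst"]])
    .unique()
    .sort_values()
    .reset_index(drop=True)
)
print(expected.equals(nodes))  # False: an unrenumbered graph built from this edge list is rejected

# Renumbering (the default) yields a contiguous 0..V-1 vertex range, so the guard is satisfied.
G = cugraph.Graph()
G.from_cudf_edgelist(edges, source="src", destination="dst", renumber=True)
df = cugraph.jaccard(G)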