Forward-merge branch-23.10 to branch-23.12 #3904

Merged: 8 commits, Oct 14, 2023
4 changes: 4 additions & 0 deletions ci/release/update-version.sh
@@ -82,6 +82,9 @@ NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; prin
DEPENDENCIES=(
cudf
cugraph
cugraph-dgl
cugraph-pyg
cugraph-service-server
cugraph-service-client
cuxfilter
dask-cuda
@@ -93,6 +96,7 @@ DEPENDENCIES=(
librmm
pylibcugraph
pylibcugraphops
pylibwholegraph
pylibraft
pyraft
raft-dask
12 changes: 9 additions & 3 deletions ci/test_python.sh
@@ -65,7 +65,10 @@ popd

rapids-logger "pytest cugraph"
pushd python/cugraph/cugraph
export DASK_WORKER_DEVICES="0"
DASK_WORKER_DEVICES="0" \
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
pytest \
-v \
--benchmark-disable \
@@ -200,8 +203,11 @@ if [[ "${RAPIDS_CUDA_VERSION}" == "11.8.0" ]]; then
--channel pytorch \
--channel nvidia \
'pyg=2.3' \
'pytorch>=2.0' \
'pytorch-cuda>=11.8'
'pytorch=2.0.0' \
'pytorch-cuda=11.8'

# Install pyg dependencies (which requires pip)
pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.0+cu118.html

rapids-mamba-retry install \
--channel "${CPP_CHANNEL}" \
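For context (not part of the PR diff): the DASK_DISTRIBUTED__* variables added in this script (and again in ci/test_wheel.sh below) follow Dask's environment-variable convention, where the DASK_ prefix is dropped, double underscores nest keys, and single underscores become hyphens, so they should be equivalent to setting the config keys in the sketch below. DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT appears to be specific to dask-cuda's worker-startup wait logic and is left out of the sketch.

# Minimal sketch (assumption: standard Dask env-var-to-config mapping).
import dask

dask.config.set(
    {
        # DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s"
        "distributed.scheduler.worker-ttl": "1000s",
        # DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s"
        "distributed.comm.timeouts.connect": "1000s",
    }
)

print(dask.config.get("distributed.scheduler.worker-ttl"))  # -> "1000s"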
6 changes: 5 additions & 1 deletion ci/test_wheel.sh
@@ -21,5 +21,9 @@ arch=$(uname -m)
if [[ "${arch}" == "aarch64" && ${RAPIDS_BUILD_TYPE} == "pull-request" ]]; then
python ./ci/wheel_smoke_test_${package_name}.py
else
RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets python -m pytest ./python/${package_name}/${python_package_name}/tests
RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets \
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
python -m pytest ./python/${package_name}/${python_package_name}/tests
fi
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -53,6 +53,7 @@ dependencies:
- pydata-sphinx-theme
- pylibcugraphops==23.10.*
- pylibraft==23.10.*
- pylibwholegraph==23.10.*
- pytest
- pytest-benchmark
- pytest-cov
@@ -73,5 +74,6 @@ dependencies:
- sphinxcontrib-websupport
- ucx-proc=*=gpu
- ucx-py==0.34.*
- wget
- wheel
name: all_cuda-118_arch-x86_64
2 changes: 2 additions & 0 deletions conda/environments/all_cuda-120_arch-x86_64.yaml
@@ -52,6 +52,7 @@ dependencies:
- pydata-sphinx-theme
- pylibcugraphops==23.10.*
- pylibraft==23.10.*
- pylibwholegraph==23.10.*
- pytest
- pytest-benchmark
- pytest-cov
@@ -72,5 +73,6 @@ dependencies:
- sphinxcontrib-websupport
- ucx-proc=*=gpu
- ucx-py==0.34.*
- wget
- wheel
name: all_cuda-120_arch-x86_64
6 changes: 6 additions & 0 deletions dependencies.yaml
@@ -463,6 +463,9 @@ dependencies:
packages:
- ipython
- notebook>=0.5.0
- output_types: [conda]
packages:
- wget
test_python_common:
common:
- output_types: [conda, pyproject]
@@ -481,6 +484,9 @@ dependencies:
- *numpy
- python-louvain
- scikit-learn>=0.23.1
- output_types: [conda]
packages:
- pylibwholegraph==23.10.*
test_python_pylibcugraph:
common:
- output_types: [conda, pyproject]
2 changes: 1 addition & 1 deletion python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
@@ -13,7 +13,7 @@

from cugraph_dgl.dataloading.dataset import (
HomogenousBulkSamplerDataset,
HetrogenousBulkSamplerDataset,
HeterogenousBulkSamplerDataset,
)
from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler
from cugraph_dgl.dataloading.dataloader import DataLoader
49 changes: 35 additions & 14 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -21,7 +21,7 @@
from dask.distributed import default_client, Event
from cugraph_dgl.dataloading import (
HomogenousBulkSamplerDataset,
HetrogenousBulkSamplerDataset,
HeterogenousBulkSamplerDataset,
)
from cugraph_dgl.dataloading.utils.extract_graph_helpers import (
create_cugraph_graph_from_edges_dict,
@@ -47,19 +47,20 @@ def __init__(
graph_sampler: cugraph_dgl.dataloading.NeighborSampler,
sampling_output_dir: str,
batches_per_partition: int = 50,
seeds_per_call: int = 400_000,
seeds_per_call: int = 200_000,
device: torch.device = None,
use_ddp: bool = False,
ddp_seed: int = 0,
batch_size: int = 1024,
drop_last: bool = False,
shuffle: bool = False,
sparse_format: str = "coo",
**kwargs,
):
"""
Constructor for CuGraphStorage:
-------------------------------
graph : CuGraphStorage
graph : CuGraphStorage
The graph.
indices : Tensor or dict[ntype, Tensor]
The set of indices. It can either be a tensor of
@@ -89,7 +90,12 @@ def __init__(
The seed for shuffling the dataset in
:class:`torch.utils.data.distributed.DistributedSampler`.
Only effective when :attr:`use_ddp` is True.
batch_size: int,
batch_size: int
Batch size.
sparse_format: str, default = "coo"
The sparse format of the emitted sampled graphs. Choose between "csc"
and "coo". When using "csc", the graphs are of type
cugraph_dgl.nn.SparseGraph.
kwargs : dict
Key-word arguments to be passed to the parent PyTorch
:py:class:`torch.utils.data.DataLoader` class. Common arguments are:
@@ -123,6 +129,12 @@ def __init__(
... for input_nodes, output_nodes, blocks in dataloader:
...
"""
if sparse_format not in ["coo", "csc"]:
raise ValueError(
f"sparse_format must be one of 'coo', 'csc', "
f"but got {sparse_format}."
)
self.sparse_format = sparse_format

self.ddp_seed = ddp_seed
self.use_ddp = use_ddp
@@ -156,11 +168,12 @@ def __init__(
self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset(
total_number_of_nodes=graph.total_number_of_nodes,
edge_dir=self.graph_sampler.edge_dir,
sparse_format=sparse_format,
)
else:
etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()}

self.cugraph_dgl_dataset = HetrogenousBulkSamplerDataset(
self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset(
num_nodes_dict=graph.num_nodes_dict,
etype_id_dict=etype_id_to_etype_str_dict,
etype_offset_dict=graph._etype_offset_d,
@@ -210,14 +223,23 @@ def __iter__(self):
output_dir = os.path.join(
self._sampling_output_dir, "epoch_" + str(self.epoch_number)
)
kwargs = {}
if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset):
deduplicate_sources = True
prior_sources_behavior = "carryover"
renumber = True
kwargs["deduplicate_sources"] = True
kwargs["prior_sources_behavior"] = "carryover"
kwargs["renumber"] = True

if self.sparse_format == "csc":
kwargs["compression"] = "CSR"
kwargs["compress_per_hop"] = True
# The following kwargs will be deprecated in uniform sampler.
kwargs["use_legacy_names"] = False
kwargs["include_hop_column"] = False

else:
deduplicate_sources = False
prior_sources_behavior = None
renumber = False
kwargs["deduplicate_sources"] = False
kwargs["prior_sources_behavior"] = None
kwargs["renumber"] = False

bs = BulkSampler(
output_path=output_dir,
@@ -227,10 +249,9 @@ def __iter__(self):
seeds_per_call=self._seeds_per_call,
fanout_vals=self.graph_sampler._reversed_fanout_vals,
with_replacement=self.graph_sampler.replace,
deduplicate_sources=deduplicate_sources,
prior_sources_behavior=prior_sources_behavior,
renumber=renumber,
**kwargs,
)

if self.shuffle:
self.tensorized_indices_ds.shuffle()

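For context (not part of the PR diff): a hypothetical usage sketch of the new sparse_format argument, based only on the docstring and constructor shown above. The graph store, seed IDs, fanout values, and output directory are placeholder assumptions, not a tested example.

import cugraph_dgl

def make_csc_loader(graph, train_nids, output_dir):
    # `graph` is assumed to be a cugraph_dgl graph store and `train_nids`
    # the seed node IDs; both are placeholders for illustration only.
    sampler = cugraph_dgl.dataloading.NeighborSampler(
        [10, 25]  # per-layer fanouts (assumed to mirror DGL's NeighborSampler)
    )
    return cugraph_dgl.dataloading.DataLoader(
        graph,
        train_nids,
        sampler,
        sampling_output_dir=output_dir,
        batch_size=1024,
        shuffle=True,
        sparse_format="csc",  # new option: emit cugraph_dgl.nn.SparseGraph instead of dgl.Block
    )

# Iteration then follows the pattern from the docstring above:
# for input_nodes, output_nodes, blocks in make_csc_loader(g, ids, "/tmp/samples"):
#     ...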
37 changes: 25 additions & 12 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py
@@ -19,6 +19,7 @@
from cugraph_dgl.dataloading.utils.sampling_helpers import (
create_homogeneous_sampled_graphs_from_dataframe,
create_heterogeneous_sampled_graphs_from_dataframe,
create_homogeneous_sampled_graphs_from_dataframe_csc,
)


@@ -33,17 +34,19 @@ def __init__(
total_number_of_nodes: int,
edge_dir: str,
return_type: str = "dgl.Block",
sparse_format: str = "coo",
):
if return_type not in ["dgl.Block", "cugraph_dgl.nn.SparseGraph"]:
raise ValueError(
"return_type must be either 'dgl.Block' or \
'cugraph_dgl.nn.SparseGraph' "
"return_type must be either 'dgl.Block' or "
"'cugraph_dgl.nn.SparseGraph'."
)
# TODO: Deprecate `total_number_of_nodes`
# as it is no longer needed
# in the next release
self.total_number_of_nodes = total_number_of_nodes
self.edge_dir = edge_dir
self.sparse_format = sparse_format
self._current_batch_fn = None
self._input_files = None
self._return_type = return_type
@@ -60,10 +63,20 @@ def __getitem__(self, idx: int):

fn, batch_offset = self._batch_to_fn_d[idx]
if fn != self._current_batch_fn:
df = _load_sampled_file(dataset_obj=self, fn=fn)
self._current_batches = create_homogeneous_sampled_graphs_from_dataframe(
sampled_df=df, edge_dir=self.edge_dir, return_type=self._return_type
)
if self.sparse_format == "csc":
df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True)
self._current_batches = (
create_homogeneous_sampled_graphs_from_dataframe_csc(df)
)
else:
df = _load_sampled_file(dataset_obj=self, fn=fn)
self._current_batches = (
create_homogeneous_sampled_graphs_from_dataframe(
sampled_df=df,
edge_dir=self.edge_dir,
return_type=self._return_type,
)
)
current_offset = idx - batch_offset
return self._current_batches[current_offset]

@@ -87,7 +100,7 @@ def set_input_files(
)


class HetrogenousBulkSamplerDataset(torch.utils.data.Dataset):
class HeterogenousBulkSamplerDataset(torch.utils.data.Dataset):
def __init__(
self,
num_nodes_dict: Dict[str, int],
@@ -141,18 +154,18 @@ def set_input_files(
----------
input_directory: str
input_directory which contains all the files that will be
loaded by HetrogenousBulkSamplerDataset
loaded by HeterogenousBulkSamplerDataset
input_file_paths: List[str]
File names that will be loaded by the HetrogenousBulkSamplerDataset
File names that will be loaded by the HeterogenousBulkSamplerDataset
"""
_set_input_files(
self, input_directory=input_directory, input_file_paths=input_file_paths
)


def _load_sampled_file(dataset_obj, fn):
def _load_sampled_file(dataset_obj, fn, skip_rename=False):
df = cudf.read_parquet(os.path.join(fn))
if dataset_obj.edge_dir == "in":
if dataset_obj.edge_dir == "in" and not skip_rename:
df.rename(
columns={"sources": "destinations", "destinations": "sources"},
inplace=True,
@@ -181,7 +194,7 @@ def get_batch_to_fn_d(files):


def _set_input_files(
dataset_obj: Union[HomogenousBulkSamplerDataset, HetrogenousBulkSamplerDataset],
dataset_obj: Union[HomogenousBulkSamplerDataset, HeterogenousBulkSamplerDataset],
input_directory: Optional[str] = None,
input_file_paths: Optional[List[str]] = None,
) -> None: