Merge branch 'branch-23.08' into cugraph-nx
eriknw committed Jul 18, 2023
2 parents 1852633 + 0372396 commit 8b24e4c
Showing 10 changed files with 2,689 additions and 144 deletions.
1,860 changes: 1,860 additions & 0 deletions benchmarks/cugraph/standalone/bulk_sampling/benchmarking_script.ipynb

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions benchmarks/cugraph/standalone/bulk_sampling/bulk_sampling.sh
@@ -0,0 +1,50 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

export RAPIDS_NO_INITIALIZE="1"
export CUDF_SPILL="1"
export LIBCUDF_CUFILE_POLICY=OFF


dataset_name=$1
dataset_root=$2
output_root=$3
batch_sizes=$4
fanouts=$5
reverse_edges=$6

rm -rf $output_root
mkdir -p $output_root

# Change to 2 in Selene
gpu_per_replica=4
#--add_edge_ids \

# Expand to 1, 4, 8 in Selene
for i in 1 2 3 4
do
for replication in 2;
do
dataset_name_with_replication="${dataset_name}[${replication}]"
dask_worker_devices=$(seq -s, 0 $((gpu_per_replica*replication-1)))
echo "Sampling dataset = $dataset_name_with_replication on devices = $dask_worker_devices"
python3 cugraph_bulk_sampling.py --datasets $dataset_name_with_replication \
--dataset_root $dataset_root \
--batch_sizes $batch_sizes \
--output_root $output_root \
--dask_worker_devices $dask_worker_devices \
--fanouts $fanouts \
--reverse_edges
done
done
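
The inner loop's device list comes from seq -s, 0 $((gpu_per_replica*replication-1)), i.e. GPU ids 0 through gpu_per_replica * replication - 1 joined with commas. A minimal Python sketch of that arithmetic, for readers following along outside the shell (the function name is illustrative, not part of the repository):

# Sketch: mirrors the dask_worker_devices computation in bulk_sampling.sh above.
gpu_per_replica = 4  # 4 GPUs per dataset replica here; the comment above suggests 2 on Selene


def dask_worker_devices(replication: int) -> str:
    """Comma-separated GPU ids 0 .. gpu_per_replica * replication - 1."""
    return ",".join(str(d) for d in range(gpu_per_replica * replication))


print(dask_worker_devices(2))  # -> "0,1,2,3,4,5,6,7"
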
benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py
@@ -33,8 +33,6 @@
 
 import cugraph
 
-from datetime import datetime
-
 import json
 import re
 import os
@@ -50,6 +48,7 @@
 import dask_cudf
 import dask.dataframe as ddf
 from dask.distributed import default_client
+from cugraph.dask import get_n_workers
 
 from typing import Optional, Union, Dict
 
@@ -173,6 +172,7 @@ def sample_graph(G, label_df, output_path,seed=42, batch_size=500, seeds_per_call
         random_state=seed,
         seeds_per_call=seeds_per_call,
         batches_per_partition=batches_per_partition,
+        log_level=logging.INFO
     )
 
     n_workers = len(default_client().scheduler_info()['workers'])
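
The call these keyword arguments belong to is cropped out of the hunk above; in cugraph of this vintage the bulk sampler lives in cugraph.gnn. A hedged sketch of how the newly added log_level option sits next to the existing ones follows; treat the wrapper function, the seeds_per_call and batches_per_partition defaults, and the exact constructor signature as assumptions rather than the benchmark's code:

# Sketch only: the constructor name and the extra arguments are assumptions based
# on cugraph's bulk sampler API of this era, not lines taken from this diff.
import logging

from cugraph.gnn import BulkSampler


def make_sampler(G, output_path, seed=42, batch_size=500,
                 seeds_per_call=400_000, batches_per_partition=100):
    # seed=42 and batch_size=500 mirror the sample_graph defaults in the hunk
    # header; the remaining defaults are placeholders.
    return BulkSampler(
        batch_size=batch_size,
        output_path=output_path,
        graph=G,
        random_state=seed,
        seeds_per_call=seeds_per_call,
        batches_per_partition=batches_per_partition,
        log_level=logging.INFO,  # the option added in this diff
    )
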
@@ -182,10 +182,10 @@ def sample_graph(G, label_df, output_path,seed=42, batch_size=500, seeds_per_call
         'batch': cudf.Series(dtype='int32')
     })
 
     batch_df = label_df.map_partitions(_make_batch_ids, batch_size, n_workers, meta=meta)
     #batch_df = batch_df.sort_values(by='node')
 
-    # should always persist the batch dataframe or performace may be suboptimal
+    # should always persist the batch dataframe or performance may be suboptimal
     batch_df = batch_df.persist()
 
     del label_df
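
_make_batch_ids itself is defined outside the hunks shown here, so its body is not part of this excerpt. For orientation, a hypothetical per-partition batch-id helper with the same call shape might look like the sketch below; the id scheme and the equal-partition assumption are illustrative, not the repository's implementation:

# Hypothetical sketch of a map_partitions-style batch-id helper; not the code
# from cugraph_bulk_sampling.py.
import cupy as cp
import cudf


def _make_batch_ids_sketch(df: cudf.DataFrame, batch_size: int, n_workers: int,
                           partition_info=None) -> cudf.DataFrame:
    # n_workers is accepted only to mirror the real call signature; it is unused
    # in this simplified sketch. dask passes partition_info={'number': <index>, ...}
    # when the mapped function accepts that keyword argument.
    part = partition_info["number"] if partition_info is not None else 0

    # Batches within this partition, offset by the partition index so ids do not
    # collide across partitions (assumes roughly equal partition sizes).
    batches_here = (len(df) + batch_size - 1) // batch_size
    local_ids = cp.arange(len(df)) // batch_size

    out = df.copy()
    out["batch"] = (local_ids + part * batches_here).astype("int32")
    return out

# Usage mirrors the hunk above (sketch):
# batch_df = label_df.map_partitions(_make_batch_ids_sketch, batch_size, n_workers, meta=meta)
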
@@ -278,6 +278,8 @@ def load_disk_dataset(dataset, dataset_dir='.', reverse_edges=True, replication_
     path = Path(dataset_dir) / dataset
     parquet_path = path / 'parquet'
 
+    n_workers = get_n_workers()
+
     with open(os.path.join(path, 'meta.json')) as meta_file:
         meta = json.load(meta_file)
 
@@ -289,7 +291,9 @@
         print(f'Loading edge index for edge type {edge_type}')
 
         can_edge_type = tuple(edge_type.split('__'))
-        edge_index_dict[can_edge_type] = dask_cudf.read_parquet(os.path.join(os.path.join(parquet_path, edge_type), 'edge_index.parquet'))
+        edge_index_dict[can_edge_type] = dask_cudf.read_parquet(
+            Path(parquet_path) / edge_type / 'edge_index.parquet'
+        ).repartition(n_workers*2)
 
         edge_index_dict[can_edge_type]['src'] += node_offsets_replicated[can_edge_type[0]]
         edge_index_dict[can_edge_type]['dst'] += node_offsets_replicated[can_edge_type[-1]]
@@ -344,7 +348,7 @@ def load_disk_dataset(dataset, dataset_dir='.', reverse_edges=True, replication_
         print(f'Loading node labels for node type {node_type} (offset={offset})')
         node_label_path = os.path.join(os.path.join(parquet_path, node_type), 'node_label.parquet')
         if os.path.exists(node_label_path):
-            node_labels[node_type] = dask_cudf.read_parquet(node_label_path).drop('label',axis=1).persist()
+            node_labels[node_type] = dask_cudf.read_parquet(node_label_path).repartition(n_workers).drop('label',axis=1).persist()
             node_labels[node_type]['node'] += offset
             node_labels[node_type] = node_labels[node_type].persist()
 
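Both load_disk_dataset changes above follow one pattern: immediately after each dask_cudf.read_parquet call, repartition the frame against the worker count reported by cugraph.dask.get_n_workers(), so data is spread across the cluster before any joins or sampling happen. A condensed sketch of that pattern, assuming a Dask cluster and client are already running, with placeholder paths and type names:

# Sketch of the read-then-repartition pattern introduced above. The paths and the
# edge/node type names are placeholders, not the benchmark's real layout.
from pathlib import Path

import dask_cudf
from cugraph.dask import get_n_workers

parquet_path = Path("/data/ogbn_papers100M/parquet")  # placeholder
edge_type = "paper__cites__paper"                     # placeholder
node_type = "paper"                                   # placeholder

n_workers = get_n_workers()  # requires an active dask client, as in the benchmark

# Edge lists are large: roughly two partitions per worker.
edge_index = dask_cudf.read_parquet(
    parquet_path / edge_type / "edge_index.parquet"
).repartition(npartitions=n_workers * 2)

# Node labels are small: one partition per worker, drop the raw label column,
# and persist so the frame stays resident while sampling runs.
node_labels = (
    dask_cudf.read_parquet(parquet_path / node_type / "node_label.parquet")
    .repartition(npartitions=n_workers)
    .drop("label", axis=1)
    .persist()
)
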
1 change: 1 addition & 0 deletions mg_utils/run-dask-process.sh
@@ -102,6 +102,7 @@ function buildTcpArgs {
 "
 
 WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE
+--rmm-async
 --local-directory=/tmp/$LOGNAME
 --scheduler-file=$SCHEDULER_FILE
 --memory-limit=$DASK_HOST_MEMORY_LIMIT
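The single added worker flag, --rmm-async, switches the dask-cuda workers from a plain RMM pool to RMM's stream-ordered (asynchronous) allocator, which tends to reduce fragmentation under the spiky allocation pattern of bulk sampling. For readers who start the cluster from Python rather than this script, a rough equivalent with dask_cuda.LocalCUDACluster is sketched below; the script also sets --rmm-pool-size, omitted here because how it combines with the async allocator depends on the dask-cuda release, and the local directory is a placeholder:

# Sketch: Python-side analogue of the worker flags built in run-dask-process.sh.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(
    rmm_async=True,               # same effect as the new --rmm-async flag
    local_directory="/tmp/dask",  # placeholder for --local-directory=/tmp/$LOGNAME
)
client = Client(cluster)
print(client.dashboard_link)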