fix pyg tests, make renumbering optional
alexbarghi-nv committed Jul 25, 2023
1 parent 6ffc699 commit 2ceeaf7
Showing 9 changed files with 95 additions and 78 deletions.
@@ -22,7 +22,6 @@


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_basic(dask_client, karate_gnn):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
@@ -51,7 +50,6 @@ def test_cugraph_loader_basic(dask_client, karate_gnn):


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_hetero(dask_client, karate_gnn):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
@@ -29,7 +29,6 @@

@pytest.mark.cugraph_ops
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_neighbor_sample(dask_client, basic_graph_1):
F, G, N = basic_graph_1
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
@@ -44,9 +43,10 @@ def test_neighbor_sample(dask_client, basic_graph_1):
batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")),
random_state=62,
return_offsets=False,
+return_hops=True,
)
-.sort_values(by=["sources", "destinations"])
.compute()
+.sort_values(by=["sources", "destinations"])
)

out = _sampler_output_from_sampling_results(
@@ -76,7 +76,7 @@ def test_neighbor_sample(dask_client, basic_graph_1):

# check the hop dictionaries
assert len(out.num_sampled_nodes) == 1
-assert out.num_sampled_nodes["vt1"].tolist() == [4, 4]
+assert out.num_sampled_nodes["vt1"].tolist() == [4, 1]

assert len(out.num_sampled_edges) == 1
assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6]
@@ -128,8 +128,8 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph

# check the hop dictionaries
assert len(out.num_sampled_nodes) == 2
-assert out.num_sampled_nodes["black"].tolist() == [2, 2]
-assert out.num_sampled_nodes["brown"].tolist() == [3, 2]
+assert out.num_sampled_nodes["black"].tolist() == [2, 0]
+assert out.num_sampled_nodes["brown"].tolist() == [3, 0]

assert len(out.num_sampled_edges) == 5
assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2]
@@ -140,7 +140,6 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_neighbor_sample_mock_sampling_results(dask_client):
N = {
"A": 2, # 0, 1
Expand Down Expand Up @@ -203,9 +202,9 @@ def test_neighbor_sample_mock_sampling_results(dask_client):
assert out.col[("B", "ba", "A")].tolist() == [1, 1]

assert len(out.num_sampled_nodes) == 3
-assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1]
-assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0]
-assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2]
+assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0]
+assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0]
+assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1]

assert len(out.num_sampled_edges) == 3
assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0]
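Note: the updated expectations above (for example `[2, 2]` becoming `[2, 0]` and `[4, 4]` becoming `[4, 1]`) are consistent with `num_sampled_nodes` being asserted as the number of vertices first encountered at each hop, rather than a running per-hop total. That reading is inferred from the new test values only; it is not stated anywhere in this diff. A hedged illustration:

    # Hypothetical illustration -- the hop-count semantics here are inferred from
    # the new test expectations, not taken from cugraph documentation.
    num_sampled_nodes = {
        # 2 "black" seed vertices; no previously unseen "black" vertex at hop 1
        "black": [2, 0],
        # 3 "brown" seed vertices; hop-1 neighbors were all already seen
        "brown": [3, 0],
    }

    # Under that reading, the distinct-vertex total per type is the sum over hops.
    totals = {vtype: sum(counts) for vtype, counts in num_sampled_nodes.items()}
    assert totals == {"black": 2, "brown": 3}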
4 changes: 0 additions & 4 deletions python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py
@@ -29,7 +29,6 @@


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_basic(karate_gnn):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N)
@@ -56,7 +55,6 @@ def test_cugraph_loader_basic(karate_gnn):


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_hetero(karate_gnn):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N)
@@ -83,7 +81,6 @@ def test_cugraph_loader_hetero(karate_gnn):


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_from_disk():
F = FeatureStore()
F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")
@@ -126,7 +123,6 @@ def test_cugraph_loader_from_disk():


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_cugraph_loader_from_disk_subset():
F = FeatureStore()
F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")
16 changes: 7 additions & 9 deletions python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py
@@ -27,7 +27,6 @@

@pytest.mark.cugraph_ops
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_neighbor_sample(basic_graph_1):
F, G, N = basic_graph_1
cugraph_store = CuGraphStore(F, G, N)
@@ -70,15 +69,14 @@ def test_neighbor_sample(basic_graph_1):

# check the hop dictionaries
assert len(out.num_sampled_nodes) == 1
-assert out.num_sampled_nodes["vt1"].tolist() == [4, 4]
+assert out.num_sampled_nodes["vt1"].tolist() == [4, 1]

assert len(out.num_sampled_edges) == 1
assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6]


@pytest.mark.cugraph_ops
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1):
F, G, N = multi_edge_multi_vertex_graph_1
cugraph_store = CuGraphStore(F, G, N)
@@ -93,6 +91,7 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1):
random_state=62,
return_offsets=False,
).sort_values(by=["sources", "destinations"])
+print(sampling_results)

out = _sampler_output_from_sampling_results(
sampling_results=sampling_results,
@@ -118,8 +117,8 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1):

# check the hop dictionaries
assert len(out.num_sampled_nodes) == 2
-assert out.num_sampled_nodes["black"].tolist() == [2, 2]
-assert out.num_sampled_nodes["brown"].tolist() == [3, 2]
+assert out.num_sampled_nodes["black"].tolist() == [2, 0]
+assert out.num_sampled_nodes["brown"].tolist() == [3, 0]

assert len(out.num_sampled_edges) == 5
assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2]
@@ -130,7 +129,6 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1):


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
-@pytest.mark.skip(reason="broken")
def test_neighbor_sample_mock_sampling_results(abc_graph):
F, G, N = abc_graph

@@ -166,9 +164,9 @@ def test_neighbor_sample_mock_sampling_results(abc_graph):
assert out.col[("B", "ba", "A")].tolist() == [1, 1]

assert len(out.num_sampled_nodes) == 3
-assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1]
-assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0]
-assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2]
+assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0]
+assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0]
+assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1]

assert len(out.num_sampled_edges) == 3
assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0]
@@ -317,9 +317,14 @@ def _call_plc_uniform_neighbor_sample_legacy(
random_state=random_state,
return_hops=return_hops,
)
-return convert_to_cudf(
+
+output = convert_to_cudf(
cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets
)
+
+if isinstance(output, (list, tuple)) and len(output) == 1:
+return output[0]
+return output


def _mg_call_plc_uniform_neighbor_sample_legacy(
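The change above appears to stop the legacy wrapper from handing back a one-element tuple when only a single result frame is produced: the converted output is unwrapped before being returned. A minimal sketch of that normalization pattern; the helper name `_unwrap_single` is hypothetical and not part of cugraph:

    from typing import Any


    def _unwrap_single(output: Any) -> Any:
        # Return the sole element of a one-item list/tuple, otherwise pass through.
        # Mirrors the post-processing added after convert_to_cudf above, so callers
        # that did not request offsets receive a DataFrame rather than a 1-tuple.
        if isinstance(output, (list, tuple)) and len(output) == 1:
            return output[0]
        return output


    # A (df,) result collapses to df; a (df, offsets) pair is left untouched.
    assert _unwrap_single(("df",)) == "df"
    assert _unwrap_single(("df", "offsets")) == ("df", "offsets")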
31 changes: 23 additions & 8 deletions python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py
@@ -42,6 +42,7 @@ def __init__(
graph,
seeds_per_call: int = 200_000,
batches_per_partition: int = 100,
+renumber: bool=False,
log_level: int = None,
**kwargs,
):
@@ -61,6 +62,9 @@
a single sampling call.
batches_per_partition: int (optional, default=100)
The number of batches outputted to a single parquet partition.
+renumber: bool (optional, default=False)
+Whether to renumber vertices. Currently only supported for
+homogeneous graphs.
log_level: int (optional, default=None)
Whether to enable logging for this sampler. Supports 3 levels
of logging if enabled (INFO, WARNING, ERROR). If not provided,
@@ -88,6 +92,7 @@ def __init__(
self.__graph = graph
self.__seeds_per_call = seeds_per_call
self.__batches_per_partition = batches_per_partition
+self.__renumber = renumber
self.__batches = None
self.__sample_call_args = kwargs

@@ -102,6 +107,10 @@ def batch_size(self) -> int:
@property
def batches_per_partition(self) -> int:
return self.__batches_per_partition

+@property
+def renumber(self) -> bool:
+return self.__renumber

@property
def size(self) -> int:
@@ -236,7 +245,7 @@ def flush(self) -> None:
start_time_sample_call = time.perf_counter()

# Call uniform neighbor sample
-samples, offsets, renumber_map = sample_fn(
+output = sample_fn(
self.__graph,
**self.__sample_call_args,
start_list=self.__batches[[self.start_col_name, self.batch_col_name]][
@@ -245,9 +254,15 @@ def flush(self) -> None:
with_batch_ids=True,
with_edge_properties=True,
return_offsets=True,
-renumber=True,
+renumber=self.__renumber
)
+
+if self.__renumber:
+samples, offsets, renumber_map = output
+else:
+samples, offsets = output
+renumber_map = None

end_time_sample_call = time.perf_counter()
sample_runtime = end_time_sample_call - start_time_sample_call

@@ -266,15 +281,15 @@ def flush(self) -> None:
# Write batches to parquet
self.__write(samples, offsets, renumber_map)
if isinstance(self.__batches, dask_cudf.DataFrame):
-wait(
-[f.release() for f in futures_of(samples)]
-+ [f.release() for f in futures_of(offsets)]
-+ [f.release() for f in futures_of(renumber_map)]
-)
+futures = [f.release() for f in futures_of(samples)] + [f.release() for f in futures_of(offsets)]
+if renumber_map is not None:
+futures += [f.release() for f in futures_of(renumber_map)]
+wait(futures)

del samples
del offsets
-del renumber_map
+if renumber_map is not None:
+del renumber_map

end_time_write = time.perf_counter()
write_runtime = end_time_write - start_time_write
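With this change, renumbering in `BulkSampler` becomes opt-in (`renumber=False` by default), and `flush()` unpacks either two or three values from the sampling call accordingly. A minimal usage sketch under stated assumptions: only `graph`, `seeds_per_call`, `batches_per_partition`, `renumber`, and `log_level` are visible in this diff, so the `batch_size`/`output_path` arguments, the `add_batches` signature, and the forwarded `fanout_vals` keyword are taken from typical BulkSampler usage and may differ:

    import cudf
    import cugraph
    from cugraph.gnn import BulkSampler

    # Tiny illustrative graph.
    edges = cudf.DataFrame({"src": [0, 1, 2, 3], "dst": [1, 2, 3, 0]})
    G = cugraph.Graph()
    G.from_cudf_edgelist(edges, source="src", destination="dst")

    sampler = BulkSampler(
        batch_size=2,                # assumed argument, not shown in this diff
        output_path="/tmp/samples",  # assumed argument, not shown in this diff
        graph=G,
        fanout_vals=[2, 2],          # forwarded to the sampling call via **kwargs
        renumber=True,               # new flag; defaults to False, homogeneous graphs only
    )

    # Seed vertices and their batch assignments (assumed add_batches signature).
    batch_df = cudf.DataFrame(
        {
            "start": cudf.Series([0, 1, 2, 3], dtype="int64"),
            "batch": cudf.Series([0, 0, 1, 1], dtype="int32"),
        }
    )
    sampler.add_batches(batch_df, start_col_name="start", batch_col_name="batch")
    sampler.flush()  # writes parquet partitions; adds a renumber map when renumber=True

When `renumber=False` (the default), `flush()` receives only `(samples, offsets)` from the sampling call and no renumber map is written, matching the conditional unpacking introduced above.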
83 changes: 44 additions & 39 deletions python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
@@ -64,14 +64,11 @@ def _write_samples_to_parquet(
end_batch_id = offsets_p.batch_id.iloc[-1]

start_ix = offsets_p.offsets.iloc[0]
-renumber_map_start_ix = offsets_p.renumber_map_offsets.iloc[0]
if end_batch_id == max_batch_id:
end_ix = len(results)
-renumber_map_end_ix = len(renumber_map)
else:
offsets_z = offsets[offsets.batch_id == (end_batch_id + 1)]
end_ix = offsets_z.offsets.iloc[0]
-renumber_map_end_ix = offsets_z.renumber_map_offsets.iloc[0]

full_output_path = os.path.join(
output_path, f"batch={start_batch_id}-{end_batch_id}.parquet"
@@ -82,44 +79,52 @@
cupy.diff(offsets_p.offsets.values, append=end_ix)
).values

-print("\n-------------------------------------------------------")
-print(renumber_map_start_ix, renumber_map_end_ix)
-print(start_batch_id, end_batch_id)
-renumber_map_p = renumber_map.map.iloc[
-renumber_map_start_ix:renumber_map_end_ix
-]
-print(renumber_map_p)
-print(offsets_p)

-# Add the length so no na-checking is required in the loading stage
-map_offset = (
-end_batch_id - start_batch_id + 2
-) - offsets_p.renumber_map_offsets.iloc[0]
-renumber_map_o = cudf.concat(
-[
-offsets_p.renumber_map_offsets + map_offset,
-cudf.Series([len(renumber_map_p) + len(offsets_p) + 1], dtype="int32"),
-]
-)

-renumber_offset_len = len(renumber_map_o)
-if renumber_offset_len != end_batch_id - start_batch_id + 2:
-raise ValueError("Invalid batch id or renumber map")
+if renumber_map is not None:
+renumber_map_start_ix = offsets_p.renumber_map_offsets.iloc[0]

-final_map_series = cudf.concat(
-[
-renumber_map_o,
-renumber_map_p,
-],
-ignore_index=True,
-)
-print("\nfinal map:\n", final_map_series)
+if end_batch_id == max_batch_id:
+renumber_map_end_ix = len(renumber_map)
+else:
+renumber_map_end_ix = offsets_z.renumber_map_offsets.iloc[0]

-if len(final_map_series) > len(results_p):
-final_map_series.name = "map"
-results_p = results_p.join(final_map_series, how="outer").sort_index()
-else:
-results_p["map"] = final_map_series
+print("\n-------------------------------------------------------")
+print(renumber_map_start_ix, renumber_map_end_ix)
+print(start_batch_id, end_batch_id)
+renumber_map_p = renumber_map.map.iloc[
+renumber_map_start_ix:renumber_map_end_ix
+]
+print(renumber_map_p)
+print(offsets_p)

+# Add the length so no na-checking is required in the loading stage
+map_offset = (
+end_batch_id - start_batch_id + 2
+) - offsets_p.renumber_map_offsets.iloc[0]
+renumber_map_o = cudf.concat(
+[
+offsets_p.renumber_map_offsets + map_offset,
+cudf.Series([len(renumber_map_p) + len(offsets_p) + 1], dtype="int32"),
+]
+)

+renumber_offset_len = len(renumber_map_o)
+if renumber_offset_len != end_batch_id - start_batch_id + 2:
+raise ValueError("Invalid batch id or renumber map")

+final_map_series = cudf.concat(
+[
+renumber_map_o,
+renumber_map_p,
+],
+ignore_index=True,
+)
+print("\nfinal map:\n", final_map_series)

+if len(final_map_series) > len(results_p):
+final_map_series.name = "map"
+results_p = results_p.join(final_map_series, how="outer").sort_index()
+else:
+results_p["map"] = final_map_series

print("\nresults_p:\n", results_p)
results_p.to_parquet(