From 2ceeaf7999a70b1352a604927ef2a105d54b57fe Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Tue, 25 Jul 2023 15:06:59 +0000 Subject: [PATCH] fix pyg tests, make renumbering optional --- .../tests/mg/test_mg_cugraph_loader.py | 2 - .../tests/mg/test_mg_cugraph_sampler.py | 17 ++-- .../cugraph_pyg/tests/test_cugraph_loader.py | 4 - .../cugraph_pyg/tests/test_cugraph_sampler.py | 16 ++-- .../dask/sampling/uniform_neighbor_sample.py | 7 +- .../cugraph/gnn/data_loading/bulk_sampler.py | 31 +++++-- .../gnn/data_loading/bulk_sampler_io.py | 83 ++++++++++--------- .../tests/sampling/test_bulk_sampler.py | 8 +- .../tests/sampling/test_bulk_sampler_mg.py | 5 +- 9 files changed, 95 insertions(+), 78 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py index 77baf094f06..e29f3aea512 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py @@ -22,7 +22,6 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_basic(dask_client, karate_gnn): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) @@ -51,7 +50,6 @@ def test_cugraph_loader_basic(dask_client, karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_hetero(dask_client, karate_gnn): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index 0bcba71b9b5..d235ed42553 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -29,7 +29,6 @@ @pytest.mark.cugraph_ops @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_neighbor_sample(dask_client, basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) @@ -44,9 +43,10 @@ def test_neighbor_sample(dask_client, basic_graph_1): batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")), random_state=62, return_offsets=False, + return_hops=True, ) - .sort_values(by=["sources", "destinations"]) .compute() + .sort_values(by=["sources", "destinations"]) ) out = _sampler_output_from_sampling_results( @@ -76,7 +76,7 @@ def test_neighbor_sample(dask_client, basic_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 1 - assert out.num_sampled_nodes["vt1"].tolist() == [4, 4] + assert out.num_sampled_nodes["vt1"].tolist() == [4, 1] assert len(out.num_sampled_edges) == 1 assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6] @@ -128,8 +128,8 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph # check the hop dictionaries assert len(out.num_sampled_nodes) == 2 - assert out.num_sampled_nodes["black"].tolist() == [2, 2] - assert out.num_sampled_nodes["brown"].tolist() == [3, 2] + assert out.num_sampled_nodes["black"].tolist() == [2, 0] + assert out.num_sampled_nodes["brown"].tolist() == [3, 0] assert len(out.num_sampled_edges) == 5 assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2] @@ -140,7 +140,6 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_neighbor_sample_mock_sampling_results(dask_client): N = { "A": 2, # 0, 1 @@ -203,9 +202,9 @@ def test_neighbor_sample_mock_sampling_results(dask_client): assert out.col[("B", "ba", "A")].tolist() == [1, 1] assert len(out.num_sampled_nodes) == 3 - assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1] - assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0] - assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2] + assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0] + assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0] + assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1] assert len(out.num_sampled_edges) == 3 assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py index 683f73e52fd..e0a943aeca3 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py @@ -29,7 +29,6 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_basic(karate_gnn): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N) @@ -56,7 +55,6 @@ def test_cugraph_loader_basic(karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_hetero(karate_gnn): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N) @@ -83,7 +81,6 @@ def test_cugraph_loader_hetero(karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_from_disk(): F = FeatureStore() F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x") @@ -126,7 +123,6 @@ def test_cugraph_loader_from_disk(): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_cugraph_loader_from_disk_subset(): F = FeatureStore() F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x") diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 215a7b74347..174650e1ffb 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -27,7 +27,6 @@ @pytest.mark.cugraph_ops @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_neighbor_sample(basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N) @@ -70,7 +69,7 @@ def test_neighbor_sample(basic_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 1 - assert out.num_sampled_nodes["vt1"].tolist() == [4, 4] + assert out.num_sampled_nodes["vt1"].tolist() == [4, 1] assert len(out.num_sampled_edges) == 1 assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6] @@ -78,7 +77,6 @@ def test_neighbor_sample(basic_graph_1): @pytest.mark.cugraph_ops @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): F, G, N = multi_edge_multi_vertex_graph_1 cugraph_store = CuGraphStore(F, G, N) @@ -93,6 +91,7 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): random_state=62, return_offsets=False, ).sort_values(by=["sources", "destinations"]) + print(sampling_results) out = _sampler_output_from_sampling_results( sampling_results=sampling_results, @@ -118,8 +117,8 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 2 - assert out.num_sampled_nodes["black"].tolist() == [2, 2] - assert out.num_sampled_nodes["brown"].tolist() == [3, 2] + assert out.num_sampled_nodes["black"].tolist() == [2, 0] + assert out.num_sampled_nodes["brown"].tolist() == [3, 0] assert len(out.num_sampled_edges) == 5 assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2] @@ -130,7 +129,6 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -@pytest.mark.skip(reason="broken") def test_neighbor_sample_mock_sampling_results(abc_graph): F, G, N = abc_graph @@ -166,9 +164,9 @@ def test_neighbor_sample_mock_sampling_results(abc_graph): assert out.col[("B", "ba", "A")].tolist() == [1, 1] assert len(out.num_sampled_nodes) == 3 - assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1] - assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0] - assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2] + assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0] + assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0] + assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1] assert len(out.num_sampled_edges) == 3 assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0] diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 553f3f5f543..81ac9e8d1d1 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -317,9 +317,14 @@ def _call_plc_uniform_neighbor_sample_legacy( random_state=random_state, return_hops=return_hops, ) - return convert_to_cudf( + + output = convert_to_cudf( cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets ) + + if isinstance(output, (list, tuple)) and len(output) == 1: + return output[0] + return output def _mg_call_plc_uniform_neighbor_sample_legacy( diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py index 57dc20196fd..02ea7e10935 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py @@ -42,6 +42,7 @@ def __init__( graph, seeds_per_call: int = 200_000, batches_per_partition: int = 100, + renumber: bool=False, log_level: int = None, **kwargs, ): @@ -61,6 +62,9 @@ def __init__( a single sampling call. batches_per_partition: int (optional, default=100) The number of batches outputted to a single parquet partition. + renumber: bool (optional, default=False) + Whether to renumber vertices. Currently only supported for + homogeneous graphs. log_level: int (optional, default=None) Whether to enable logging for this sampler. Supports 3 levels of logging if enabled (INFO, WARNING, ERROR). If not provided, @@ -88,6 +92,7 @@ def __init__( self.__graph = graph self.__seeds_per_call = seeds_per_call self.__batches_per_partition = batches_per_partition + self.__renumber = renumber self.__batches = None self.__sample_call_args = kwargs @@ -102,6 +107,10 @@ def batch_size(self) -> int: @property def batches_per_partition(self) -> int: return self.__batches_per_partition + + @property + def renumber(self) -> bool: + return self.__renumber @property def size(self) -> int: @@ -236,7 +245,7 @@ def flush(self) -> None: start_time_sample_call = time.perf_counter() # Call uniform neighbor sample - samples, offsets, renumber_map = sample_fn( + output = sample_fn( self.__graph, **self.__sample_call_args, start_list=self.__batches[[self.start_col_name, self.batch_col_name]][ @@ -245,9 +254,15 @@ def flush(self) -> None: with_batch_ids=True, with_edge_properties=True, return_offsets=True, - renumber=True, + renumber=self.__renumber ) + if self.__renumber: + samples, offsets, renumber_map = output + else: + samples, offsets = output + renumber_map = None + end_time_sample_call = time.perf_counter() sample_runtime = end_time_sample_call - start_time_sample_call @@ -266,15 +281,15 @@ def flush(self) -> None: # Write batches to parquet self.__write(samples, offsets, renumber_map) if isinstance(self.__batches, dask_cudf.DataFrame): - wait( - [f.release() for f in futures_of(samples)] - + [f.release() for f in futures_of(offsets)] - + [f.release() for f in futures_of(renumber_map)] - ) + futures = [f.release() for f in futures_of(samples)] + [f.release() for f in futures_of(offsets)] + if renumber_map is not None: + futures += [f.release() for f in futures_of(renumber_map)] + wait(futures) del samples del offsets - del renumber_map + if renumber_map is not None: + del renumber_map end_time_write = time.perf_counter() write_runtime = end_time_write - start_time_write diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py index 7706e31633d..de96b6404e1 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py @@ -64,14 +64,11 @@ def _write_samples_to_parquet( end_batch_id = offsets_p.batch_id.iloc[-1] start_ix = offsets_p.offsets.iloc[0] - renumber_map_start_ix = offsets_p.renumber_map_offsets.iloc[0] if end_batch_id == max_batch_id: end_ix = len(results) - renumber_map_end_ix = len(renumber_map) else: offsets_z = offsets[offsets.batch_id == (end_batch_id + 1)] end_ix = offsets_z.offsets.iloc[0] - renumber_map_end_ix = offsets_z.renumber_map_offsets.iloc[0] full_output_path = os.path.join( output_path, f"batch={start_batch_id}-{end_batch_id}.parquet" @@ -82,44 +79,52 @@ def _write_samples_to_parquet( cupy.diff(offsets_p.offsets.values, append=end_ix) ).values - print("\n-------------------------------------------------------") - print(renumber_map_start_ix, renumber_map_end_ix) - print(start_batch_id, end_batch_id) - renumber_map_p = renumber_map.map.iloc[ - renumber_map_start_ix:renumber_map_end_ix - ] - print(renumber_map_p) - print(offsets_p) - - # Add the length so no na-checking is required in the loading stage - map_offset = ( - end_batch_id - start_batch_id + 2 - ) - offsets_p.renumber_map_offsets.iloc[0] - renumber_map_o = cudf.concat( - [ - offsets_p.renumber_map_offsets + map_offset, - cudf.Series([len(renumber_map_p) + len(offsets_p) + 1], dtype="int32"), - ] - ) - - renumber_offset_len = len(renumber_map_o) - if renumber_offset_len != end_batch_id - start_batch_id + 2: - raise ValueError("Invalid batch id or renumber map") + if renumber_map is not None: + renumber_map_start_ix = offsets_p.renumber_map_offsets.iloc[0] - final_map_series = cudf.concat( - [ - renumber_map_o, - renumber_map_p, - ], - ignore_index=True, - ) - print("\nfinal map:\n", final_map_series) + if end_batch_id == max_batch_id: + renumber_map_end_ix = len(renumber_map) + else: + renumber_map_end_ix = offsets_z.renumber_map_offsets.iloc[0] - if len(final_map_series) > len(results_p): - final_map_series.name = "map" - results_p = results_p.join(final_map_series, how="outer").sort_index() - else: - results_p["map"] = final_map_series + print("\n-------------------------------------------------------") + print(renumber_map_start_ix, renumber_map_end_ix) + print(start_batch_id, end_batch_id) + renumber_map_p = renumber_map.map.iloc[ + renumber_map_start_ix:renumber_map_end_ix + ] + print(renumber_map_p) + print(offsets_p) + + # Add the length so no na-checking is required in the loading stage + map_offset = ( + end_batch_id - start_batch_id + 2 + ) - offsets_p.renumber_map_offsets.iloc[0] + renumber_map_o = cudf.concat( + [ + offsets_p.renumber_map_offsets + map_offset, + cudf.Series([len(renumber_map_p) + len(offsets_p) + 1], dtype="int32"), + ] + ) + + renumber_offset_len = len(renumber_map_o) + if renumber_offset_len != end_batch_id - start_batch_id + 2: + raise ValueError("Invalid batch id or renumber map") + + final_map_series = cudf.concat( + [ + renumber_map_o, + renumber_map_p, + ], + ignore_index=True, + ) + print("\nfinal map:\n", final_map_series) + + if len(final_map_series) > len(results_p): + final_map_series.name = "map" + results_p = results_p.join(final_map_series, how="outer").sort_index() + else: + results_p["map"] = final_map_series print("\nresults_p:\n", results_p) results_p.to_parquet( diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py index de7ea1d34e4..651bdf5d12b 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py @@ -60,8 +60,7 @@ def test_bulk_sampler_simple(scratch_dir): bs.flush() recovered_samples = cudf.read_parquet(samples_path) - recovered_samples = recovered_samples.drop("map", axis=1).dropna() - print(recovered_samples) + assert 'map' not in recovered_samples.columns for b in batches["batch"].unique().values_host.tolist(): assert b in recovered_samples["batch_id"].values_host.tolist() @@ -113,7 +112,7 @@ def test_bulk_sampler_remainder(scratch_dir): bs.flush() recovered_samples = cudf.read_parquet(samples_path) - recovered_samples = recovered_samples.drop("map", axis=1).dropna() + assert 'map' not in recovered_samples.columns for b in batches["batch"].unique().values_host.tolist(): assert b in recovered_samples["batch_id"].values_host.tolist() @@ -169,7 +168,7 @@ def test_bulk_sampler_large_batch_size(scratch_dir): bs.flush() recovered_samples = cudf.read_parquet(samples_path) - recovered_samples = recovered_samples.drop("map", axis=1).dropna() + assert 'map' not in recovered_samples.columns for b in batches["batch"].unique().values_host.tolist(): assert b in recovered_samples["batch_id"].values_host.tolist() @@ -203,6 +202,7 @@ def test_bulk_sampler_partitions(scratch_dir): fanout_vals=[2, 2], with_replacement=False, batches_per_partition=2, + renumber=True, ) batches = cudf.DataFrame( diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py index dc70e661a5c..3fc1101e4af 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py @@ -64,7 +64,7 @@ def test_bulk_sampler_simple(dask_client, scratch_dir): bs.flush() recovered_samples = cudf.read_parquet(samples_path) - recovered_samples = recovered_samples.drop("map", axis=1).dropna() + assert 'map' not in recovered_samples.columns for b in batches["batch"].unique().compute().values_host.tolist(): assert b in recovered_samples["batch_id"].values_host.tolist() @@ -108,7 +108,7 @@ def test_bulk_sampler_mg_graph_sg_input(dask_client, scratch_dir): bs.flush() recovered_samples = cudf.read_parquet(samples_path) - recovered_samples = recovered_samples.drop("map", axis=1).dropna() + assert 'map' not in recovered_samples.columns for b in batches["batch"].unique().values_host.tolist(): assert b in recovered_samples["batch_id"].values_host.tolist() @@ -143,6 +143,7 @@ def test_bulk_sampler_partitions(dask_client, scratch_dir, mg_input): fanout_vals=[2, 2], with_replacement=False, batches_per_partition=2, + renumber=True, ) batches = cudf.DataFrame(