From b1a5ed04782c41407d8fff8a3598834c4b1ebbc0 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Thu, 23 May 2024 21:35:28 -0700 Subject: [PATCH 01/10] Call new replicate_edgelist function --- .../cugraph/structure/graph_implementation/simpleGraph.py | 7 ++++--- python/cugraph/cugraph/structure/replicate_edgelist.py | 2 +- .../tests/centrality/test_betweenness_centrality.py | 5 +++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index c90607f9bf6..fa3de8db327 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -12,6 +12,7 @@ # limitations under the License. from cugraph.structure import graph_primtypes_wrapper +from cugraph.structure.replicate_edgelist import replicate_cudf_dataframe from cugraph.structure.symmetrize import symmetrize from cugraph.structure.number_map import NumberMap import cugraph.dask.common.mg_utils as mg_utils @@ -685,11 +686,11 @@ def _replicate_edgelist(self): # FIXME: There might be a better way to control it if client is None: return - work_futures = replication.replicate_cudf_dataframe( - self.edgelist.edgelist_df, client=client, comms=comms + res = replicate_cudf_dataframe( + self.edgelist.edgelist_df ) - self.batch_edgelists = work_futures + self.batch_edgelists = res def _replicate_adjlist(self): client = mg_utils.get_client() diff --git a/python/cugraph/cugraph/structure/replicate_edgelist.py b/python/cugraph/cugraph/structure/replicate_edgelist.py index d413e50e485..04365d4a8d6 100644 --- a/python/cugraph/cugraph/structure/replicate_edgelist.py +++ b/python/cugraph/cugraph/structure/replicate_edgelist.py @@ -269,7 +269,6 @@ def replicate_cudf_dataframe(cudf_dataframe): ) _client = default_client() - if not isinstance(cudf_dataframe, dask_cudf.DataFrame): if isinstance(cudf_dataframe, cudf.DataFrame): df = dask_cudf.from_cudf( @@ -291,6 +290,7 @@ def replicate_cudf_dataframe(cudf_dataframe): Comms.get_session_id(), df, "dataframe", + cudf_dataframe.columns ) return ddf diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py index 6d1f53f7fc3..dcd8cfd5618 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py @@ -117,6 +117,11 @@ def calc_betweenness_centrality( ignore_weights=not edgevals, ) if multi_gpu_batch: + # graph_file.get_dask_edgelist() + # G = graph_file.get_dask_graph( + # create_using=cugraph.Graph(directed=directed), + # ignore_weights=not edgevals, + # ) G.enable_batch() M = G.to_pandas_edgelist().rename( From 71f3ea740458a30304139785e6940853476e0eb1 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Fri, 24 May 2024 11:15:39 -0700 Subject: [PATCH 02/10] Fix FutureWarning when accessing Series --- python/cugraph/cugraph/dask/community/induced_subgraph.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cugraph/cugraph/dask/community/induced_subgraph.py b/python/cugraph/cugraph/dask/community/induced_subgraph.py index d079bcaf653..d24c9b883ac 100644 --- a/python/cugraph/cugraph/dask/community/induced_subgraph.py +++ b/python/cugraph/cugraph/dask/community/induced_subgraph.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -151,9 +151,9 @@ def induced_subgraph( # renumbered, the node ID must also be renumbered. if input_graph.renumbered: vertices = input_graph.lookup_internal_vertex_id(vertices) - vertices_type = input_graph.edgelist.edgelist_df.dtypes[0] + vertices_type = input_graph.edgelist.edgelist_df.dtypes.iloc[0] else: - vertices_type = input_graph.input_df.dtypes[0] + vertices_type = input_graph.input_df.dtypes.iloc[0] if isinstance(vertices, (cudf.Series, cudf.DataFrame)): vertices = dask_cudf.from_cudf(vertices, npartitions=input_graph._npartitions) From a3b2e23b9634d8b9e9a188d8074e2fd6b9a180f6 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Fri, 24 May 2024 21:46:17 -0700 Subject: [PATCH 03/10] Update core --- python/cugraph/cugraph/tests/core/test_core_number_mg.py | 1 + python/cugraph/cugraph/tests/core/test_k_core_mg.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index 3d9a7bef5be..13ee654fbf0 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -70,6 +70,7 @@ def test_sg_core_number(dask_client, dataset, degree_type, benchmark): @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("degree_type", DEGREE_TYPE) def test_core_number(dask_client, dataset, degree_type, benchmark): + dataset.get_dask_edgelist(download=True) dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index c7ad6d2d41d..7ee7c7fcd42 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -48,6 +48,7 @@ def setup_function(): def get_sg_results(dataset, core_number, degree_type): + dataset.get_edgelist(download=True) G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) if core_number: @@ -105,7 +106,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar expected_k_core_results, core_number = get_sg_results( dataset, core_number, degree_type ) - + dataset.get_dask_edgelist() dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) k_core_results = ( @@ -123,6 +124,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): dataset = DATASETS[0] + dataset.get_dask_edgelist(download=True) dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) with pytest.raises(ValueError): From aa70ae2e943c14e55fd712ce496167276d0cc47c Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Fri, 24 May 2024 21:46:37 -0700 Subject: [PATCH 04/10] Update dask k_core.py --- python/cugraph/cugraph/dask/cores/k_core.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python/cugraph/cugraph/dask/cores/k_core.py b/python/cugraph/cugraph/dask/cores/k_core.py index 0d799e3ee06..909beb8c367 100644 --- a/python/cugraph/cugraph/dask/cores/k_core.py +++ b/python/cugraph/cugraph/dask/cores/k_core.py @@ -73,7 +73,7 @@ def k_core(input_graph, k=None, core_number=None, degree_type="bidirectional"): on input, output, or both directed edges, with valid values being "incoming", "outgoing", and "bidirectional" respectively. - core_number : cudf.DataFrame or das_cudf.DataFrame, optional (default=None) + core_number : cudf.DataFrame or dask_cudf.DataFrame, optional (default=None) Precomputed core number of the nodes of the graph G containing two cudf.Series of size V: the vertex identifiers and the corresponding core number values. If set to None, the core numbers of the nodes are @@ -131,31 +131,25 @@ def k_core(input_graph, k=None, core_number=None, degree_type="bidirectional"): core_number = dcg.core_number(input_graph) if input_graph.renumbered is True: - if len(input_graph.renumber_map.implementation.col_names) > 1: cols = core_number.columns[:-1].to_list() else: cols = "vertex" - core_number = input_graph.add_internal_vertex_id( core_number, "vertex", cols ) - if not isinstance(core_number, dask_cudf.DataFrame): if isinstance(core_number, cudf.DataFrame): # convert to dask_cudf in order to distribute the edges core_number = dask_cudf.from_cudf(core_number, input_graph._npartitions) - else: raise TypeError( f"'core_number' must be either None or of" f"type cudf/dask_cudf, got: {type(core_number)}" ) - core_number = core_number.rename(columns={"core_number": "values"}) if k is None: k = core_number["values"].max().compute() - core_number = get_distributed_data(core_number) wait(core_number) core_number = core_number.worker_to_parts From 042d2db0f9d8710c9c71135b6df8b6dc551e645d Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Wed, 29 May 2024 08:22:51 -0700 Subject: [PATCH 05/10] Pre-load MG edeglist for core tests --- python/cugraph/cugraph/tests/core/test_core_number_mg.py | 2 +- python/cugraph/cugraph/tests/core/test_k_core_mg.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index 13ee654fbf0..86c21a99661 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -70,7 +70,7 @@ def test_sg_core_number(dask_client, dataset, degree_type, benchmark): @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("degree_type", DEGREE_TYPE) def test_core_number(dask_client, dataset, degree_type, benchmark): - dataset.get_dask_edgelist(download=True) + dataset.get_dask_edgelist(download=True) # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index 7ee7c7fcd42..3c06f953bff 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -106,7 +106,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar expected_k_core_results, core_number = get_sg_results( dataset, core_number, degree_type ) - dataset.get_dask_edgelist() + dataset.get_dask_edgelist() # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) k_core_results = ( @@ -124,7 +124,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): dataset = DATASETS[0] - dataset.get_dask_edgelist(download=True) + dataset.get_dask_edgelist(download=True) # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) with pytest.raises(ValueError): From 0e0f2d079639d8b486b81b519ac1940eadde040b Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Wed, 29 May 2024 14:04:42 -0700 Subject: [PATCH 06/10] fillna should be called on the edge weight columns --- python/cugraph/cugraph/dask/structure/mg_property_graph.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index dafa198b6f6..57c49d1e75f 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -1123,8 +1123,7 @@ def fillna_edges(self, val=0): Series is passed, the index or keys are the columns to fill and the values are the fill value for the corresponding column. """ - - self.__edge_prop_dataframe = self.__edge_prop_dataframe.fillna(val).persist() + self.__edge_prop_dataframe['val'] = self.__edge_prop_dataframe['val'].fillna(val).persist() def select_vertices(self, expr, from_previous_selection=None): raise NotImplementedError From e4e70440e7e05a03f3d3afbd624edad8fbf2e0ab Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Wed, 29 May 2024 14:05:10 -0700 Subject: [PATCH 07/10] Prevent null column access --- .../cugraph/cugraph/tests/data_store/test_property_graph_mg.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py b/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py index 42cb0f232bf..2d0197ca0c4 100644 --- a/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py +++ b/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py @@ -1042,7 +1042,8 @@ def test_add_data_noncontiguous(dask_client, set_index): for edge_type in ["cat", "dog", "pig"]: cur_df = df[df.edge_type == edge_type] if set_index: - cur_df = cur_df.set_index("vertex") + cur_df['ind_vertex'] = cur_df['vertex'] + cur_df = cur_df.set_index("ind_vertex") pG.add_vertex_data(cur_df, vertex_col_name="vertex", type_name=edge_type) for edge_type in ["cat", "dog", "pig"]: cur_df = pG.get_vertex_data(types=edge_type).compute() From 826b86ba1393c1759d7c1ddf75701dd198c5ca02 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Tue, 4 Jun 2024 07:46:14 -0700 Subject: [PATCH 08/10] Remove old test comments --- .../cugraph/tests/centrality/test_betweenness_centrality.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py index dcd8cfd5618..6d1f53f7fc3 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py @@ -117,11 +117,6 @@ def calc_betweenness_centrality( ignore_weights=not edgevals, ) if multi_gpu_batch: - # graph_file.get_dask_edgelist() - # G = graph_file.get_dask_graph( - # create_using=cugraph.Graph(directed=directed), - # ignore_weights=not edgevals, - # ) G.enable_batch() M = G.to_pandas_edgelist().rename( From 4a44f2f53554d27c0593c17519da56f25a1bdfb7 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Tue, 4 Jun 2024 11:21:44 -0700 Subject: [PATCH 09/10] Update replicated edgelist test after using new API --- python/cugraph/cugraph/tests/utils/test_replication_mg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cugraph/cugraph/tests/utils/test_replication_mg.py b/python/cugraph/cugraph/tests/utils/test_replication_mg.py index 2f9c0d0189b..13089bc238b 100644 --- a/python/cugraph/cugraph/tests/utils/test_replication_mg.py +++ b/python/cugraph/cugraph/tests/utils/test_replication_mg.py @@ -200,8 +200,8 @@ def test_enable_batch_edgelist_replication(graph_file, directed, dask_client): G = utils.generate_cugraph_graph_from_file(graph_file, directed) G.enable_batch() df = G.edgelist.edgelist_df - for worker in G.batch_edgelists: - replicated_df = G.batch_edgelists[worker].result() + for i in range(G.batch_edgelists.npartitions): + replicated_df = G.batch_edgelists.get_partition(i).compute() assert_frame_equal(df, replicated_df) From 206232c1b6c5536a32a91e887bd24d3f33b4b66d Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Tue, 4 Jun 2024 11:29:15 -0700 Subject: [PATCH 10/10] Style fix --- python/cugraph/cugraph/dask/structure/mg_property_graph.py | 6 ++++-- python/cugraph/cugraph/tests/core/test_core_number_mg.py | 2 +- python/cugraph/cugraph/tests/core/test_k_core_mg.py | 4 ++-- .../cugraph/tests/data_store/test_property_graph_mg.py | 2 +- python/cugraph/cugraph/tests/utils/test_replication_mg.py | 2 +- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 57c49d1e75f..c02fa0596d4 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -1123,7 +1123,9 @@ def fillna_edges(self, val=0): Series is passed, the index or keys are the columns to fill and the values are the fill value for the corresponding column. """ - self.__edge_prop_dataframe['val'] = self.__edge_prop_dataframe['val'].fillna(val).persist() + self.__edge_prop_dataframe["val"] = ( + self.__edge_prop_dataframe["val"].fillna(val).persist() + ) def select_vertices(self, expr, from_previous_selection=None): raise NotImplementedError diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index 86c21a99661..1138c1dc488 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -70,7 +70,7 @@ def test_sg_core_number(dask_client, dataset, degree_type, benchmark): @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("degree_type", DEGREE_TYPE) def test_core_number(dask_client, dataset, degree_type, benchmark): - dataset.get_dask_edgelist(download=True) # reload with MG edgelist + dataset.get_dask_edgelist(download=True) # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index 3c06f953bff..3257269c26a 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -106,7 +106,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar expected_k_core_results, core_number = get_sg_results( dataset, core_number, degree_type ) - dataset.get_dask_edgelist() # reload with MG edgelist + dataset.get_dask_edgelist() # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) k_core_results = ( @@ -124,7 +124,7 @@ def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmar @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): dataset = DATASETS[0] - dataset.get_dask_edgelist(download=True) # reload with MG edgelist + dataset.get_dask_edgelist(download=True) # reload with MG edgelist dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) with pytest.raises(ValueError): diff --git a/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py b/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py index 2d0197ca0c4..350f5069f11 100644 --- a/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py +++ b/python/cugraph/cugraph/tests/data_store/test_property_graph_mg.py @@ -1042,7 +1042,7 @@ def test_add_data_noncontiguous(dask_client, set_index): for edge_type in ["cat", "dog", "pig"]: cur_df = df[df.edge_type == edge_type] if set_index: - cur_df['ind_vertex'] = cur_df['vertex'] + cur_df["ind_vertex"] = cur_df["vertex"] cur_df = cur_df.set_index("ind_vertex") pG.add_vertex_data(cur_df, vertex_col_name="vertex", type_name=edge_type) for edge_type in ["cat", "dog", "pig"]: diff --git a/python/cugraph/cugraph/tests/utils/test_replication_mg.py b/python/cugraph/cugraph/tests/utils/test_replication_mg.py index 13089bc238b..a0ebee34335 100644 --- a/python/cugraph/cugraph/tests/utils/test_replication_mg.py +++ b/python/cugraph/cugraph/tests/utils/test_replication_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at