Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix inconsistent graph properties between the SG and the MG API #3757

Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b118dbc
refactor mg api core function to match sg api
Jul 27, 2023
457a8ef
Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.08
Jul 27, 2023
5e1bb6a
refactor functions, add new attributes and fixmes
Jul 28, 2023
bd8c978
add fixme
Jul 28, 2023
2f91b5f
remove expensive test
Jul 28, 2023
a055f97
Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.08
Jul 28, 2023
82dd74e
fix style
Jul 28, 2023
4f03d88
retrieve weights if they were passed
Jul 31, 2023
8a9453b
update tests
Jul 31, 2023
9c349b4
fix typo
Jul 31, 2023
fdb98e7
remove hardcoded column name
Jul 31, 2023
4a25d00
fix style
Jul 31, 2023
1b1a5d8
Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.0…
Jul 31, 2023
e335d2b
remove (invalid) hardcoded column names
Aug 1, 2023
d03d827
fix style
Aug 1, 2023
7f69c22
remove invalid hardcoded column names, update docstrings, remove debu…
Aug 2, 2023
1fb2422
remove hardcoded column names
Aug 2, 2023
ccd8d17
fix style
Aug 2, 2023
1e9e670
remove outdated fixme
Aug 2, 2023
17d4624
fix typo
Aug 2, 2023
b206f33
properly rename column names
Aug 2, 2023
97c6dd3
fix style
Aug 2, 2023
d6a8cca
add comments
Aug 2, 2023
86eb9b5
fix typo
Aug 2, 2023
140ca33
add test for edge retrieval
Aug 2, 2023
4a1e413
fix style
Aug 2, 2023
6aa8806
fix the edge_list view
Aug 3, 2023
351dc45
remove invalid hardcoded column names and add more tests
Aug 3, 2023
35fa60d
fix style
Aug 3, 2023
578b22e
Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.0…
Aug 3, 2023
56a92c9
remove hardcoded column names
Aug 3, 2023
711a1fe
fix style
Aug 3, 2023
fad3813
unrenumber before extracting the upper triangular part
Aug 3, 2023
b173691
add test for mg graph properties
Aug 3, 2023
7ab383e
fix style
Aug 3, 2023
e06a810
unrenumber before extracting the upper triangular part
Aug 3, 2023
de8b904
rename column name
Aug 3, 2023
928e87c
Merge remote-tracking branch 'upstream/branch-23.08' into branch-23.0…
Aug 3, 2023
e586c38
add multicolumn support for MG when getting the edgelist view
Aug 3, 2023
00eab44
fix typo
Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions python/cugraph/cugraph/structure/graph_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ def __init__(self, m_graph=None, directed=False):
if isinstance(m_graph, MultiGraph):
elist = m_graph.view_edge_list()
if m_graph.is_weighted():
weights = "weights"
weights = m_graph.weight_column
else:
weights = None
self.from_cudf_edgelist(
elist, source="src", destination="dst", edge_attr=weights
elist,
source=m_graph.source_columns,
destination=m_graph.destination_columns,
edge_attr=weights,
)
else:
raise TypeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def __init__(self, properties):
self.properties = simpleDistributedGraphImpl.Properties(properties)
self.source_columns = None
self.destination_columns = None
self.weight_column = None
self.vertex_columns = None

def _make_plc_graph(
sID,
Expand Down Expand Up @@ -175,6 +177,7 @@ def __from_edgelist(
"and destination parameters"
)
ddf_columns = s_col + d_col
self.vertex_columns = ddf_columns.copy()
_client = default_client()
workers = _client.scheduler_info()["workers"]
# Repartition to 2 partitions per GPU for memory efficient process
Expand Down Expand Up @@ -214,10 +217,11 @@ def __from_edgelist(
# The symmetrize step may add additional edges with unknown
# ids and types for an undirected graph. Therefore, only
# directed graphs may be used with ids and types.
# FIXME: Drop the check in symmetrize.py as it is redundant
if len(edge_attr) == 3:
if not self.properties.directed:
raise ValueError(
"User-provided edge ids and edge "
"User-provided edge ids and/or edge "
"types are not permitted for an "
"undirected graph."
)
Expand Down Expand Up @@ -285,6 +289,7 @@ def __from_edgelist(
self.properties.renumber = renumber
self.source_columns = source
self.destination_columns = destination
self.weight_column = weight

# If renumbering is not enabled, this function will only create
# the edgelist_df and not do any renumbering.
Expand Down Expand Up @@ -316,7 +321,6 @@ def __from_edgelist(
ddf = ddf.map_partitions(lambda df: df.copy())
ddf = persist_dask_df_equal_parts_per_worker(ddf, _client)
num_edges = len(ddf)
self._number_of_edges = num_edges
ddf = get_persisted_df_worker_map(ddf, _client)
delayed_tasks_d = {
w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
Expand Down Expand Up @@ -356,6 +360,8 @@ def renumbered(self):

def view_edge_list(self):
"""
FIXME: Should this also return the edge ids and types?

Display the edge list. Compute it if needed.
NOTE: If the graph is of type Graph() then the displayed undirected
edges are the same as displayed by networkx Graph(), but the direction
Expand Down Expand Up @@ -386,7 +392,47 @@ def view_edge_list(self):
"""
if self.edgelist is None:
raise RuntimeError("Graph has no Edgelist.")
return self.edgelist.edgelist_df

edgelist_df = self.input_df
is_string_dtype = False
wgtCol = simpleDistributedGraphImpl.edgeWeightCol
if not self.properties.directed:
srcCol = self.source_columns
dstCol = self.destination_columns
if self.renumber_map.unrenumbered_id_type == "object":
# FIXME: Use the renumbered vertices instead and then un-renumber.
# This operation can be expensive.
is_string_dtype = True
edgelist_df = self.edgelist.edgelist_df
srcCol = self.renumber_map.renumbered_src_col_name
dstCol = self.renumber_map.renumbered_dst_col_name

edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[
[srcCol, dstCol]
].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1)

edgelist_df = edgelist_df.groupby(by=[srcCol, dstCol]).sum().reset_index()
if wgtCol in edgelist_df.columns:
# FIXME: This breaks if there are are multi edges as those will
# be dropped during the symmetrization step and the original 'weight'
# will be halved.
edgelist_df[wgtCol] /= 2

if is_string_dtype:
# unrenumber the vertices
edgelist_df = self.renumber_map.unrenumber(edgelist_df, srcCol)
edgelist_df = self.renumber_map.unrenumber(edgelist_df, dstCol)

if self.properties.renumbered:
edgelist_df = edgelist_df.rename(
columns=self.renumber_map.internal_to_external_col_names
)

# If there is no 'wgt' column, nothing will happen
edgelist_df = edgelist_df.rename(columns={wgtCol: self.weight_column})

self.properties.edge_count = len(edgelist_df)
return edgelist_df

def delete_edge_list(self):
"""
Expand All @@ -405,23 +451,7 @@ def number_of_vertices(self):
Get the number of nodes in the graph.
"""
if self.properties.node_count is None:
if self.edgelist is not None:
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

ddf = self.edgelist.edgelist_df[[src_col_name, dst_col_name]]
# ddf = self.edgelist.edgelist_df[["src", "dst"]]
self.properties.node_count = ddf.max().max().compute() + 1
else:
raise RuntimeError("Graph is Empty")
self.properties.node_count = len(self.nodes())
return self.properties.node_count

def number_of_nodes(self):
Expand All @@ -434,10 +464,16 @@ def number_of_edges(self, directed_edges=False):
"""
Get the number of edges in the graph.
"""
if self.edgelist is not None:
return self._number_of_edges
else:
raise RuntimeError("Graph is Empty")

if directed_edges and self.edgelist is not None:
return len(self.edgelist.edgelist_df)

if self.properties.edge_count is None:
if self.edgelist is not None:
self.view_edge_list()
else:
raise RuntimeError("Graph is Empty")
return self.properties.edge_count

def in_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -1021,19 +1057,8 @@ def edges(self):
sources and destinations. It does not return the edge weights.
For viewing edges with weights use view_edge_list()
"""
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

# return self.view_edge_list()[["src", "dst"]]
return self.view_edge_list()[[src_col_name, dst_col_name]]
return self.view_edge_list()[self.vertex_columns]

def nodes(self):
"""
Expand All @@ -1045,23 +1070,26 @@ def nodes(self):
a dataframe and do 'renumber_map.unrenumber' or 'G.unrenumber'
"""

if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap
if self.edgelist is not None:
if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap

df = self.renumber_map.implementation.ddf.drop(columns="global_id")
df = self.renumber_map.implementation.ddf.drop(columns="global_id")

if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]
if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]

else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
raise RuntimeError("Graph is Empty")

def neighbors(self, n):
if self.edgelist is None:
Expand Down
Loading