Skip to content

Commit

Permalink
Merge branch 'branch-23.08' into branch-23.08-resultsset
Browse files Browse the repository at this point in the history
  • Loading branch information
betochimas authored Aug 8, 2023
2 parents 54258e9 + 62ecea2 commit 8114449
Show file tree
Hide file tree
Showing 23 changed files with 498 additions and 134 deletions.
7 changes: 5 additions & 2 deletions python/cugraph/cugraph/structure/graph_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ def __init__(self, m_graph=None, directed=False):
if isinstance(m_graph, MultiGraph):
elist = m_graph.view_edge_list()
if m_graph.is_weighted():
weights = "weights"
weights = m_graph.weight_column
else:
weights = None
self.from_cudf_edgelist(
elist, source="src", destination="dst", edge_attr=weights
elist,
source=m_graph.source_columns,
destination=m_graph.destination_columns,
edge_attr=weights,
)
else:
raise TypeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def __init__(self, properties):
self.properties = simpleDistributedGraphImpl.Properties(properties)
self.source_columns = None
self.destination_columns = None
self.weight_column = None
self.vertex_columns = None

def _make_plc_graph(
sID,
Expand Down Expand Up @@ -175,6 +177,7 @@ def __from_edgelist(
"and destination parameters"
)
ddf_columns = s_col + d_col
self.vertex_columns = ddf_columns.copy()
_client = default_client()
workers = _client.scheduler_info()["workers"]
# Repartition to 2 partitions per GPU for memory efficient process
Expand Down Expand Up @@ -214,10 +217,11 @@ def __from_edgelist(
# The symmetrize step may add additional edges with unknown
# ids and types for an undirected graph. Therefore, only
# directed graphs may be used with ids and types.
# FIXME: Drop the check in symmetrize.py as it is redundant
if len(edge_attr) == 3:
if not self.properties.directed:
raise ValueError(
"User-provided edge ids and edge "
"User-provided edge ids and/or edge "
"types are not permitted for an "
"undirected graph."
)
Expand Down Expand Up @@ -285,6 +289,7 @@ def __from_edgelist(
self.properties.renumber = renumber
self.source_columns = source
self.destination_columns = destination
self.weight_column = weight

# If renumbering is not enabled, this function will only create
# the edgelist_df and not do any renumbering.
Expand Down Expand Up @@ -316,7 +321,6 @@ def __from_edgelist(
ddf = ddf.map_partitions(lambda df: df.copy())
ddf = persist_dask_df_equal_parts_per_worker(ddf, _client)
num_edges = len(ddf)
self._number_of_edges = num_edges
ddf = get_persisted_df_worker_map(ddf, _client)
delayed_tasks_d = {
w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
Expand Down Expand Up @@ -356,6 +360,8 @@ def renumbered(self):

def view_edge_list(self):
"""
FIXME: Should this also return the edge ids and types?
Display the edge list. Compute it if needed.
NOTE: If the graph is of type Graph() then the displayed undirected
edges are the same as displayed by networkx Graph(), but the direction
Expand Down Expand Up @@ -386,7 +392,59 @@ def view_edge_list(self):
"""
if self.edgelist is None:
raise RuntimeError("Graph has no Edgelist.")
return self.edgelist.edgelist_df

edgelist_df = self.input_df
is_string_dtype = False
is_multi_column = False
wgtCol = simpleDistributedGraphImpl.edgeWeightCol
if not self.properties.directed:
srcCol = self.source_columns
dstCol = self.destination_columns
if self.renumber_map.unrenumbered_id_type == "object":
# FIXME: Use the renumbered vertices instead and then un-renumber.
# This operation can be expensive.
is_string_dtype = True
edgelist_df = self.edgelist.edgelist_df
srcCol = self.renumber_map.renumbered_src_col_name
dstCol = self.renumber_map.renumbered_dst_col_name

if isinstance(srcCol, list):
srcCol = self.renumber_map.renumbered_src_col_name
dstCol = self.renumber_map.renumbered_dst_col_name
edgelist_df = self.edgelist.edgelist_df
# unrenumber before extracting the upper triangular part
if len(self.source_columns) == 1:
edgelist_df = self.renumber_map.unrenumber(edgelist_df, srcCol)
edgelist_df = self.renumber_map.unrenumber(edgelist_df, dstCol)
else:
is_multi_column = True

edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[
[srcCol, dstCol]
].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1)

edgelist_df = edgelist_df.groupby(by=[srcCol, dstCol]).sum().reset_index()
if wgtCol in edgelist_df.columns:
# FIXME: This breaks if there are are multi edges as those will
# be dropped during the symmetrization step and the original 'weight'
# will be halved.
edgelist_df[wgtCol] /= 2

if is_string_dtype or is_multi_column:
# unrenumber the vertices
edgelist_df = self.renumber_map.unrenumber(edgelist_df, srcCol)
edgelist_df = self.renumber_map.unrenumber(edgelist_df, dstCol)

if self.properties.renumbered:
edgelist_df = edgelist_df.rename(
columns=self.renumber_map.internal_to_external_col_names
)

# If there is no 'wgt' column, nothing will happen
edgelist_df = edgelist_df.rename(columns={wgtCol: self.weight_column})

self.properties.edge_count = len(edgelist_df)
return edgelist_df

def delete_edge_list(self):
"""
Expand All @@ -405,23 +463,7 @@ def number_of_vertices(self):
Get the number of nodes in the graph.
"""
if self.properties.node_count is None:
if self.edgelist is not None:
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

ddf = self.edgelist.edgelist_df[[src_col_name, dst_col_name]]
# ddf = self.edgelist.edgelist_df[["src", "dst"]]
self.properties.node_count = ddf.max().max().compute() + 1
else:
raise RuntimeError("Graph is Empty")
self.properties.node_count = len(self.nodes())
return self.properties.node_count

def number_of_nodes(self):
Expand All @@ -434,10 +476,16 @@ def number_of_edges(self, directed_edges=False):
"""
Get the number of edges in the graph.
"""
if self.edgelist is not None:
return self._number_of_edges
else:
raise RuntimeError("Graph is Empty")

if directed_edges and self.edgelist is not None:
return len(self.edgelist.edgelist_df)

if self.properties.edge_count is None:
if self.edgelist is not None:
self.view_edge_list()
else:
raise RuntimeError("Graph is Empty")
return self.properties.edge_count

def in_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -1021,19 +1069,8 @@ def edges(self):
sources and destinations. It does not return the edge weights.
For viewing edges with weights use view_edge_list()
"""
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

# return self.view_edge_list()[["src", "dst"]]
return self.view_edge_list()[[src_col_name, dst_col_name]]
return self.view_edge_list()[self.vertex_columns]

def nodes(self):
"""
Expand All @@ -1045,23 +1082,26 @@ def nodes(self):
a dataframe and do 'renumber_map.unrenumber' or 'G.unrenumber'
"""

if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap
if self.edgelist is not None:
if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap

df = self.renumber_map.implementation.ddf.drop(columns="global_id")
df = self.renumber_map.implementation.ddf.drop(columns="global_id")

if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]
if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]

else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
raise RuntimeError("Graph is Empty")

def neighbors(self, n):
if self.edgelist is None:
Expand Down
Loading

0 comments on commit 8114449

Please sign in to comment.