Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added get_vertex_data() and get_edge_data() to SG/MG PropertyGraph #2444

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0913be8
Added options to extract_subgraph() to bypass renumbering and adding …
rlratzel Jul 15, 2022
b7b7a7d
flake8 fixes.
rlratzel Jul 15, 2022
9998a95
Added code and tests for PG.num_vertices_with_properties attribute, w…
rlratzel Jul 16, 2022
eb7d928
Added code and test for handling no vertex data when accessing num_ve…
rlratzel Jul 16, 2022
c5186a3
Updated MG PropertyGraph to return edges with edge ID and added tests…
rlratzel Jul 19, 2022
f5b84a1
Updated tests for changes to edge attrs.
rlratzel Jul 21, 2022
bb9fdbe
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
rlratzel Jul 25, 2022
9ec831d
Updated code and tests for get_vertex_data() and get_edge_data() for …
rlratzel Jul 25, 2022
2c5810d
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
rlratzel Jul 25, 2022
d7a0e08
Fixed edge ID generation off-by-one problem, make SG and MG edge ID g…
rlratzel Jul 25, 2022
56e6373
Added tests for get_*_data() for empty graphs.
rlratzel Jul 26, 2022
1021cdd
flake8 fixes.
rlratzel Jul 26, 2022
b9e639d
Added support for types and columns to get_*_data() APIs, updated tes…
rlratzel Jul 26, 2022
d2e20ce
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
rlratzel Jul 28, 2022
09b3d59
Fixed bug handling the optional columns arg to get_*_data(), fixed te…
rlratzel Jul 28, 2022
7141aef
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
rlratzel Jul 28, 2022
4bb0106
Updated to restrict PG.extract_subgraph to always renumber due to cur…
rlratzel Jul 28, 2022
552de65
Efficiency improvements to edge ID generation based on review feedback.
rlratzel Jul 29, 2022
4f2679b
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
rlratzel Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 86 additions & 24 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,38 @@ def add_vertex_data(self,
for n in self.__vertex_prop_dataframe.columns])
self.__vertex_prop_eval_dict.update(latest)

def get_vertex_data(self, vertex_ids=None, types=None, columns=None):
    """
    Return a dataframe containing vertex properties for only the specified
    vertex_ids, columns, and/or types, or all vertex IDs if not specified.

    Parameters
    ----------
    vertex_ids : int, str, or list-like, optional
        Restrict the result to these vertex IDs. A single ID may be
        passed and is treated as a one-element list.
    types : str or list-like, optional
        Restrict the result to vertices with these type names. A single
        type name may be passed.
    columns : list-like of str, optional
        Property columns to include in the result. The "internal"
        pG.vertex_col_name and pG.type_col_name columns are always
        included since they are assumed to be needed by the caller.
        If None, all columns are returned.

    Returns
    -------
    A dataframe of the matching rows, or None if no vertex data has been
    added to this PropertyGraph.
    """
    if self.__vertex_prop_dataframe is None:
        return None

    df = self.__vertex_prop_dataframe
    if vertex_ids is not None:
        # Coerce a single scalar ID to a list-like, since isin()
        # requires a list-like argument.
        if isinstance(vertex_ids, (int, str)):
            vertex_ids = [vertex_ids]
        df = df.loc[df[self.vertex_col_name].isin(vertex_ids)]

    if types is not None:
        # Coerce a single type name to a list-like for isin().
        if isinstance(types, str):
            types = [types]
        df = df.loc[df[self.type_col_name].isin(types)]

    if columns is None:
        return df
    # list() also accepts tuples and other iterables of column names.
    # FIXME: invalid columns will result in a KeyError, should a
    # check be done here and a more PG-specific error raised?
    return df[[self.vertex_col_name, self.type_col_name] + list(columns)]

def add_edge_data(self,
dataframe,
vertex_col_names,
Expand Down Expand Up @@ -411,22 +443,20 @@ def add_edge_data(self,
# columns. The copied DataFrame is then merged (another copy) and then
# deleted when out-of-scope.
tmp_df = dataframe.copy()
# FIXME: Find a better way to create the edge id
prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]]
tmp_df[self.type_col_name] = type_name

# Add unique edge IDs to the new rows
prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
starting_eid = prev_eid + 1
data_size = len(tmp_df.compute().index)
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
cudf_series = \
cudf.Series(range(starting_eid, starting_eid + data_size))
dask_series =\
dask_cudf.from_cudf(cudf_series, self.__num_workers)
dask_series = dask_series.reset_index(drop=True)
self.__last_edge_id = starting_eid + data_size
tmp_df = tmp_df.reset_index(drop=True)
tmp_df[self.edge_id_col_name] = dask_series
tmp_df[self.type_col_name] = type_name
new_eids = cudf.Series(range(starting_eid, starting_eid + data_size))
new_eids_dask = dask_cudf.from_cudf(new_eids, self.__num_workers)
self.__last_edge_id = starting_eid + data_size - 1
tmp_df[self.edge_id_col_name] = new_eids_dask
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
tmp_df.persist()

if property_columns:
# all columns
column_names_to_drop = set(tmp_df.columns)
Expand All @@ -441,13 +471,50 @@ def add_edge_data(self,
new_col_info = self.__get_new_column_dtypes(
tmp_df, self.__edge_prop_dataframe)
self.__edge_prop_dtypes.update(new_col_info)

self.__edge_prop_dataframe = \
self.__edge_prop_dataframe.merge(tmp_df, how="outer")

# Update the vertex eval dict with the latest column instances
latest = dict([(n, self.__edge_prop_dataframe[n])
for n in self.__edge_prop_dataframe.columns])
self.__edge_prop_eval_dict.update(latest)

def get_edge_data(self, edge_ids=None, types=None, columns=None):
    """
    Return a dataframe containing edge properties for only the specified
    edge_ids, columns, and/or edge type, or all edge IDs if not specified.

    Parameters
    ----------
    edge_ids : int or list-like, optional
        Restrict the result to these edge IDs. A single ID may be
        passed and is treated as a one-element list.
    types : str or list-like, optional
        Restrict the result to edges with these type names. A single
        type name may be passed.
    columns : list-like of str, optional
        Property columns to include in the result. The "internal" src,
        dst, edge_id, and type columns are always included since they
        are assumed to be needed by the caller. If None, all columns
        except the "internal" weight column are returned.

    Returns
    -------
    A dataframe of the matching rows, or None if no edge data has been
    added to this PropertyGraph.
    """
    if self.__edge_prop_dataframe is None:
        return None

    df = self.__edge_prop_dataframe
    if edge_ids is not None:
        # Coerce a single scalar ID to a list-like, since isin()
        # requires a list-like argument.
        if isinstance(edge_ids, int):
            edge_ids = [edge_ids]
        df = df.loc[df[self.edge_id_col_name].isin(edge_ids)]

    if types is not None:
        # Coerce a single type name to a list-like for isin().
        if isinstance(types, str):
            types = [types]
        df = df.loc[df[self.type_col_name].isin(types)]

    if columns is None:
        # remove the "internal" weight column if one was added
        all_columns = list(self.__edge_prop_dataframe.columns)
        if self.weight_col_name in all_columns:
            all_columns.remove(self.weight_col_name)
        return df[all_columns]
    # list() also accepts tuples and other iterables of column names.
    # FIXME: invalid columns will result in a KeyError, should a
    # check be done here and a more PG-specific error raised?
    return df[[self.src_col_name, self.dst_col_name,
               self.edge_id_col_name, self.type_col_name]
              + list(columns)]

def select_vertices(self, expr, from_previous_selection=None):
raise NotImplementedError

Expand Down Expand Up @@ -667,13 +734,14 @@ def edge_props_to_graph(self,
# FIXME: MNMG Graphs required renumber to be True due to requirements
# on legacy code that needed segment offsets, partition offsets,
# etc. which were previously computed during the "legacy" C
# renumbering. The workaround is to pass renumber=True, then manually
# call G.compute_renumber_edge_list(legacy_renum_only=True) to compute
# the required meta-data without changing vertex IDs.
# renumbering. The workaround is to always pass renumber=True, but set
# legacy_renum_only based on if actual renumbering (ie. changing the
# vertex IDs) should happen or not.
renumber = True
if renumber_graph is False:
renumber = True
legacy_renum_only = True
else:
renumber = renumber_graph
legacy_renum_only = False
rlratzel marked this conversation as resolved.
Show resolved Hide resolved

col_names = [self.src_col_name, self.dst_col_name]
if edge_attr is not None:
Expand All @@ -683,14 +751,8 @@ def edge_props_to_graph(self,
source=self.src_col_name,
destination=self.dst_col_name,
edge_attr=edge_attr,
renumber=renumber)
# FIXME: see FIXME above - to generate the edgelist,
# compute_renumber_edge_list() must be called, but legacy mode needs to
# be used based on if renumbering was to be done or not.
if renumber_graph is False:
G.compute_renumber_edge_list(legacy_renum_only=True)
else:
G.compute_renumber_edge_list(legacy_renum_only=False)
renumber=renumber,
legacy_renum_only=legacy_renum_only)

if add_edge_data:
# Set the edge_data on the resulting Graph to a DataFrame
Expand Down
102 changes: 78 additions & 24 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def num_edges(self):
def edges(self):
if self.__edge_prop_dataframe is not None:
return self.__edge_prop_dataframe[[self.src_col_name,
self.dst_col_name]]
self.dst_col_name,
self.edge_id_col_name]]
return None

@property
Expand Down Expand Up @@ -346,6 +347,38 @@ def add_vertex_data(self,
for n in self.__vertex_prop_dataframe.columns])
self.__vertex_prop_eval_dict.update(latest)

def get_vertex_data(self, vertex_ids=None, types=None, columns=None):
    """
    Return a dataframe containing vertex properties for only the specified
    vertex_ids, columns, and/or types, or all vertex IDs if not specified.

    Parameters
    ----------
    vertex_ids : int, str, or list-like, optional
        Restrict the result to these vertex IDs. A single ID may be
        passed and is treated as a one-element list.
    types : str or list-like, optional
        Restrict the result to vertices with these type names. A single
        type name may be passed.
    columns : list-like of str, optional
        Property columns to include in the result. The "internal"
        pG.vertex_col_name and pG.type_col_name columns are always
        included since they are assumed to be needed by the caller.
        If None, all columns are returned.

    Returns
    -------
    A dataframe of the matching rows, or None if no vertex data has been
    added to this PropertyGraph.
    """
    if self.__vertex_prop_dataframe is None:
        return None

    df = self.__vertex_prop_dataframe
    if vertex_ids is not None:
        # Coerce a single scalar ID to a list-like, since isin()
        # requires a list-like argument.
        if isinstance(vertex_ids, (int, str)):
            vertex_ids = [vertex_ids]
        df = df.loc[df[self.vertex_col_name].isin(vertex_ids)]

    if types is not None:
        # Coerce a single type name to a list-like for isin().
        if isinstance(types, str):
            types = [types]
        df = df.loc[df[self.type_col_name].isin(types)]

    if columns is None:
        return df
    # list() also accepts tuples and other iterables of column names.
    # FIXME: invalid columns will result in a KeyError, should a
    # check be done here and a more PG-specific error raised?
    return df[[self.vertex_col_name, self.type_col_name] + list(columns)]

def add_edge_data(self,
dataframe,
vertex_col_names,
Expand Down Expand Up @@ -442,9 +475,17 @@ def add_edge_data(self,
tmp_df = dataframe.copy(deep=True)
tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]]
# FIXME: handle case of a type_name column already being in tmp_df
tmp_df[self.type_col_name] = type_name

# Add unique edge IDs to the new rows
prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
starting_eid = prev_eid + 1
data_size = len(tmp_df.index)
new_eids = self.__series_type(range(starting_eid,
starting_eid + data_size))
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
self.__last_edge_id = starting_eid + data_size - 1
tmp_df[self.edge_id_col_name] = new_eids

if property_columns:
# all columns
column_names_to_drop = set(tmp_df.columns)
Expand All @@ -463,13 +504,46 @@ def add_edge_data(self,
self.__edge_prop_dataframe = \
self.__edge_prop_dataframe.merge(tmp_df, how="outer")

self.__add_edge_ids()

# Update the vertex eval dict with the latest column instances
latest = dict([(n, self.__edge_prop_dataframe[n])
for n in self.__edge_prop_dataframe.columns])
self.__edge_prop_eval_dict.update(latest)

def get_edge_data(self, edge_ids=None, types=None, columns=None):
    """
    Return a dataframe containing edge properties for only the specified
    edge_ids, columns, and/or edge type, or all edge IDs if not specified.

    Parameters
    ----------
    edge_ids : int or list-like, optional
        Restrict the result to these edge IDs. A single ID may be
        passed and is treated as a one-element list.
    types : str or list-like, optional
        Restrict the result to edges with these type names. A single
        type name may be passed.
    columns : list-like of str, optional
        Property columns to include in the result. The "internal" src,
        dst, edge_id, and type columns are always included since they
        are assumed to be needed by the caller. If None, all columns
        except the "internal" weight column are returned.

    Returns
    -------
    A dataframe of the matching rows, or None if no edge data has been
    added to this PropertyGraph.
    """
    if self.__edge_prop_dataframe is None:
        return None

    df = self.__edge_prop_dataframe
    if edge_ids is not None:
        # Coerce a single scalar ID to a list-like, since isin()
        # requires a list-like argument.
        if isinstance(edge_ids, int):
            edge_ids = [edge_ids]
        df = df.loc[df[self.edge_id_col_name].isin(edge_ids)]

    if types is not None:
        # Coerce a single type name to a list-like for isin().
        if isinstance(types, str):
            types = [types]
        df = df.loc[df[self.type_col_name].isin(types)]

    if columns is None:
        # remove the "internal" weight column if one was added
        all_columns = list(self.__edge_prop_dataframe.columns)
        if self.weight_col_name in all_columns:
            all_columns.remove(self.weight_col_name)
        return df[all_columns]
    # list() also accepts tuples and other iterables of column names.
    # FIXME: invalid columns will result in a KeyError, should a
    # check be done here and a more PG-specific error raised?
    return df[[self.src_col_name, self.dst_col_name,
               self.edge_id_col_name, self.type_col_name]
              + list(columns)]

def select_vertices(self, expr, from_previous_selection=None):
"""
Evaluate expr and return a PropertySelection object representing the
Expand Down Expand Up @@ -862,26 +936,6 @@ def __create_property_lookup_table(self, edge_prop_df):
self.dst_col_name: dst,
self.edge_id_col_name: edge_id})

def __add_edge_ids(self):
    """
    Replace nans with unique edge IDs. Edge IDs are simply numbers
    incremented by 1 for each edge.
    """
    # Continue numbering from the last ID handed out; -1 makes the first
    # assigned ID 0 when no edges have been numbered yet.
    prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
    # Boolean mask of rows whose edge ID column is NaN -- presumably rows
    # newly merged in by add_edge_data() that have not been numbered yet.
    nans = self.__edge_prop_dataframe[self.edge_id_col_name].isna()

    if nans.any():
        # Index labels of the rows that need new IDs.
        indices = nans.index[nans]
        num_indices = len(indices)
        starting_eid = prev_eid + 1
        # Consecutive IDs, built with the graph's series type (cudf or
        # pandas Series).
        new_eids = self.__series_type(
            range(starting_eid, starting_eid + num_indices))

        # NOTE(review): `indices` holds index *labels* but is passed to
        # .iloc, which is positional; this assumes a default RangeIndex
        # where labels equal positions -- TODO confirm. Also, assigning
        # through the chained `df[col].iloc[...]` expression may not
        # write back to the parent dataframe in all pandas/cudf
        # versions -- verify.
        self.__edge_prop_dataframe[self.edge_id_col_name]\
            .iloc[indices] = new_eids

        # Remember the highest ID assigned so the next call continues
        # numbering from there.
        self.__last_edge_id = starting_eid + num_indices - 1

def __get_all_vertices_series(self):
"""
Return a list of all Series objects that contain vertices from all
Expand Down
Loading