Enable expression-based Dask Dataframe support #4325

Merged
merged 27 commits into from
May 28, 2024
27 commits
b3e0fba
allow dask-expr for debugging
rjzamora Apr 9, 2024
39f5101
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora Apr 24, 2024
4ac1828
avoid importing from core and p2p shuffling
rjzamora Apr 25, 2024
41d42f6
avoid deprecated API
rjzamora Apr 25, 2024
9074cab
adjust test_nodes_functionality to work after dask_expr#1041
rjzamora Apr 25, 2024
0f8e221
add a few workarounds for now
rjzamora Apr 26, 2024
436a080
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 2, 2024
da82bcd
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 2, 2024
a9ae6f2
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 7, 2024
f2d6a25
Merge remote-tracking branch 'upstream/branch-24.06' into debug-dask-…
rjzamora May 8, 2024
43963ad
test hacky workaround
rjzamora May 8, 2024
f9bcbe6
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 10, 2024
bf9dd10
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 13, 2024
c104292
Merge remote-tracking branch 'upstream/branch-24.06' into debug-dask-…
rjzamora May 15, 2024
03bbee5
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 15, 2024
952b224
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 16, 2024
9496171
clean up test_mg_symmetrize
rjzamora May 17, 2024
d807de0
Merge remote-tracking branch 'upstream/branch-24.06' into debug-dask-…
rjzamora May 17, 2024
94c8d79
clean up test_mg_symmetrize
rjzamora May 17, 2024
942a396
Merge branch 'debug-dask-expr' of github.com:rjzamora/cugraph into de…
rjzamora May 17, 2024
36df87d
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 17, 2024
27ecbaf
use 'canonical' dask.dataframe approach for concatnating dataframes a…
rjzamora May 20, 2024
68b798b
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 20, 2024
ec0fcd4
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 21, 2024
acbc219
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 21, 2024
541c8f2
Merge branch 'branch-24.06' into debug-dask-expr
rjzamora May 22, 2024
8dc5804
Update python/cugraph/cugraph/structure/symmetrize.py
rjzamora May 23, 2024
Original file line number Diff line number Diff line change
@@ -344,7 +344,7 @@ def generate_rmat_dataset(
del label_df
gc.collect()

-dask_label_df = dask_cudf.from_dask_dataframe(dask_label_df)
+dask_label_df = dask_label_df.to_backend("cudf")
Member Author

from_dask_dataframe is now deprecated.

Member

Does this mean that dask-expr has some dispatching/plugin support for different DataFrame implementations?

Member Author

NOTE: Given the complexity of dask's various dispatching mechanisms, I'm not expecting anything other than "pandas" and "cudf" to ever be implemented, though it's technically possible.


node_offsets = {"paper": 0}
edge_offsets = {("paper", "cites", "paper"): 0}
4 changes: 0 additions & 4 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
@@ -3,10 +3,6 @@

set -euo pipefail

-# TODO: Enable dask query planning (by default) once some bugs are fixed.
-# xref: https://github.com/rapidsai/cudf/issues/15027
-export DASK_DATAFRAME__QUERY_PLANNING=False
-
Comment on lines -6 to -9
Member

AIUI, this is one of the ops-relevant changes: it removes a workaround that is no longer needed.
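
The removed export relies on dask's documented convention of reading `DASK_`-prefixed environment variables into its configuration, with double underscores marking nesting. The sketch below mimics that convention using only the standard library. `env_to_config` is a hypothetical helper, not a dask function, and spelling keys with hyphens is a simplification (dask treats hyphens and underscores in keys as equivalent).

```python
import ast
from typing import Any, Dict, Mapping


def env_to_config(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Translate DASK_* variables into a nested config dict (sketch only)."""
    config: Dict[str, Any] = {}
    for name, raw in environ.items():
        if not name.startswith("DASK_"):
            continue
        # Drop the prefix, lowercase, and split on "__" to get the nesting path
        path = name[len("DASK_"):].lower().split("__")
        # Simplification: spell keys with hyphens, as in "query-planning"
        path = [part.replace("_", "-") for part in path]
        try:
            value = ast.literal_eval(raw)  # e.g. "False" becomes the bool False
        except (ValueError, SyntaxError):
            value = raw  # anything unparsable stays a plain string
        node = config
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = value
    return config


env = {"DASK_DATAFRAME__QUERY_PLANNING": "False", "HOME": "/root"}
print(env_to_config(env))  # {'dataframe': {'query-planning': False}}
```

Under this reading, dropping the export means the `dataframe.query-planning` key is no longer forced to `False` in CI, so the tests run with whatever default dask ships.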

# Support invoking test_python.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../

4 changes: 0 additions & 4 deletions ci/test_wheel.sh
Original file line number Diff line number Diff line change
@@ -3,10 +3,6 @@

set -eoxu pipefail

-# TODO: Enable dask query planning (by default) once some bugs are fixed.
-# xref: https://github.com/rapidsai/cudf/issues/15027
-export DASK_DATAFRAME__QUERY_PLANNING=False
-
Comment on lines -6 to -9
Member

This is the other one: the same change as before, just in another place.

package_name=$1
package_dir=$2

7 changes: 6 additions & 1 deletion python/cugraph/cugraph/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2023, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -11,6 +11,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+from dask import config
+
from .link_analysis.pagerank import pagerank
from .link_analysis.hits import hits
from .traversal.bfs import bfs
@@ -34,3 +36,6 @@
from .link_prediction.sorensen import sorensen
from .link_prediction.overlap import overlap
from .community.leiden import leiden
+
+# Avoid "p2p" shuffling in dask for now
+config.set({"dataframe.shuffle.method": "tasks"})
6 changes: 3 additions & 3 deletions python/cugraph/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2023, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,8 +15,8 @@

from collections.abc import Sequence
from collections import OrderedDict
-from dask_cudf.core import DataFrame as dcDataFrame
-from dask_cudf.core import Series as daskSeries
+from dask_cudf import DataFrame as dcDataFrame
+from dask_cudf import Series as daskSeries

Comment on lines -18 to +19
Member Author

NOTE: All imports from dask_cudf.core should be avoided, because these imports always use "legacy" dask-cudf. Imports from the top-level dask_cudf module are automatically routed to the proper API. There is no way to protect against dask_cudf.core imports yet, because some query-planning logic still needs to find/use specific legacy code.
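
The routing described in this note can be illustrated with a small standard-library sketch. Everything here is a stand-in: `LegacyDataFrame`, `ExprDataFrame`, and `build_package` are hypothetical names, not real dask_cudf objects, and the real package decides which class to expose in its own way. The point is only to show why a type check against a submodule class can stop matching while a check against the top-level name keeps working.

```python
import types


# Hypothetical stand-ins: none of these names are real dask_cudf objects.
class LegacyDataFrame:
    """Plays the role of the class defined in a 'core' submodule."""


class ExprDataFrame:
    """Plays the role of the expression-based replacement."""


def build_package(query_planning: bool) -> types.SimpleNamespace:
    """Build a fake package whose top-level name follows the active mode."""
    core = types.SimpleNamespace(DataFrame=LegacyDataFrame)
    top_level = ExprDataFrame if query_planning else LegacyDataFrame
    return types.SimpleNamespace(core=core, DataFrame=top_level)


pkg = build_package(query_planning=True)
df = pkg.DataFrame()  # what user code receives when query planning is on

# A check against the submodule class no longer matches...
print(type(df) is pkg.core.DataFrame)  # False
# ...while a check against the top-level name keeps working.
print(isinstance(df, pkg.DataFrame))   # True
```

This is the failure mode that the `dask_cudf.core.DataFrame` to `dask_cudf.DataFrame` replacements elsewhere in this diff guard against.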

import cugraph.dask.comms.comms as Comms

4 changes: 2 additions & 2 deletions python/cugraph/cugraph/dask/common/part_utils.py
Original file line number Diff line number Diff line change
@@ -18,8 +18,8 @@
import collections
import dask_cudf
from dask.array.core import Array as daskArray
-from dask_cudf.core import DataFrame as daskDataFrame
-from dask_cudf.core import Series as daskSeries
+from dask_cudf import DataFrame as daskDataFrame
+from dask_cudf import Series as daskSeries
from functools import reduce
import cugraph.dask.comms.comms as Comms
from dask.delayed import delayed
4 changes: 2 additions & 2 deletions python/cugraph/cugraph/structure/convert_matrix.py
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ def from_edgelist(

Parameters
----------
-df : cudf.DataFrame, pandas.DataFrame, dask_cudf.core.DataFrame
+df : cudf.DataFrame, pandas.DataFrame, dask_cudf.DataFrame
This DataFrame contains columns storing edge source vertices,
destination (or target following NetworkX's terminology) vertices, and
(optional) weights.
@@ -95,7 +95,7 @@ def from_edgelist(
renumber=renumber,
)

-elif df_type is dask_cudf.core.DataFrame:
+elif df_type is dask_cudf.DataFrame:
if create_using is None:
G = Graph()
elif isinstance(create_using, Graph):
Original file line number Diff line number Diff line change
@@ -285,19 +285,20 @@ def __from_edgelist(
symmetrize=not self.properties.directed,
)

+# Create a dask_cudf dataframe from the cudf series
+# or dataframe objects obtained from symmetrization
if isinstance(source_col, dask_cudf.Series):
-# Create a dask_cudf dataframe from the cudf series obtained
-# from symmetrization
-input_ddf = source_col.to_frame()
-input_ddf = input_ddf.rename(columns={source_col.name: source})
-input_ddf[destination] = dest_col
+frames = [
+source_col.to_frame(name=source),
+dest_col.to_frame(name=destination),
+]
else:
# Multi column dask_cudf dataframe
-input_ddf = dask_cudf.concat([source_col, dest_col], axis=1)
+frames = [source_col, dest_col]

if value_col is not None:
-for vc in value_col_names:
-input_ddf[vc] = value_col[vc]
+frames.append(value_col[value_col_names])

+input_ddf = dask_cudf.concat(frames, axis=1)

self.input_df = input_ddf

Original file line number Diff line number Diff line change
@@ -159,8 +159,8 @@ def df_type_id(dataframe_type):
return s + "cudf.DataFrame"
if dataframe_type == pd.DataFrame:
return s + "pandas.DataFrame"
-if dataframe_type == dask_cudf.core.DataFrame:
-return s + "dask_cudf.core.DataFrame"
+if dataframe_type == dask_cudf.DataFrame:
+return s + "dask_cudf.DataFrame"
return s + "?"


13 changes: 8 additions & 5 deletions python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py
Original file line number Diff line number Diff line change
@@ -232,14 +232,17 @@ def test_mg_symmetrize(dask_client, read_datasets):

# create a dask DataFrame from the dask Series
if isinstance(sym_src, dask_cudf.Series):
-ddf2 = sym_src.to_frame()
-ddf2 = ddf2.rename(columns={sym_src.name: "src"})
-ddf2["dst"] = sym_dst
+frames = [
+sym_src.to_frame(name="src"),
+sym_dst.to_frame(name="dst"),
+]
else:
-ddf2 = dask_cudf.concat([sym_src, sym_dst], axis=1)
+frames = [sym_src, sym_dst]

if val_col_name is not None:
-ddf2["weight"] = sym_val
+frames.append(sym_val.to_frame(name="weight"))
+
+ddf2 = dask_cudf.concat(frames, axis=1)

compare(ddf, ddf2, src_col_name, dst_col_name, val_col_name)

6 changes: 3 additions & 3 deletions python/cugraph/cugraph/tests/structure/test_graph_mg.py
Original file line number Diff line number Diff line change
@@ -99,13 +99,13 @@ def test_nodes_functionality(dask_client, input_combo):
expected_nodes = (
dask_cudf.concat([ddf["src"], ddf["dst"]])
.drop_duplicates()
-.to_frame()
-.sort_values(0)
+.to_frame(name="0")
+.sort_values("0")
Comment on lines -102 to +103
Member Author

Another "precaution" (using numerical column names still seems "fragile" in dask)

)

expected_nodes = expected_nodes.compute().reset_index(drop=True)

-result_nodes["expected_nodes"] = expected_nodes[0]
+result_nodes["expected_nodes"] = expected_nodes["0"]

compare = result_nodes.query("result_nodes != expected_nodes")

2 changes: 1 addition & 1 deletion python/cugraph/cugraph/tests/utils/test_dataset.py
Original file line number Diff line number Diff line change
@@ -198,7 +198,7 @@ def test_reader_dask(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)

assert E is not None
-assert isinstance(E, dask_cudf.core.DataFrame)
+assert isinstance(E, dask_cudf.DataFrame)
dataset.unload()

