diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index e891f37d14..5cacdb0eef 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -57,16 +57,6 @@ jobs: label: no_queue partition: "not ci1" - # Set dataframe.query-planning: false - - os: ubuntu-latest - environment: "3.10" - label: no_expr - partition: "ci1" - - os: ubuntu-latest - environment: "3.10" - label: no_expr - partition: "not ci1" - # dask.array P2P shuffle - os: ubuntu-latest environment: mindeps @@ -83,12 +73,12 @@ jobs: - os: ubuntu-latest environment: mindeps label: pandas - extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1] + extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1, dask-expr] partition: "ci1" - os: ubuntu-latest environment: mindeps label: pandas - extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1] + extra_packages: [numpy=1.24, pandas=2.0, pyarrow=14.0.1, dask-expr] partition: "not ci1" - os: ubuntu-latest @@ -221,11 +211,6 @@ jobs: if: ${{ matrix.label == 'no_queue' }} run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV - - name: Set up dask env to disable dask-expr - shell: bash -l {0} - if: ${{ matrix.label == 'no_expr' }} - run: echo "DASK_DATAFRAME__QUERY_PLANNING=False" >> $GITHUB_ENV - - name: Print host info # host_info.py imports numpy, which isn't a direct dependency of distributed if: matrix.environment != 'mindeps' diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 404a0539da..8f2b0c69e8 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -19,6 +19,7 @@ dependencies: - ipykernel - ipywidgets - jinja2 + - jupyter_events<0.11 - jupyter-server-proxy - jupyterlab - locket diff --git a/continuous_integration/environment-3.11.yaml b/continuous_integration/environment-3.11.yaml index 25f429be04..d175ca5f12 100644 --- a/continuous_integration/environment-3.11.yaml +++ b/continuous_integration/environment-3.11.yaml @@ -18,6 +18,7 @@ dependencies: - ipykernel - ipywidgets - jinja2 + - jupyter_events<0.11 - jupyter-server-proxy - jupyterlab - locket diff --git a/continuous_integration/environment-3.12.yaml b/continuous_integration/environment-3.12.yaml index 96b89fd7b9..30f2d98ffe 100644 --- a/continuous_integration/environment-3.12.yaml +++ b/continuous_integration/environment-3.12.yaml @@ -18,6 +18,7 @@ dependencies: - ipykernel - ipywidgets - jinja2 + - jupyter_events<0.11 - jupyter-server-proxy - jupyterlab - locket diff --git a/continuous_integration/environment-3.13.yaml b/continuous_integration/environment-3.13.yaml index 27e990b740..a4ea041134 100644 --- a/continuous_integration/environment-3.13.yaml +++ b/continuous_integration/environment-3.13.yaml @@ -17,6 +17,7 @@ dependencies: - ipykernel - ipywidgets - jinja2 + - jupyter_events<0.11 - jupyter-server-proxy - jupyterlab - locket diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py index ba1f6478a8..b3df069a92 100644 --- a/distributed/protocol/tests/test_highlevelgraph.py +++ b/distributed/protocol/tests/test_highlevelgraph.py @@ -1,7 +1,5 @@ from __future__ import annotations -import contextlib - import pytest np = pytest.importorskip("numpy") @@ -175,11 +173,9 @@ async def test_dataframe_annotations(c, s, a, b): acol = df["a"] bcol = df["b"] - ctx = contextlib.nullcontext() - if dd._dask_expr_enabled(): - ctx = pytest.warns( - UserWarning, match="Annotations will be ignored 
when using query-planning" - ) + ctx = pytest.warns( + UserWarning, match="Annotations will be ignored when using query-planning" + ) with dask.annotate(retries=retries), ctx: df = acol + bcol @@ -189,7 +185,3 @@ async def test_dataframe_annotations(c, s, a, b): assert rdf.dtypes == np.float64 assert (rdf == 10.0).all() - - if not dd._dask_expr_enabled(): - # There is an annotation match per partition (i.e. task) - assert plugin.retry_matches == df.npartitions diff --git a/distributed/shuffle/__init__.py b/distributed/shuffle/__init__.py index df35588b46..a2169105db 100644 --- a/distributed/shuffle/__init__.py +++ b/distributed/shuffle/__init__.py @@ -1,18 +1,12 @@ from __future__ import annotations from distributed.shuffle._arrow import check_minimal_arrow_version -from distributed.shuffle._merge import HashJoinP2PLayer, hash_join_p2p from distributed.shuffle._rechunk import rechunk_p2p from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin -from distributed.shuffle._shuffle import P2PShuffleLayer, rearrange_by_column_p2p from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin __all__ = [ "check_minimal_arrow_version", - "hash_join_p2p", - "HashJoinP2PLayer", - "P2PShuffleLayer", - "rearrange_by_column_p2p", "rechunk_p2p", "ShuffleSchedulerPlugin", "ShuffleWorkerPlugin", diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index bf7a532e80..e8e5a8f173 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -1,26 +1,13 @@ # mypy: ignore-errors from __future__ import annotations -from collections.abc import Iterable -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING -import dask -from dask._task_spec import GraphNode, Task, TaskRef -from dask.base import is_dask_collection -from dask.highlevelgraph import HighLevelGraph -from dask.layers import Layer -from dask.tokenize import tokenize +from dask._task_spec import GraphNode from dask.typing import Key -from distributed.shuffle._arrow import check_minimal_arrow_version -from distributed.shuffle._core import ( - P2PBarrierTask, - ShuffleId, - barrier_key, - get_worker_plugin, - p2p_barrier, -) -from distributed.shuffle._shuffle import DataFrameShuffleSpec, shuffle_transfer +from distributed.shuffle._core import ShuffleId, get_worker_plugin +from distributed.shuffle._shuffle import shuffle_transfer if TYPE_CHECKING: import pandas as pd @@ -29,127 +16,8 @@ # TODO import from typing (requires Python >=3.10) from typing_extensions import TypeAlias - from dask.dataframe.core import _Frame - _T_LowLevelGraph: TypeAlias = dict[Key, GraphNode] -_HASH_COLUMN_NAME = "__hash_partition" - - -def _prepare_index_for_partitioning(df: pd.DataFrame, index: IndexLabel): - import pandas as pd - - from dask.dataframe.core import _Frame - - list_like = pd.api.types.is_list_like(index) and not is_dask_collection(index) - - if not isinstance(index, _Frame): - if list_like: - # Make sure we don't try to select with pd.Series/pd.Index - index = list(index) - index = df._select_columns_or_index(index) - elif hasattr(index, "to_frame"): - # If this is an index, we should still convert to a - # DataFrame. Otherwise, the hashed values of a column - # selection will not match (important when merging). 
- index = index.to_frame() - return index - - -def _calculate_partitions(df: pd.DataFrame, index: IndexLabel, npartitions: int): - index = _prepare_index_for_partitioning(df, index) - from dask.dataframe.shuffle import partitioning_index - - meta = df._meta._constructor_sliced([0]) - # Ensure that we have the same index as before to avoid alignment - # when calculating meta dtypes later on - meta.index = df._meta_nonempty.index[:1] - partitions = index.map_partitions( - partitioning_index, - npartitions=npartitions or df.npartitions, - meta=meta, - transform_divisions=False, - ) - df2 = df.assign(**{_HASH_COLUMN_NAME: partitions}) - df2._meta.index.name = df._meta.index.name - return df2 - - -def hash_join_p2p( - lhs: _Frame, - left_on: IndexLabel | None, - rhs: _Frame, - right_on: IndexLabel | None, - how: MergeHow = "inner", - npartitions: int | None = None, - suffixes: Suffixes = ("_x", "_y"), - indicator: bool = False, -): - from dask.dataframe.core import Index, new_dd_object - - if npartitions is None: - npartitions = max(lhs.npartitions, rhs.npartitions) - - if isinstance(left_on, Index): - _left_on = None - left_index = True - else: - left_index = False - _left_on = left_on - - if isinstance(right_on, Index): - _right_on = None - right_index = True - else: - right_index = False - _right_on = right_on - merge_kwargs = dict( - how=how, - left_on=_left_on, - right_on=_right_on, - left_index=left_index, - right_index=right_index, - suffixes=suffixes, - indicator=indicator, - ) - # dummy result - # Avoid using dummy data for a collection it is empty - _lhs_meta = lhs._meta_nonempty if len(lhs.columns) else lhs._meta - _rhs_meta = rhs._meta_nonempty if len(rhs.columns) else rhs._meta - meta = _lhs_meta.merge(_rhs_meta, **merge_kwargs) - lhs = _calculate_partitions(lhs, left_on, npartitions) - rhs = _calculate_partitions(rhs, right_on, npartitions) - merge_name = "hash-join-" + tokenize(lhs, rhs, **merge_kwargs) - disk: bool = dask.config.get("distributed.p2p.storage.disk") - join_layer = HashJoinP2PLayer( - name=merge_name, - name_input_left=lhs._name, - meta_input_left=lhs._meta, - left_on=_left_on, - n_partitions_left=lhs.npartitions, - name_input_right=rhs._name, - meta_input_right=rhs._meta, - right_on=_right_on, - n_partitions_right=rhs.npartitions, - meta_output=meta, - how=how, - npartitions=npartitions, - suffixes=suffixes, - indicator=indicator, - left_index=left_index, - right_index=right_index, - disk=disk, - ) - graph = HighLevelGraph.from_collections( - merge_name, join_layer, dependencies=[lhs, rhs] - ) - return new_dd_object(graph, merge_name, meta, [None] * (npartitions + 1)) - - -hash_join = hash_join_p2p - -_HASH_COLUMN_NAME = "__hash_partition" - def merge_transfer( input: pd.DataFrame, @@ -196,276 +64,3 @@ def merge_unpack( right_index=right_index, indicator=indicator, ) - - -class HashJoinP2PLayer(Layer): - name: str - npartitions: int - how: MergeHow - suffixes: Suffixes - indicator: bool - meta_output: pd.DataFrame - parts_out: set[int] - - name_input_left: str - meta_input_left: pd.DataFrame - n_partitions_left: int - left_on: IndexLabel | None - left_index: bool - - name_input_right: str - meta_input_right: pd.DataFrame - n_partitions_right: int - right_on: IndexLabel | None - right_index: bool - - def __init__( - self, - name: str, - name_input_left: str, - meta_input_left: pd.DataFrame, - left_on: IndexLabel | None, - n_partitions_left: int, - n_partitions_right: int, - name_input_right: str, - meta_input_right: pd.DataFrame, - right_on: IndexLabel | None, - 
meta_output: pd.DataFrame, - left_index: bool, - right_index: bool, - npartitions: int, - disk: bool, - how: MergeHow = "inner", - suffixes: Suffixes = ("_x", "_y"), - indicator: bool = False, - parts_out: Iterable[int] | None = None, - annotations: dict | None = None, - ) -> None: - check_minimal_arrow_version() - self.name = name - self.name_input_left = name_input_left - self.meta_input_left = meta_input_left - self.left_on = left_on - self.name_input_right = name_input_right - self.meta_input_right = meta_input_right - self.right_on = right_on - self.how = how - self.npartitions = npartitions - self.suffixes = suffixes - self.indicator = indicator - self.meta_output = meta_output - if parts_out: - self.parts_out = set(parts_out) - else: - self.parts_out = set(range(npartitions)) - self.n_partitions_left = n_partitions_left - self.n_partitions_right = n_partitions_right - self.left_index = left_index - self.right_index = right_index - self.disk = disk - super().__init__(annotations=annotations) - - def _cull_dependencies( - self, keys: Iterable[str], parts_out: Iterable[str] | None = None - ): - """Determine the necessary dependencies to produce `keys`. - - For a simple shuffle, output partitions always depend on - all input partitions. This method does not require graph - materialization. - """ - deps = {} - parts_out = parts_out or self._keys_to_parts(keys) - keys = {(self.name_input_left, i) for i in range(self.n_partitions_left)} - keys |= {(self.name_input_right, i) for i in range(self.n_partitions_right)} - # Protect against mutations later on with frozenset - keys = frozenset(keys) - for part in parts_out: - deps[(self.name, part)] = keys - return deps - - def _keys_to_parts(self, keys: Iterable[str]) -> set[str]: - """Simple utility to convert keys to partition indices.""" - parts = set() - for key in keys: - try: - _name, _part = key - except ValueError: - continue - if _name != self.name: - continue - parts.add(_part) - return parts - - def get_output_keys(self): - return {(self.name, part) for part in self.parts_out} - - def __repr__(self): - return f"HashJoin" - - def is_materialized(self): - return hasattr(self, "_cached_dict") - - def __getitem__(self, key): - return self._dict[key] - - def __iter__(self): - return iter(self._dict) - - def __len__(self): - return len(self._dict) - - @property - def _dict(self): - """Materialize full dict representation""" - if hasattr(self, "_cached_dict"): - return self._cached_dict - else: - dsk = self._construct_graph() - self._cached_dict = dsk - return self._cached_dict - - def _cull(self, parts_out: Iterable[int]): - return HashJoinP2PLayer( - name=self.name, - name_input_left=self.name_input_left, - meta_input_left=self.meta_input_left, - left_on=self.left_on, - name_input_right=self.name_input_right, - meta_input_right=self.meta_input_right, - right_on=self.right_on, - how=self.how, - npartitions=self.npartitions, - suffixes=self.suffixes, - indicator=self.indicator, - meta_output=self.meta_output, - parts_out=parts_out, - left_index=self.left_index, - right_index=self.right_index, - disk=self.disk, - annotations=self.annotations, - n_partitions_left=self.n_partitions_left, - n_partitions_right=self.n_partitions_right, - ) - - def cull(self, keys: Iterable[str], all_keys: Any) -> tuple[HashJoinP2PLayer, dict]: - """Cull a SimpleShuffleLayer HighLevelGraph layer. - - The underlying graph will only include the necessary - tasks to produce the keys (indices) included in `parts_out`. 
- Therefore, "culling" the layer only requires us to reset this - parameter. - """ - parts_out = self._keys_to_parts(keys) - - culled_deps = self._cull_dependencies(keys, parts_out=parts_out) - if parts_out != set(self.parts_out): - culled_layer = self._cull(parts_out) - return culled_layer, culled_deps - else: - return self, culled_deps - - def _construct_graph(self) -> _T_LowLevelGraph: - token_left = tokenize( - # Include self.name to ensure that shuffle IDs are unique for individual - # merge operations. Reusing shuffles between merges is dangerous because of - # required coordination and complexity introduced through dynamic clusters. - self.name, - self.name_input_left, - self.left_on, - self.left_index, - ) - shuffle_id_left = ShuffleId(token_left) - token_right = tokenize( - # Include self.name to ensure that shuffle IDs are unique for individual - # merge operations. Reusing shuffles between merges is dangerous because of - # required coordination and complexity introduced through dynamic clusters. - self.name, - self.name_input_right, - self.right_on, - self.right_index, - ) - shuffle_id_right = ShuffleId(token_right) - dsk: _T_LowLevelGraph = {} - name_left = "hash-join-transfer-" + token_left - name_right = "hash-join-transfer-" + token_right - transfer_keys_left = list() - for i in range(self.n_partitions_left): - t = Task( - (name_left, i), - merge_transfer, - TaskRef((self.name_input_left, i)), - shuffle_id_left, - i, - ) - dsk[t.key] = t - transfer_keys_left.append(t.ref()) - - transfer_keys_right = list() - for i in range(self.n_partitions_right): - t = Task( - (name_right, i), - merge_transfer, - TaskRef((self.name_input_right, i)), - shuffle_id_right, - i, - ) - dsk[t.key] = t - transfer_keys_right.append(t.ref()) - - _barrier_key_left = barrier_key(shuffle_id_left) - barrier_left = P2PBarrierTask( - _barrier_key_left, - p2p_barrier, - token_left, - *transfer_keys_left, - spec=DataFrameShuffleSpec( - id=shuffle_id_left, - npartitions=self.npartitions, - column=_HASH_COLUMN_NAME, - meta=self.meta_input_left, - parts_out=self.parts_out, - disk=self.disk, - drop_column=True, - ), - ) - dsk[barrier_left.key] = barrier_left - _barrier_key_right = barrier_key(shuffle_id_right) - barrier_right = P2PBarrierTask( - _barrier_key_right, - p2p_barrier, - token_right, - *transfer_keys_right, - spec=DataFrameShuffleSpec( - id=shuffle_id_right, - npartitions=self.npartitions, - column=_HASH_COLUMN_NAME, - meta=self.meta_input_right, - parts_out=self.parts_out, - disk=self.disk, - drop_column=True, - ), - ) - dsk[barrier_right.key] = barrier_right - - name = self.name - for part_out in self.parts_out: - t = Task( - (name, part_out), - merge_unpack, - token_left, - token_right, - part_out, - barrier_left.ref(), - barrier_right.ref(), - self.how, - self.left_on, - self.right_on, - self.meta_output, - self.suffixes, - self.left_index, - self.right_index, - self.indicator, - ) - dsk[t.key] = t - return dsk diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 8b597f79e4..cd47d49508 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -753,7 +753,7 @@ def partial_concatenate( ) rec_cat_arg[old_partial_index] = t.ref() else: - rec_cat_arg[old_partial_index] = TaskRef((input_name,) + old_global_index) + rec_cat_arg[old_partial_index] = TaskRef((input_name,) + old_global_index) # type: ignore[call-overload] concat_task = Task( (rechunk_name(token),) + global_new_index, diff --git a/distributed/shuffle/_shuffle.py 
b/distributed/shuffle/_shuffle.py index 6f78a65042..e936b9d7c6 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -3,15 +3,7 @@ import logging import os from collections import defaultdict -from collections.abc import ( - Callable, - Collection, - Generator, - Hashable, - Iterable, - Iterator, - Sequence, -) +from collections.abc import Callable, Generator, Hashable, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from pathlib import Path @@ -20,11 +12,7 @@ import toolz from tornado.ioloop import IOLoop -import dask -from dask._task_spec import GraphNode, Task, TaskRef -from dask.highlevelgraph import HighLevelGraph -from dask.layers import Layer -from dask.tokenize import tokenize +from dask._task_spec import GraphNode from dask.typing import Key from dask.utils import is_dataframe_like @@ -32,8 +20,6 @@ from distributed.metrics import context_meter from distributed.shuffle._arrow import ( buffers_to_table, - check_dtype_support, - check_minimal_arrow_version, convert_shards, deserialize_table, read_from_disk, @@ -41,15 +27,12 @@ ) from distributed.shuffle._core import ( NDIndex, - P2PBarrierTask, ShuffleId, ShuffleRun, ShuffleSpec, - barrier_key, get_worker_plugin, handle_transfer_errors, handle_unpack_errors, - p2p_barrier, ) from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._limiter import ResourceLimiter @@ -64,8 +47,6 @@ # TODO import from typing (requires Python >=3.10) from typing_extensions import TypeAlias - from dask.dataframe import DataFrame - def shuffle_transfer( input: pd.DataFrame, @@ -89,218 +70,9 @@ def shuffle_unpack( ) -def rearrange_by_column_p2p( - df: DataFrame, - column: str, - npartitions: int | None = None, -) -> DataFrame: - import pandas as pd - - from dask.dataframe.core import new_dd_object - - meta = df._meta - if not pd.api.types.is_integer_dtype(meta[column].dtype): - raise TypeError( - f"Expected meta {column=} to be an integer column, is {meta[column].dtype}." - ) - check_dtype_support(meta) - npartitions = npartitions or df.npartitions - token = tokenize(df, column, npartitions) - - if any(not isinstance(c, str) for c in meta.columns): - unsupported = {c: type(c) for c in meta.columns if not isinstance(c, str)} - raise TypeError( - f"p2p requires all column names to be str, found: {unsupported}", - ) - - name = f"shuffle_p2p-{token}" - disk: bool = dask.config.get("distributed.p2p.storage.disk") - - layer = P2PShuffleLayer( - name, - column, - npartitions, - npartitions_input=df.npartitions, - name_input=df._name, - meta_input=meta, - disk=disk, - ) - return new_dd_object( - HighLevelGraph.from_collections(name, layer, [df]), - name, - meta, - [None] * (npartitions + 1), - ) - - _T_LowLevelGraph: TypeAlias = dict[Key, GraphNode] -class P2PShuffleLayer(Layer): - name: str - column: str - npartitions: int - npartitions_input: int - name_input: str - meta_input: pd.DataFrame - disk: bool - parts_out: tuple[int, ...] 
- drop_column: bool - - def __init__( - self, - name: str, - column: str, - npartitions: int, - npartitions_input: int, - name_input: str, - meta_input: pd.DataFrame, - disk: bool, - parts_out: Iterable[int] | None = None, - annotations: dict | None = None, - drop_column: bool = False, - ): - check_minimal_arrow_version() - self.name = name - self.column = column - self.npartitions = npartitions - self.name_input = name_input - self.meta_input = meta_input - self.disk = disk - if parts_out: - self.parts_out = tuple(parts_out) - else: - self.parts_out = tuple(range(self.npartitions)) - self.npartitions_input = npartitions_input - self.drop_column = drop_column - super().__init__(annotations=annotations) - - def __repr__(self) -> str: - return ( - f"{type(self).__name__}" - ) - - def get_output_keys(self) -> set[Key]: - return {(self.name, part) for part in self.parts_out} - - def is_materialized(self) -> bool: - return hasattr(self, "_cached_dict") - - @property - def _dict(self) -> _T_LowLevelGraph: - """Materialize full dict representation""" - self._cached_dict: _T_LowLevelGraph - dsk: _T_LowLevelGraph - if hasattr(self, "_cached_dict"): - return self._cached_dict - else: - dsk = self._construct_graph() - self._cached_dict = dsk - return self._cached_dict - - def __getitem__(self, key: Key) -> GraphNode: - return self._dict[key] - - def __iter__(self) -> Iterator[Key]: - return iter(self._dict) - - def __len__(self) -> int: - return len(self._dict) - - def _cull(self, parts_out: Iterable[int]) -> P2PShuffleLayer: - return P2PShuffleLayer( - self.name, - self.column, - self.npartitions, - self.npartitions_input, - self.name_input, - self.meta_input, - self.disk, - parts_out=parts_out, - ) - - def _keys_to_parts(self, keys: Iterable[Key]) -> set[int]: - """Simple utility to convert keys to partition indices.""" - parts = set() - for key in keys: - if isinstance(key, tuple) and len(key) == 2: - name, part = key - if name == self.name: - assert isinstance(part, int) - parts.add(part) - return parts - - def cull( - self, keys: set[Key], all_keys: Collection[Key] - ) -> tuple[P2PShuffleLayer, dict]: - """Cull a P2PShuffleLayer HighLevelGraph layer. - - The underlying graph will only include the necessary - tasks to produce the keys (indices) included in `parts_out`. - Therefore, "culling" the layer only requires us to reset this - parameter. 
- """ - parts_out = self._keys_to_parts(keys) - # Protect against mutations later on with frozenset - input_parts = frozenset( - {(self.name_input, i) for i in range(self.npartitions_input)} - ) - culled_deps = {(self.name, part): input_parts for part in parts_out} - - if parts_out != set(self.parts_out): - culled_layer = self._cull(parts_out) - return culled_layer, culled_deps - else: - return self, culled_deps - - def _construct_graph(self) -> _T_LowLevelGraph: - token = tokenize(self.name_input, self.column, self.npartitions, self.parts_out) - shuffle_id = ShuffleId(token) - dsk: _T_LowLevelGraph = {} - _barrier_key = barrier_key(shuffle_id) - name = "shuffle-transfer-" + token - transfer_keys = list() - for i in range(self.npartitions_input): - t = Task( - (name, i), - shuffle_transfer, - TaskRef((self.name_input, i)), - token, - i, - ) - dsk[t.key] = t - transfer_keys.append(t.ref()) - - barrier = P2PBarrierTask( - _barrier_key, - p2p_barrier, - token, - *transfer_keys, - spec=DataFrameShuffleSpec( - id=shuffle_id, - npartitions=self.npartitions, - column=self.column, - meta=self.meta_input, - parts_out=self.parts_out, - disk=self.disk, - drop_column=self.drop_column, - ), - ) - dsk[barrier.key] = barrier - - name = self.name - for part_out in self.parts_out: - t = Task( - (name, part_out), - shuffle_unpack, - token, - part_out, - barrier.ref(), - ) - dsk[t.key] = t - return dsk - - def split_by_worker( df: pd.DataFrame, column: str, diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py index 33c20b01f8..23603c9f90 100644 --- a/distributed/shuffle/tests/test_graph.py +++ b/distributed/shuffle/tests/test_graph.py @@ -9,24 +9,10 @@ pytest.importorskip("pyarrow") import dask -from dask.blockwise import Blockwise -from dask.utils_test import hlg_layer_topological from distributed.utils_test import gen_cluster -@pytest.mark.skipif(condition=dd._dask_expr_enabled(), reason="no HLG") -def test_basic(client): - df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") - df["name"] = df["name"].astype("string[python]") - with dask.config.set({"dataframe.shuffle.method": "p2p"}): - p2p_shuffled = df.shuffle("id") - - (opt,) = dask.optimize(p2p_shuffled) - assert isinstance(hlg_layer_topological(opt.dask, 0), Blockwise) - # blockwise -> barrier -> unpack -> drop_by_shallow_copy - - @pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"]) def test_raise_on_complex_numbers(dtype): df = dd.from_pandas( diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index f491ba7edd..92d1152bf8 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import contextlib from typing import Any from unittest import mock @@ -46,28 +45,6 @@ async def list_eq(a, b): dd._compat.assert_numpy_array_equal(av, bv) -@pytest.mark.skipif(dd._dask_expr_enabled(), reason="pyarrow>=7.0.0 already required") -@gen_cluster(client=True) -async def test_minimal_version(c, s, a, b): - no_pyarrow_ctx = ( - mock.patch.dict("sys.modules", {"pyarrow": None}) - if pa is not None - else contextlib.nullcontext() - ) - with no_pyarrow_ctx: - A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]}) - a = dd.repartition(A, [0, 4, 5]) - - B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) - b = dd.repartition(B, [0, 2, 5]) - - with ( - pytest.raises(ModuleNotFoundError, match="requires pyarrow"), - 
dask.config.set({"dataframe.shuffle.method": "p2p"}), - ): - await c.compute(dd.merge(a, b, left_on="x", right_on="z")) - - @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) @gen_cluster(client=True) async def test_basic_merge(c, s, a, b, how): @@ -79,13 +56,10 @@ async def test_basic_merge(c, s, a, b, how): joined = a.merge(b, left_on="y", right_on="y", how=how) - if dd._dask_expr_enabled(): - # Ensure we're using a hash join - from dask_expr._merge import HashJoinP2P + # Ensure we're using a hash join + from dask_expr._merge import HashJoinP2P - assert any( - isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk() - ) + assert any(isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()) expected = pd.merge(A, B, how, "y") await list_eq(joined, expected) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 701d18bf43..18dc1f1887 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -124,28 +124,6 @@ async def assert_scheduler_cleanup( assert not plugin.heartbeats -@pytest.mark.skipif(dd._dask_expr_enabled(), reason="pyarrow>=7.0.0 already required") -@gen_cluster(client=True) -async def test_minimal_version(c, s, a, b): - no_pyarrow_ctx = ( - mock.patch.dict("sys.modules", {"pyarrow": None}) - if pa is not None - else contextlib.nullcontext() - ) - with no_pyarrow_ctx: - df = dask.datasets.timeseries( - start="2000-01-01", - end="2000-01-10", - dtypes={"x": float, "y": float}, - freq="10 s", - ) - with ( - pytest.raises(ModuleNotFoundError, match="requires pyarrow"), - dask.config.set({"dataframe.shuffle.method": "p2p"}), - ): - await c.compute(df.shuffle("x")) - - @pytest.mark.gpu @pytest.mark.filterwarnings( "ignore:Ignoring the following arguments to `from_pyarrow_table_dispatch`." @@ -1659,9 +1637,7 @@ async def test_multi(c, s, a, b): await assert_scheduler_cleanup(s) -@pytest.mark.skipif( - dd._dask_expr_enabled(), reason="worker restrictions are not supported in dask-expr" -) +@pytest.mark.skipif(reason="worker restrictions are not supported in dask-expr") @gen_cluster(client=True) async def test_restrictions(c, s, a, b): df = dask.datasets.timeseries( diff --git a/distributed/shuffle/tests/utils.py b/distributed/shuffle/tests/utils.py index 693a044373..f0d7abac0a 100644 --- a/distributed/shuffle/tests/utils.py +++ b/distributed/shuffle/tests/utils.py @@ -6,14 +6,7 @@ from distributed.core import PooledRPCCall from distributed.shuffle._core import ShuffleId, ShuffleRun -UNPACK_PREFIX = "shuffle_p2p" -try: - import dask.dataframe as dd - - if dd._dask_expr_enabled(): - UNPACK_PREFIX = "p2pshuffle" -except ImportError: - pass +UNPACK_PREFIX = "p2pshuffle" class PooledRPCShuffle(PooledRPCCall): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 30d148b5d4..413bbf1ed8 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6733,7 +6733,7 @@ def test_futures_in_subgraphs(loop_in_thread): ddf = ddf[ddf.uid.isin(range(29))].persist() ddf["local_time"] = ddf.enter_time.dt.tz_convert("US/Central") ddf["day"] = ddf.enter_time.dt.day_name() - ddf = dd.categorical.categorize(ddf, columns=["day"], index=False) + ddf = ddf.categorize(columns=["day"], index=False) ddf.compute()