Skip to content

Commit

Permalink
P2P: raise RuntimeError if pyarrow version is not sufficient (#7578)
Browse files Browse the repository at this point in the history
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
  • Loading branch information
fjetter and hendrikmakait authored Feb 23, 2023
1 parent 790d852 commit 76d0104
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 1 deletion.
2 changes: 2 additions & 0 deletions distributed/shuffle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

from distributed.shuffle._arrow import check_minimal_arrow_version
from distributed.shuffle._scheduler_extension import ShuffleSchedulerExtension
from distributed.shuffle._shuffle import P2PShuffleLayer, rearrange_by_column_p2p
from distributed.shuffle._worker_extension import ShuffleWorkerExtension

__all__ = [
"check_minimal_arrow_version",
"P2PShuffleLayer",
"rearrange_by_column_p2p",
"ShuffleSchedulerExtension",
Expand Down
22 changes: 22 additions & 0 deletions distributed/shuffle/_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from io import BytesIO
from typing import TYPE_CHECKING, BinaryIO

from packaging.version import parse

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa
Expand All @@ -28,6 +30,26 @@ def check_dtype_support(meta_input: pd.DataFrame) -> None:
raise TypeError("p2p does not support sparse data found in column '{name}'")


def check_minimal_arrow_version() -> None:
    """Verify that a pyarrow version able to support the P2P extension is
    installed.

    Raises
    ------
    RuntimeError
        If pyarrow cannot be imported, or if the installed version is older
        than the minimum required version.
    """
    # First version to introduce Table.sort_by
    minversion = "7.0.0"
    try:
        import pyarrow as pa
    except ImportError as err:
        # Chain explicitly so the underlying import failure is reported as the
        # direct cause rather than as an unrelated error raised during handling.
        raise RuntimeError(f"P2P shuffling requires pyarrow>={minversion}") from err

    if parse(pa.__version__) < parse(minversion):
        raise RuntimeError(
            f"P2P shuffling requires pyarrow>={minversion} but only found {pa.__version__}"
        )


def dump_shards(shards: list[bytes], file: BinaryIO) -> None:
"""
Write multiple shard tables to the file
Expand Down
3 changes: 2 additions & 1 deletion distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dask.highlevelgraph import HighLevelGraph
from dask.layers import SimpleShuffleLayer

from distributed.shuffle._arrow import check_dtype_support
from distributed.shuffle._arrow import check_dtype_support, check_minimal_arrow_version

logger = logging.getLogger("distributed.shuffle")
if TYPE_CHECKING:
Expand Down Expand Up @@ -138,6 +138,7 @@ def __init__(
parts_out: list | None = None,
annotations: dict | None = None,
):
check_minimal_arrow_version()
annotations = annotations or {}
annotations.update({"shuffle": lambda key: key[1]})
super().__init__(
Expand Down
21 changes: 21 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
from distributed.utils_test import gen_cluster, gen_test, wait_for_state
from distributed.worker_state_machine import TaskState as WorkerTaskState

try:
import pyarrow as pa
except ImportError:
pa = None


async def clean_worker(
worker: Worker, interval: float = 0.01, timeout: int | None = None
Expand Down Expand Up @@ -69,6 +74,22 @@ async def clean_scheduler(
assert not extension.heartbeats


@pytest.mark.skipif(
    pa is not None,
    reason="We don't have a CI job that is installing a very old pyarrow version",
)
@gen_cluster(client=True)
async def test_minimal_version(c, s, a, b):
    """A P2P shuffle must fail with a RuntimeError mentioning the pyarrow
    requirement.

    Skipped whenever ``pa`` is not None, i.e. whenever pyarrow is importable.
    """
    timeseries_options = {
        "start": "2000-01-01",
        "end": "2000-01-10",
        "dtypes": {"x": float, "y": float},
        "freq": "10 s",
    }
    frame = dask.datasets.timeseries(**timeseries_options)
    # Keep the shuffle call itself inside the context manager so the error is
    # caught regardless of whether it surfaces while building or computing.
    with pytest.raises(RuntimeError, match="requires pyarrow"):
        shuffled = dd.shuffle.shuffle(frame, "x", shuffle="p2p")
        await c.compute(shuffled)


@gen_cluster(client=True)
async def test_basic_integration(c, s, a, b):
df = dask.datasets.timeseries(
Expand Down

0 comments on commit 76d0104

Please sign in to comment.