From 06a70aaf23d3c2858496e6a21a37a7b46f3a8498 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 24 Jun 2021 02:39:24 -0500 Subject: [PATCH] Deprecate utilities which have moved to dask (#4966) --- distributed/batched.py | 2 +- distributed/cfexecutor.py | 4 +- distributed/client.py | 2 +- distributed/comm/core.py | 3 +- distributed/comm/tcp.py | 3 +- distributed/comm/ucx.py | 3 +- distributed/core.py | 2 +- distributed/deploy/adaptive.py | 3 +- distributed/deploy/adaptive_core.py | 3 +- distributed/deploy/cluster.py | 12 +-- distributed/deploy/spec.py | 10 +-- distributed/diagnostics/progressbar.py | 6 +- distributed/event.py | 4 +- distributed/lock.py | 4 +- distributed/multi_lock.py | 4 +- distributed/nanny.py | 2 +- distributed/profile.py | 4 +- distributed/pubsub.py | 4 +- distributed/queues.py | 4 +- distributed/scheduler.py | 5 +- distributed/semaphore.py | 3 +- distributed/stealing.py | 3 +- distributed/tests/test_utils.py | 100 +++++++++++-------------- distributed/utils.py | 44 +++++++---- distributed/variable.py | 4 +- distributed/worker.py | 4 +- distributed/worker_client.py | 3 +- 27 files changed, 122 insertions(+), 123 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index b932f1b24c..4e59a09e80 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -5,9 +5,9 @@ from tornado.ioloop import IOLoop import dask +from dask.utils import parse_timedelta from .core import CommClosedError -from .utils import parse_timedelta logger = logging.getLogger(__name__) diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index f54db42b9a..8028a4bc7f 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -4,8 +4,10 @@ from tlz import merge from tornado import gen +from dask.utils import parse_timedelta + from .metrics import time -from .utils import TimeoutError, parse_timedelta, sync +from .utils import TimeoutError, sync @gen.coroutine diff --git a/distributed/client.py b/distributed/client.py index 4a1090516a..51047dc34c 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -36,6 +36,7 @@ ensure_dict, format_bytes, funcname, + parse_timedelta, stringify, ) @@ -78,7 +79,6 @@ key_split, log_errors, no_default, - parse_timedelta, sync, thread_state, ) diff --git a/distributed/comm/core.py b/distributed/comm/core.py index b9032a7369..ccdcbb99c2 100644 --- a/distributed/comm/core.py +++ b/distributed/comm/core.py @@ -8,11 +8,12 @@ from contextlib import suppress import dask +from dask.utils import parse_timedelta from ..metrics import time from ..protocol import pickle from ..protocol.compression import get_default_compression -from ..utils import TimeoutError, parse_timedelta +from ..utils import TimeoutError from . import registry from .addressing import parse_address diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 258f4e88a5..91e6af308e 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -20,11 +20,12 @@ from tornado.tcpserver import TCPServer import dask +from dask.utils import parse_timedelta from ..protocol.utils import pack_frames_prelude, unpack_frames from ..system import MEMORY_LIMIT from ..threadpoolexecutor import ThreadPoolExecutor -from ..utils import ensure_ip, get_ip, get_ipv6, nbytes, parse_timedelta +from ..utils import ensure_ip, get_ip, get_ipv6, nbytes from .addressing import parse_host_port, unparse_host_port from .core import Comm, CommClosedError, Connector, FatalCommClosedError, Listener from .registry import Backend, backends diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index de349d1f7e..647ed8313a 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -10,8 +10,9 @@ import weakref import dask +from dask.utils import parse_bytes -from ..utils import ensure_ip, get_ip, get_ipv6, log_errors, nbytes, parse_bytes +from ..utils import ensure_ip, get_ip, get_ipv6, log_errors, nbytes from .addressing import parse_host_port, unparse_host_port from .core import Comm, CommClosedError, Connector, Listener from .registry import Backend, backends diff --git a/distributed/core.py b/distributed/core.py index 227c42610f..8369bab1f5 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -18,6 +18,7 @@ from tornado.ioloop import IOLoop, PeriodicCallback import dask +from dask.utils import parse_timedelta from . import profile, protocol from .comm import ( @@ -36,7 +37,6 @@ get_traceback, has_keyword, is_coroutine_function, - parse_timedelta, truncate_exception, ) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index fa68ad4e93..8ffb8555d8 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -2,9 +2,10 @@ from inspect import isawaitable import dask.config +from dask.utils import parse_timedelta from ..protocol import pickle -from ..utils import log_errors, parse_timedelta +from ..utils import log_errors from .adaptive_core import AdaptiveCore logger = logging.getLogger(__name__) diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index db8ef77e9a..b077261dc1 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -6,8 +6,9 @@ import tlz as toolz from tornado.ioloop import IOLoop, PeriodicCallback +from dask.utils import parse_timedelta + from ..metrics import time -from ..utils import parse_timedelta logger = logging.getLogger(__name__) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index b00886ac96..37ddc31f39 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -9,19 +9,11 @@ from tornado.ioloop import PeriodicCallback import dask.config -from dask.utils import _deprecated, format_bytes +from dask.utils import _deprecated, format_bytes, parse_timedelta from ..core import Status from ..objects import SchedulerInfo -from ..utils import ( - Log, - Logs, - format_dashboard_link, - log_errors, - parse_timedelta, - sync, - thread_state, -) +from ..utils import Log, Logs, format_dashboard_link, log_errors, sync, thread_state from .adaptive import Adaptive logger = logging.getLogger(__name__) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 070d6d0624..9c2f129264 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -11,18 +11,12 @@ from tornado import gen import dask +from dask.utils import parse_bytes, parse_timedelta from ..core import CommClosedError, Status, rpc from ..scheduler import Scheduler from ..security import Security -from ..utils import ( - LoopRunner, - TimeoutError, - import_term, - parse_bytes, - parse_timedelta, - silence_logging, -) +from ..utils import LoopRunner, TimeoutError, import_term, silence_logging from .adaptive import Adaptive from .cluster import Cluster diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index 1a05438ccc..a85b72d4b0 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -8,10 +8,12 @@ from tlz import valmap from tornado.ioloop import IOLoop +import dask + from ..client import default_client, futures_of from ..core import CommClosedError, coerce_to_address, connect from ..protocol.pickle import dumps -from ..utils import LoopRunner, is_kernel, key_split, parse_timedelta +from ..utils import LoopRunner, is_kernel, key_split from .progress import MultiProgress, Progress, format_time logger = logging.getLogger(__name__) @@ -34,7 +36,7 @@ def __init__(self, keys, scheduler=None, interval="100ms", complete=True): break self.keys = {k.key if hasattr(k, "key") else k for k in keys} - self.interval = parse_timedelta(interval, default="s") + self.interval = dask.utils.parse_timedelta(interval, default="s") self.complete = complete self._start_time = default_timer() diff --git a/distributed/event.py b/distributed/event.py index 40c1c5e3da..882281692c 100644 --- a/distributed/event.py +++ b/distributed/event.py @@ -4,8 +4,10 @@ from collections import defaultdict from contextlib import suppress +from dask.utils import parse_timedelta + from .client import Client -from .utils import TimeoutError, log_errors, parse_timedelta +from .utils import TimeoutError, log_errors from .worker import get_worker logger = logging.getLogger(__name__) diff --git a/distributed/lock.py b/distributed/lock.py index 64e3f29c22..d8b50ac8d0 100644 --- a/distributed/lock.py +++ b/distributed/lock.py @@ -3,8 +3,10 @@ import uuid from collections import defaultdict, deque +from dask.utils import parse_timedelta + from .client import Client -from .utils import TimeoutError, log_errors, parse_timedelta +from .utils import TimeoutError, log_errors from .worker import get_worker logger = logging.getLogger(__name__) diff --git a/distributed/multi_lock.py b/distributed/multi_lock.py index 3eda32e0be..6875229618 100644 --- a/distributed/multi_lock.py +++ b/distributed/multi_lock.py @@ -4,8 +4,10 @@ from collections import defaultdict from typing import Hashable, List +from dask.utils import parse_timedelta + from .client import Client -from .utils import TimeoutError, log_errors, parse_timedelta +from .utils import TimeoutError, log_errors from .worker import get_worker logger = logging.getLogger(__name__) diff --git a/distributed/nanny.py b/distributed/nanny.py index bf51a5c969..bcff9eee69 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -17,6 +17,7 @@ import dask from dask.system import CPU_COUNT +from dask.utils import parse_timedelta from . import preloading from .comm import get_address_host, unparse_host_port @@ -32,7 +33,6 @@ json_load_robust, mp_context, parse_ports, - parse_timedelta, silence_logging, ) from .worker import Worker, parse_memory_limit, run diff --git a/distributed/profile.py b/distributed/profile.py index 9c375291d3..160fe5a7b6 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -33,8 +33,10 @@ import tlz as toolz +from dask.utils import format_time, parse_timedelta + from .metrics import time -from .utils import color_of, format_time, parse_timedelta +from .utils import color_of def identifier(frame): diff --git a/distributed/pubsub.py b/distributed/pubsub.py index 99c528eb5e..91b006423b 100644 --- a/distributed/pubsub.py +++ b/distributed/pubsub.py @@ -4,10 +4,12 @@ import weakref from collections import defaultdict, deque +from dask.utils import parse_timedelta + from .core import CommClosedError from .metrics import time from .protocol.serialize import to_serialize -from .utils import TimeoutError, parse_timedelta, sync +from .utils import TimeoutError, sync logger = logging.getLogger(__name__) diff --git a/distributed/queues.py b/distributed/queues.py index 3dd0361c66..481f497373 100644 --- a/distributed/queues.py +++ b/distributed/queues.py @@ -3,10 +3,10 @@ import uuid from collections import defaultdict -from dask.utils import stringify +from dask.utils import parse_timedelta, stringify from .client import Client, Future -from .utils import parse_timedelta, sync, thread_state +from .utils import sync, thread_state from .worker import get_client, get_worker logger = logging.getLogger(__name__) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7dc2f8d0c3..b9c2e480c8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -38,6 +38,7 @@ import dask from dask.highlevelgraph import HighLevelGraph +from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta from . import preloading, profile from . import versions as version_module @@ -70,15 +71,11 @@ All, TimeoutError, empty_context, - format_bytes, - format_time, get_fileno_limit, key_split, key_split_group, log_errors, no_default, - parse_bytes, - parse_timedelta, tmpfile, validate_key, ) diff --git a/distributed/semaphore.py b/distributed/semaphore.py index 2de45c42f3..48aab78b38 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -8,11 +8,12 @@ from tornado.ioloop import IOLoop, PeriodicCallback import dask +from dask.utils import parse_timedelta from distributed.utils_comm import retry_operation from .metrics import time -from .utils import log_errors, parse_timedelta, sync, thread_state +from .utils import log_errors, sync, thread_state from .worker import get_client, get_worker logger = logging.getLogger(__name__) diff --git a/distributed/stealing.py b/distributed/stealing.py index b352040bff..1929661abc 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -7,11 +7,12 @@ from tornado.ioloop import PeriodicCallback import dask +from dask.utils import parse_timedelta from .comm.addressing import get_address_host from .core import CommClosedError from .diagnostics.plugin import SchedulerPlugin -from .utils import log_errors, parse_timedelta +from .utils import log_errors LATENCY = 10e-3 diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index fa384cd845..6f94ca3d50 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -1,13 +1,11 @@ import array import asyncio -import datetime import io import os import queue import socket import sys import traceback -from functools import partial from time import sleep import pytest @@ -27,7 +25,6 @@ ensure_bytes, ensure_ip, format_dashboard_link, - funcname, get_ip_interface, get_traceback, is_kernel, @@ -35,9 +32,7 @@ nbytes, offload, open_port, - parse_bytes, parse_ports, - parse_timedelta, read_block, seek_delimiter, set_thread_state, @@ -250,15 +245,6 @@ def test_seek_delimiter_endline(): assert f.tell() == 7 -def test_funcname(): - def f(): - pass - - assert funcname(f) == "f" - assert funcname(partial(f)) == "f" - assert funcname(partial(partial(f))) == "f" - - def test_ensure_bytes(): data = [b"1", "1", memoryview(b"1"), bytearray(b"1"), array.array("b", [49])] for d in data: @@ -471,45 +457,6 @@ async def test_loop_runner_gen(): await asyncio.sleep(0.01) -def test_parse_bytes(): - assert parse_bytes("100") == 100 - assert parse_bytes("100 MB") == 100000000 - assert parse_bytes("100M") == 100000000 - assert parse_bytes("5kB") == 5000 - assert parse_bytes("5.4 kB") == 5400 - assert parse_bytes("1kiB") == 1024 - assert parse_bytes("1Mi") == 2 ** 20 - assert parse_bytes("1e6") == 1000000 - assert parse_bytes("1e6 kB") == 1000000000 - assert parse_bytes("MB") == 1000000 - - -def test_parse_timedelta(): - for text, value in [ - ("1s", 1), - ("100ms", 0.1), - ("5S", 5), - ("5.5s", 5.5), - ("5.5 s", 5.5), - ("1 second", 1), - ("3.3 seconds", 3.3), - ("3.3 milliseconds", 0.0033), - ("3500 us", 0.0035), - ("1 ns", 1e-9), - ("2m", 120), - ("2 minutes", 120), - (datetime.timedelta(seconds=2), 2), - (datetime.timedelta(milliseconds=100), 0.1), - ]: - result = parse_timedelta(text) - assert abs(result - value) < 1e-14 - - assert parse_timedelta("1ms", default="seconds") == 0.001 - assert parse_timedelta("1", default="seconds") == 1 - assert parse_timedelta("1", default="ms") == 0.001 - assert parse_timedelta(1, default="ms") == 0.001 - - @gen_test() async def test_all_exceptions_logging(): async def throws(): @@ -543,11 +490,6 @@ def test_warn_on_duration(): assert any("foo" in str(rec.message) for rec in record) -def test_format_bytes_compat(): - # moved to dask, but exported here for compatibility - from distributed.utils import format_bytes # noqa - - def test_logs(): log = Log("Hello") assert isinstance(log, str) @@ -611,3 +553,45 @@ def test_lru(): async def test_offload(): assert (await offload(inc, 1)) == 2 assert (await offload(lambda x, y: x + y, 1, y=2)) == 3 + + +def test_serialize_for_cli_deprecated(): + with pytest.warns(FutureWarning, match="serialize_for_cli is deprecated"): + from distributed.utils import serialize_for_cli + assert serialize_for_cli is dask.config.serialize + + +def test_deserialize_for_cli_deprecated(): + with pytest.warns(FutureWarning, match="deserialize_for_cli is deprecated"): + from distributed.utils import deserialize_for_cli + assert deserialize_for_cli is dask.config.deserialize + + +def test_parse_bytes_deprecated(): + with pytest.warns(FutureWarning, match="parse_bytes is deprecated"): + from distributed.utils import parse_bytes + assert parse_bytes is dask.utils.parse_bytes + + +def test_format_bytes_deprecated(): + with pytest.warns(FutureWarning, match="format_bytes is deprecated"): + from distributed.utils import format_bytes + assert format_bytes is dask.utils.format_bytes + + +def test_format_time_deprecated(): + with pytest.warns(FutureWarning, match="format_time is deprecated"): + from distributed.utils import format_time + assert format_time is dask.utils.format_time + + +def test_funcname_deprecated(): + with pytest.warns(FutureWarning, match="funcname is deprecated"): + from distributed.utils import funcname + assert funcname is dask.utils.funcname + + +def test_parse_timedelta_deprecated(): + with pytest.warns(FutureWarning, match="parse_timedelta is deprecated"): + from distributed.utils import parse_timedelta + assert parse_timedelta is dask.utils.parse_timedelta diff --git a/distributed/utils.py b/distributed/utils.py index 33793b2f50..dcea26e628 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -41,19 +41,7 @@ import dask from dask import istask - -# Import config serialization functions here for backward compatibility -from dask.config import deserialize as deserialize_for_cli # noqa -from dask.config import serialize as serialize_for_cli # noqa - -# provide format_bytes here for backwards compatibility -from dask.utils import ( # noqa: F401 - format_bytes, - format_time, - funcname, - parse_bytes, - parse_timedelta, -) +from dask.utils import parse_timedelta as _parse_timedelta try: from tornado.ioloop import PollIOLoop @@ -286,7 +274,7 @@ def sync(loop, func, *args, callback_timeout=None, **kwargs): """ Run coroutine in loop running in separate thread. """ - callback_timeout = parse_timedelta(callback_timeout, "s") + callback_timeout = _parse_timedelta(callback_timeout, "s") # Tornado's PollIOLoop doesn't raise when using closed, do it ourselves if PollIOLoop and ( (isinstance(loop, PollIOLoop) and getattr(loop, "_closing", False)) @@ -1150,7 +1138,7 @@ def warn_on_duration(duration, msg): start = time() yield stop = time() - if stop - start > parse_timedelta(duration): + if stop - start > _parse_timedelta(duration): warnings.warn(msg, stacklevel=2) @@ -1455,3 +1443,29 @@ def clean_dashboard_address(addrs: AnyType, default_listen_ip: str = "") -> List addresses.append({"address": host, "port": port}) return addresses + + +_deprecations = { + "deserialize_for_cli": "dask.config.deserialize", + "serialize_for_cli": "dask.config.serialize", + "format_bytes": "dask.utils.format_bytes", + "format_time": "dask.utils.format_time", + "funcname": "dask.utils.funcname", + "parse_bytes": "dask.utils.parse_bytes", + "parse_timedelta": "dask.utils.parse_timedelta", +} + + +def __getattr__(name): + if name in _deprecations: + use_instead = _deprecations[name] + + warnings.warn( + f"{name} is deprecated and will be removed in a future release. " + f"Please use {use_instead} instead.", + category=FutureWarning, + stacklevel=2, + ) + return import_term(use_instead) + else: + raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/distributed/variable.py b/distributed/variable.py index 5be0d09791..db74cabddc 100644 --- a/distributed/variable.py +++ b/distributed/variable.py @@ -6,10 +6,10 @@ from tlz import merge -from dask.utils import stringify +from dask.utils import parse_timedelta, stringify from .client import Client, Future -from .utils import TimeoutError, log_errors, parse_timedelta +from .utils import TimeoutError, log_errors from .worker import get_client, get_worker logger = logging.getLogger(__name__) diff --git a/distributed/worker.py b/distributed/worker.py index 685f6df624..3d5146f09e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -25,7 +25,7 @@ import dask from dask.core import istask from dask.system import CPU_COUNT -from dask.utils import apply, format_bytes, funcname +from dask.utils import apply, format_bytes, funcname, parse_bytes, parse_timedelta from . import comm, preloading, profile, system, utils from .batched import BatchedSend @@ -65,9 +65,7 @@ key_split, log_errors, offload, - parse_bytes, parse_ports, - parse_timedelta, silence_logging, thread_state, typename, diff --git a/distributed/worker_client.py b/distributed/worker_client.py index 7ad1cb7e20..989a3f8f0d 100644 --- a/distributed/worker_client.py +++ b/distributed/worker_client.py @@ -4,7 +4,6 @@ import dask from .threadpoolexecutor import rejoin, secede -from .utils import parse_timedelta from .worker import get_client, get_worker, thread_state @@ -46,7 +45,7 @@ def worker_client(timeout=None, separate_thread=True): if timeout is None: timeout = dask.config.get("distributed.comm.timeouts.connect") - timeout = parse_timedelta(timeout, "s") + timeout = dask.utils.parse_timedelta(timeout, "s") worker = get_worker() client = get_client(timeout=timeout)