diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b37ec0ce88..8ec0efb847 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,4 +1,9 @@
 repos:
+  - repo: https://github.com/MarcoGorelli/absolufy-imports
+    rev: v0.3.1
+    hooks:
+      - id: absolufy-imports
+        name: absolufy-imports
   - repo: https://github.com/pycqa/isort
     rev: 5.10.1
     hooks:
diff --git a/distributed/__init__.py b/distributed/__init__.py
index d33514ab69..517e1401b5 100644
--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -1,13 +1,13 @@
-from . import config  # isort:skip; load distributed configuration first
-from . import widgets  # isort:skip; load distributed widgets second
+from distributed import config  # isort:skip; load distributed configuration first
+from distributed import widgets  # isort:skip; load distributed widgets second


 import dask
 from dask.config import config  # type: ignore

-from ._version import get_versions
-from .actor import Actor, ActorFuture, BaseActorFuture
-from .client import (
+from distributed._version import get_versions
+from distributed.actor import Actor, ActorFuture, BaseActorFuture
+from distributed.client import (
     Client,
     CompatibleExecutor,
     Executor,
@@ -21,9 +21,9 @@
     performance_report,
     wait,
 )
-from .core import Status, connect, rpc
-from .deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
-from .diagnostics.plugin import (
+from distributed.core import Status, connect, rpc
+from distributed.deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
+from distributed.diagnostics.plugin import (
     Environ,
     NannyPlugin,
     PipInstall,
@@ -32,21 +32,29 @@
     UploadFile,
     WorkerPlugin,
 )
-from .diagnostics.progressbar import progress
-from .event import Event
-from .lock import Lock
-from .multi_lock import MultiLock
-from .nanny import Nanny
-from .pubsub import Pub, Sub
-from .queues import Queue
-from .scheduler import Scheduler
-from .security import Security
-from .semaphore import Semaphore
-from .threadpoolexecutor import rejoin
-from .utils import CancelledError, TimeoutError, sync
-from .variable import Variable
-from .worker import Reschedule, Worker, get_client, get_worker, print, secede, warn
-from .worker_client import local_client, worker_client
+from distributed.diagnostics.progressbar import progress
+from distributed.event import Event
+from distributed.lock import Lock
+from distributed.multi_lock import MultiLock
+from distributed.nanny import Nanny
+from distributed.pubsub import Pub, Sub
+from distributed.queues import Queue
+from distributed.scheduler import Scheduler
+from distributed.security import Security
+from distributed.semaphore import Semaphore
+from distributed.threadpoolexecutor import rejoin
+from distributed.utils import CancelledError, TimeoutError, sync
+from distributed.variable import Variable
+from distributed.worker import (
+    Reschedule,
+    Worker,
+    get_client,
+    get_worker,
+    print,
+    secede,
+    warn,
+)
+from distributed.worker_client import local_client, worker_client


 def __getattr__(name):
@@ -59,7 +67,7 @@ def __getattr__(name):
         return __version__

     if name == "__git_revision__":
-        from ._version import get_versions
+        from distributed._version import get_versions

         __git_revision__ = get_versions()["full-revisionid"]
         return __git_revision__
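The `absolufy-imports` hook added above mechanically rewrites package-relative imports into fully qualified absolute imports, which is the transformation applied to every `distributed/` module in the hunks that follow. A minimal sketch of the rewrite, using a hypothetical module `distributed/comm/foo.py` purely for illustration:

```python
# distributed/comm/foo.py -- hypothetical module, for illustration only

# Before the hook runs, imports are resolved relative to this file:
#   from .registry import backends
#   from ..utils import log_errors

# After absolufy-imports, the same imports are spelled absolutely,
# so they read identically from anywhere in the codebase:
from distributed.comm.registry import backends
from distributed.utils import log_errors
```

The same rewrite can be reproduced locally with `pre-commit run absolufy-imports --all-files`.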
diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 4a61609590..dd059c9d1b 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -10,13 +10,13 @@
 import dask
 from dask.utils import parse_timedelta

-from .core import Status
-from .metrics import time
-from .utils import import_term, log_errors
+from distributed.core import Status
+from distributed.metrics import time
+from distributed.utils import import_term, log_errors

 if TYPE_CHECKING:  # pragma: nocover
-    from .client import Client
-    from .scheduler import Scheduler, TaskState, WorkerState
+    from distributed.client import Client
+    from distributed.scheduler import Scheduler, TaskState, WorkerState

 # Main logger. This is reasonably terse also at DEBUG level.
 logger = logging.getLogger(__name__)
diff --git a/distributed/actor.py b/distributed/actor.py
index b94d879f3d..84e3fe4da3 100644
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -11,11 +11,11 @@

 from tornado.ioloop import IOLoop

-from .client import Future
-from .protocol import to_serialize
-from .utils import iscoroutinefunction, sync, thread_state
-from .utils_comm import WrappedKey
-from .worker import get_client, get_worker
+from distributed.client import Future
+from distributed.protocol import to_serialize
+from distributed.utils import iscoroutinefunction, sync, thread_state
+from distributed.utils_comm import WrappedKey
+from distributed.worker import get_client, get_worker

 _T = TypeVar("_T")
diff --git a/distributed/batched.py b/distributed/batched.py
index 3e6cbcfd30..e141534f17 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -7,8 +7,8 @@
 import dask
 from dask.utils import parse_timedelta

-from .core import CommClosedError
-from .metrics import time
+from distributed.core import CommClosedError
+from distributed.metrics import time

 logger = logging.getLogger(__name__)
diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py
index 49e2376eca..e14bf377ac 100644
--- a/distributed/cfexecutor.py
+++ b/distributed/cfexecutor.py
@@ -6,8 +6,8 @@

 from dask.utils import parse_timedelta

-from .metrics import time
-from .utils import TimeoutError, sync
+from distributed.metrics import time
+from distributed.utils import TimeoutError, sync


 @gen.coroutine
diff --git a/distributed/client.py b/distributed/client.py
index b74a8526e0..ce7652e762 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -52,11 +52,11 @@
 from tornado import gen
 from tornado.ioloop import PeriodicCallback

-from . import cluster_dump, preloading
-from . import versions as version_module  # type: ignore
-from .batched import BatchedSend
-from .cfexecutor import ClientExecutor
-from .core import (
+from distributed import cluster_dump, preloading
+from distributed import versions as version_module  # type: ignore
+from distributed.batched import BatchedSend
+from distributed.cfexecutor import ClientExecutor
+from distributed.core import (
     CommClosedError,
     ConnectionPool,
     PooledRPCCall,
@@ -65,17 +65,22 @@
     connect,
     rpc,
 )
-from .diagnostics.plugin import NannyPlugin, UploadFile, WorkerPlugin, _get_plugin_name
-from .metrics import time
-from .objects import HasWhat, SchedulerInfo, WhoHas
-from .protocol import to_serialize
-from .protocol.pickle import dumps, loads
-from .publish import Datasets
-from .pubsub import PubSubClientExtension
-from .security import Security
-from .sizeof import sizeof
-from .threadpoolexecutor import rejoin
-from .utils import (
+from distributed.diagnostics.plugin import (
+    NannyPlugin,
+    UploadFile,
+    WorkerPlugin,
+    _get_plugin_name,
+)
+from distributed.metrics import time
+from distributed.objects import HasWhat, SchedulerInfo, WhoHas
+from distributed.protocol import to_serialize
+from distributed.protocol.pickle import dumps, loads
+from distributed.publish import Datasets
+from distributed.pubsub import PubSubClientExtension
+from distributed.security import Security
+from distributed.sizeof import sizeof
+from distributed.threadpoolexecutor import rejoin
+from distributed.utils import (
     All,
     Any,
     CancelledError,
@@ -91,7 +96,7 @@
     sync,
     thread_state,
 )
-from .utils_comm import (
+from distributed.utils_comm import (
     WrappedKey,
     gather_from_workers,
     pack_data,
@@ -99,7 +104,7 @@
     scatter_to_workers,
     unpack_remotedata,
 )
-from .worker import get_client, get_worker, secede
+from distributed.worker import get_client, get_worker, secede

 logger = logging.getLogger(__name__)

@@ -1016,7 +1021,7 @@ def dashboard_link(self):
         return format_dashboard_link(host, port)

     def _get_scheduler_info(self):
-        from .scheduler import Scheduler
+        from distributed.scheduler import Scheduler

         if (
             self.cluster
@@ -1152,7 +1157,7 @@ async def _start(self, timeout=no_default, **kwargs):
                 except (ValueError, KeyError):  # JSON file not yet flushed
                     await asyncio.sleep(0.01)
         elif self._start_arg is None:
-            from .deploy import LocalCluster
+            from distributed.deploy import LocalCluster

             try:
                 self.cluster = await LocalCluster(
@@ -3783,7 +3788,7 @@ async def _profile(
             plot = True

         if plot:
-            from . import profile
+            from distributed import profile

             data = profile.plot_data(state)
             figure, source = profile.plot_figure(data, sizing_mode="stretch_both")
@@ -4332,17 +4337,17 @@ def start_ipython_workers(
             magic_names = [magic_names]

         if "IPython" in sys.modules:
-            from ._ipython_utils import register_remote_magic
+            from distributed._ipython_utils import register_remote_magic

             register_remote_magic()
         if magic_names:
-            from ._ipython_utils import register_worker_magic
+            from distributed._ipython_utils import register_worker_magic

             for worker, magic_name in zip(workers, magic_names):
                 connection_info = info_dict[worker]
                 register_worker_magic(connection_info, magic_name)
         if qtconsole:
-            from ._ipython_utils import connect_qtconsole
+            from distributed._ipython_utils import connect_qtconsole

             for worker, connection_info in info_dict.items():
                 name = "dask-" + worker.replace(":", "-").replace("/", "-")
@@ -4397,11 +4402,11 @@ def start_ipython_scheduler(
         else:
             magic_name = None
     if magic_name:
-        from ._ipython_utils import register_worker_magic
+        from distributed._ipython_utils import register_worker_magic

         register_worker_magic(info, magic_name)
     if qtconsole:
-        from ._ipython_utils import connect_qtconsole
+        from distributed._ipython_utils import connect_qtconsole

         connect_qtconsole(info, name="dask-scheduler", extra_args=qtconsole_args)
     return info
@@ -4517,10 +4522,10 @@ async def _get_task_stream(
     ):
         msgs = await self.scheduler.get_task_stream(start=start, stop=stop, count=count)
         if plot:
-            from .diagnostics.task_stream import rectangles
+            from distributed.diagnostics.task_stream import rectangles

             rects = rectangles(msgs)
-            from .dashboard.components.scheduler import task_stream_figure
+            from distributed.dashboard.components.scheduler import task_stream_figure

             source, figure = task_stream_figure(sizing_mode="stretch_both")
             source.data.update(rects)
@@ -4755,7 +4760,7 @@ def unregister_worker_plugin(self, name, nanny=None):
     @property
     def amm(self):
         """Convenience accessors for the :doc:`active_memory_manager`"""
-        from .active_memory_manager import AMMClientProxy
+        from distributed.active_memory_manager import AMMClientProxy

         return AMMClientProxy(self)
@@ -4980,7 +4985,7 @@ def update(self, futures):
         """Add multiple futures to the collection.

         The added futures will emit from the iterator once they finish"""
-        from .actor import BaseActorFuture
+        from distributed.actor import BaseActorFuture

         with self.lock:
             for f in futures:
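Note that `client.py` deliberately keeps several imports function-local (for example `from distributed.scheduler import Scheduler` inside `_get_scheduler_info`); the rewrite changes only their spelling, not their placement, since such imports are typically deferred to avoid import cycles or heavy startup costs. A minimal sketch of the deferred-import pattern, with hypothetical modules `module_a`/`module_b`:

```python
# module_a.py -- hypothetical module that module_b also imports

def build_b():
    # Deferring the import to call time breaks the import cycle:
    # module_a can finish importing before module_b is ever touched.
    from module_b import B

    return B()
```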
diff --git a/distributed/comm/__init__.py b/distributed/comm/__init__.py
index a93e7705d3..031090b0dc 100644
--- a/distributed/comm/__init__.py
+++ b/distributed/comm/__init__.py
@@ -1,4 +1,4 @@
-from .addressing import (
+from distributed.comm.addressing import (
     get_address_host,
     get_address_host_port,
     get_local_address_for,
@@ -9,25 +9,25 @@
     unparse_address,
     unparse_host_port,
 )
-from .core import Comm, CommClosedError, connect, listen
-from .registry import backends
-from .utils import get_tcp_server_address, get_tcp_server_addresses
+from distributed.comm.core import Comm, CommClosedError, connect, listen
+from distributed.comm.registry import backends
+from distributed.comm.utils import get_tcp_server_address, get_tcp_server_addresses


 def _register_transports():
     import dask.config

-    from . import inproc, ws
+    from distributed.comm import inproc, ws

     tcp_backend = dask.config.get("distributed.comm.tcp.backend")

     if tcp_backend == "asyncio":
-        from . import asyncio_tcp
+        from distributed.comm import asyncio_tcp

         backends["tcp"] = asyncio_tcp.TCPBackend()
         backends["tls"] = asyncio_tcp.TLSBackend()
     elif tcp_backend == "tornado":
-        from . import tcp
+        from distributed.comm import tcp

         backends["tcp"] = tcp.TCPBackend()
         backends["tls"] = tcp.TLSBackend()
@@ -38,7 +38,7 @@ def _register_transports():
         )

     try:
-        from . import ucx
+        from distributed.comm import ucx
     except ImportError:
         pass
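As the `_register_transports` hunk above shows, the TCP/TLS transport implementation is chosen from the `distributed.comm.tcp.backend` configuration key. A minimal sketch of opting into the asyncio backend (assuming a distributed version that ships `asyncio_tcp`):

```python
import dask

# Must be set before distributed registers its transports at import time.
dask.config.set({"distributed.comm.tcp.backend": "asyncio"})

import distributed  # noqa: E402  # imported after the config is in place
```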
diff --git a/distributed/comm/addressing.py b/distributed/comm/addressing.py
index efaa22ce55..1cde0fd0a1 100644
--- a/distributed/comm/addressing.py
+++ b/distributed/comm/addressing.py
@@ -4,8 +4,8 @@

 import dask

-from ..utils import get_ip_interface
-from . import registry
+from distributed.comm import registry
+from distributed.utils import get_ip_interface


 def parse_address(addr: str, strict: bool = False) -> tuple[str, str]:
diff --git a/distributed/comm/asyncio_tcp.py b/distributed/comm/asyncio_tcp.py
index a6eca647f8..84ba45f018 100644
--- a/distributed/comm/asyncio_tcp.py
+++ b/distributed/comm/asyncio_tcp.py
@@ -17,11 +17,16 @@

 import dask

-from ..utils import ensure_ip, get_ip, get_ipv6
-from .addressing import parse_host_port, unparse_host_port
-from .core import Comm, CommClosedError, Connector, Listener
-from .registry import Backend
-from .utils import ensure_concrete_host, from_frames, host_array, to_frames
+from distributed.comm.addressing import parse_host_port, unparse_host_port
+from distributed.comm.core import Comm, CommClosedError, Connector, Listener
+from distributed.comm.registry import Backend
+from distributed.comm.utils import (
+    ensure_concrete_host,
+    from_frames,
+    host_array,
+    to_frames,
+)
+from distributed.utils import ensure_ip, get_ip, get_ipv6

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/core.py b/distributed/comm/core.py
index 1df3161f78..5f9235a26b 100644
--- a/distributed/comm/core.py
+++ b/distributed/comm/core.py
@@ -13,11 +13,11 @@
 import dask
 from dask.utils import parse_timedelta

-from ..metrics import time
-from ..protocol import pickle
-from ..protocol.compression import get_default_compression
-from . import registry
-from .addressing import parse_address
+from distributed.comm import registry
+from distributed.comm.addressing import parse_address
+from distributed.metrics import time
+from distributed.protocol import pickle
+from distributed.protocol.compression import get_default_compression

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/inproc.py b/distributed/comm/inproc.py
index 60af1fd413..041c9d530a 100644
--- a/distributed/comm/inproc.py
+++ b/distributed/comm/inproc.py
@@ -9,10 +9,10 @@
 from tornado.concurrent import Future
 from tornado.ioloop import IOLoop

-from ..protocol import nested_deserialize
-from ..utils import get_ip
-from .core import Comm, CommClosedError, Connector, Listener
-from .registry import Backend, backends
+from distributed.comm.core import Comm, CommClosedError, Connector, Listener
+from distributed.comm.registry import Backend, backends
+from distributed.protocol import nested_deserialize
+from distributed.utils import get_ip

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py
index f0435002b7..365860894b 100644
--- a/distributed/comm/tcp.py
+++ b/distributed/comm/tcp.py
@@ -25,20 +25,26 @@
 import dask
 from dask.utils import parse_timedelta

-from ..protocol.utils import pack_frames_prelude, unpack_frames
-from ..system import MEMORY_LIMIT
-from ..threadpoolexecutor import ThreadPoolExecutor
-from ..utils import ensure_ip, get_ip, get_ipv6, nbytes
-from .addressing import parse_host_port, unparse_host_port
-from .core import Comm, CommClosedError, Connector, FatalCommClosedError, Listener
-from .registry import Backend
-from .utils import (
+from distributed.comm.addressing import parse_host_port, unparse_host_port
+from distributed.comm.core import (
+    Comm,
+    CommClosedError,
+    Connector,
+    FatalCommClosedError,
+    Listener,
+)
+from distributed.comm.registry import Backend
+from distributed.comm.utils import (
     ensure_concrete_host,
     from_frames,
     get_tcp_server_address,
     host_array,
     to_frames,
 )
+from distributed.protocol.utils import pack_frames_prelude, unpack_frames
+from distributed.system import MEMORY_LIMIT
+from distributed.threadpoolexecutor import ThreadPoolExecutor
+from distributed.utils import ensure_ip, get_ip, get_ipv6, nbytes

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py
index c979c9623b..8e6e9cb4ff 100644
--- a/distributed/comm/tests/test_ucx.py
+++ b/distributed/comm/tests/test_ucx.py
@@ -174,7 +174,7 @@ async def test_ucx_deserialize():
     # Note we see this error on some systems with this test:
     # `socket.gaierror: [Errno -5] No address associated with hostname`
     # This may be due to a system configuration issue.
-    from .test_comms import check_deserialize
+    from distributed.comm.tests.test_comms import check_deserialize

     await check_deserialize("tcp://")
diff --git a/distributed/comm/tests/test_ws.py b/distributed/comm/tests/test_ws.py
index 5cb8886ffa..c75c1a80b7 100644
--- a/distributed/comm/tests/test_ws.py
+++ b/distributed/comm/tests/test_ws.py
@@ -9,6 +9,7 @@
 from distributed.comm import connect, listen, ws
 from distributed.comm.core import FatalCommClosedError
 from distributed.comm.registry import backends, get_backend
+from distributed.comm.tests.test_comms import check_tls_extra
 from distributed.security import Security
 from distributed.utils_test import (
     gen_cluster,
@@ -19,8 +20,6 @@
     xfail_ssl_issue5601,
 )

-from .test_comms import check_tls_extra
-

 def test_registered():
     assert "ws" in backends
diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py
index 5ce0da9c7a..7fc252bf55 100644
--- a/distributed/comm/ucx.py
+++ b/distributed/comm/ucx.py
@@ -16,12 +16,17 @@
 import dask
 from dask.utils import parse_bytes

-from ..diagnostics.nvml import has_cuda_context
-from ..utils import ensure_ip, get_ip, get_ipv6, log_errors, nbytes
-from .addressing import parse_host_port, unparse_host_port
-from .core import Comm, CommClosedError, Connector, Listener
-from .registry import Backend, backends
-from .utils import ensure_concrete_host, from_frames, host_array, to_frames
+from distributed.comm.addressing import parse_host_port, unparse_host_port
+from distributed.comm.core import Comm, CommClosedError, Connector, Listener
+from distributed.comm.registry import Backend, backends
+from distributed.comm.utils import (
+    ensure_concrete_host,
+    from_frames,
+    host_array,
+    to_frames,
+)
+from distributed.diagnostics.nvml import has_cuda_context
+from distributed.utils import ensure_ip, get_ip, get_ipv6, log_errors, nbytes

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/utils.py b/distributed/comm/utils.py
index c9873bc43d..5aff622de5 100644
--- a/distributed/comm/utils.py
+++ b/distributed/comm/utils.py
@@ -6,8 +6,8 @@
 from dask.sizeof import sizeof
 from dask.utils import parse_bytes

-from .. import protocol
-from ..utils import get_ip, get_ipv6, nbytes, offload
+from distributed import protocol
+from distributed.utils import get_ip, get_ipv6, nbytes, offload

 logger = logging.getLogger(__name__)
diff --git a/distributed/comm/ws.py b/distributed/comm/ws.py
index ede37bcafd..2f6a9279fd 100644
--- a/distributed/comm/ws.py
+++ b/distributed/comm/ws.py
@@ -16,12 +16,27 @@

 import dask

-from ..utils import ensure_bytes, nbytes
-from .addressing import parse_host_port, unparse_host_port
-from .core import Comm, CommClosedError, Connector, FatalCommClosedError, Listener
-from .registry import backends
-from .tcp import BaseTCPBackend, _expect_tls_context, convert_stream_closed_error
-from .utils import ensure_concrete_host, from_frames, get_tcp_server_address, to_frames
+from distributed.comm.addressing import parse_host_port, unparse_host_port
+from distributed.comm.core import (
+    Comm,
+    CommClosedError,
+    Connector,
+    FatalCommClosedError,
+    Listener,
+)
+from distributed.comm.registry import backends
+from distributed.comm.tcp import (
+    BaseTCPBackend,
+    _expect_tls_context,
+    convert_stream_closed_error,
+)
+from distributed.comm.utils import (
+    ensure_concrete_host,
+    from_frames,
+    get_tcp_server_address,
+    to_frames,
+)
+from distributed.utils import ensure_bytes, nbytes

 logger = logging.getLogger(__name__)
diff --git a/distributed/config.py b/distributed/config.py
index 15e563b18a..8da77348fe 100644
--- a/distributed/config.py
+++ b/distributed/config.py
@@ -9,7 +9,7 @@
 import dask
 from dask.utils import import_required

-from .compatibility import WINDOWS, logging_names
+from distributed.compatibility import WINDOWS, logging_names

 config = dask.config.config
diff --git a/distributed/core.py b/distributed/core.py
index cf209fd5ca..df57a337d5 100644
--- a/distributed/core.py
+++ b/distributed/core.py
@@ -24,10 +24,8 @@
 import dask
 from dask.utils import parse_timedelta

-from distributed.utils import recursive_to_dict
-
-from . import profile, protocol
-from .comm import (
+from distributed import profile, protocol
+from distributed.comm import (
     Comm,
     CommClosedError,
     connect,
@@ -36,13 +34,14 @@
     normalize_address,
     unparse_host_port,
 )
-from .metrics import time
-from .system_monitor import SystemMonitor
-from .utils import (
+from distributed.metrics import time
+from distributed.system_monitor import SystemMonitor
+from distributed.utils import (
     TimeoutError,
     get_traceback,
     has_keyword,
     is_coroutine_function,
+    recursive_to_dict,
     truncate_exception,
 )
@@ -215,11 +214,11 @@ def stop():

         # Statistics counters for various events
         with suppress(ImportError):
-            from .counter import Digest
+            from distributed.counter import Digest

             self.digests = defaultdict(partial(Digest, loop=self.io_loop))

-        from .counter import Counter
+        from distributed.counter import Counter

         self.counters = defaultdict(partial(Counter, loop=self.io_loop))
diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py
index 98791acc9b..3d8e62d95f 100644
--- a/distributed/dashboard/scheduler.py
+++ b/distributed/dashboard/scheduler.py
@@ -3,9 +3,13 @@
 from tornado import web
 from tornado.ioloop import IOLoop

-from .components.nvml import gpu_doc  # noqa: 1708
-from .components.nvml import NVML_ENABLED, gpu_memory_doc, gpu_utilization_doc
-from .components.scheduler import (
+from distributed.dashboard.components.nvml import gpu_doc  # noqa: 1708
+from distributed.dashboard.components.nvml import (
+    NVML_ENABLED,
+    gpu_memory_doc,
+    gpu_utilization_doc,
+)
+from distributed.dashboard.components.scheduler import (
     AggregateAction,
     BandwidthTypes,
     BandwidthWorkers,
@@ -38,8 +42,8 @@
     tg_graph_doc,
     workers_doc,
 )
-from .core import BokehApplication
-from .worker import counters_doc
+from distributed.dashboard.core import BokehApplication
+from distributed.dashboard.worker import counters_doc

 applications = {
     "/system": systemmonitor_doc,
diff --git a/distributed/dashboard/worker.py b/distributed/dashboard/worker.py
index 1e65cfcc76..15e6e47ab2 100644
--- a/distributed/dashboard/worker.py
+++ b/distributed/dashboard/worker.py
@@ -1,6 +1,6 @@
 from tornado.ioloop import IOLoop

-from .components.worker import (
+from distributed.dashboard.components.worker import (
     counters_doc,
     crossfilter_doc,
     profile_doc,
@@ -8,7 +8,7 @@
     status_doc,
     systemmonitor_doc,
 )
-from .core import BokehApplication
+from distributed.dashboard.core import BokehApplication

 template_variables = {
     "pages": ["status", "system", "profile", "crossfilter", "profile-server"]
diff --git a/distributed/deploy/__init__.py b/distributed/deploy/__init__.py
index 1518942dc4..36ef209c88 100644
--- a/distributed/deploy/__init__.py
+++ b/distributed/deploy/__init__.py
@@ -1,10 +1,10 @@
 from contextlib import suppress

-from .adaptive import Adaptive
-from .cluster import Cluster
-from .local import LocalCluster
-from .spec import ProcessInterface, SpecCluster
-from .ssh import SSHCluster
+from distributed.deploy.adaptive import Adaptive
+from distributed.deploy.cluster import Cluster
+from distributed.deploy.local import LocalCluster
+from distributed.deploy.spec import ProcessInterface, SpecCluster
+from distributed.deploy.ssh import SSHCluster

 with suppress(ImportError):
-    from .ssh import SSHCluster
+    from distributed.deploy.ssh import SSHCluster
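In `deploy/__init__.py` the `SSHCluster` re-export is guarded with `contextlib.suppress(ImportError)` because the SSH deployment machinery depends on optional libraries (e.g. `asyncssh`); note that the unconditional top-level `SSHCluster` import in the same hunk would defeat that guard if the dependency were missing at module scope. A minimal sketch of the optional-export pattern on its own, with a hypothetical `mypackage.extras` module:

```python
from contextlib import suppress

# Re-export an optional feature only when its dependencies are importable;
# mypackage.extras is hypothetical, used only for illustration.
with suppress(ImportError):
    from mypackage.extras import ExtraFeature
```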
diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py
index 8280575be9..d17f82fc89 100644
--- a/distributed/deploy/adaptive.py
+++ b/distributed/deploy/adaptive.py
@@ -6,9 +6,9 @@
 import dask.config
 from dask.utils import parse_timedelta

-from ..protocol import pickle
-from ..utils import log_errors
-from .adaptive_core import AdaptiveCore
+from distributed.deploy.adaptive_core import AdaptiveCore
+from distributed.protocol import pickle
+from distributed.utils import log_errors

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py
index 1ee32896a3..a5f8eee0a1 100644
--- a/distributed/deploy/adaptive_core.py
+++ b/distributed/deploy/adaptive_core.py
@@ -12,10 +12,10 @@

 from dask.utils import parse_timedelta

-from ..metrics import time
+from distributed.metrics import time

 if TYPE_CHECKING:
-    from ..scheduler import WorkerState
+    from distributed.scheduler import WorkerState

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py
index 9c7eadc38f..047357cc79 100644
--- a/distributed/deploy/cluster.py
+++ b/distributed/deploy/cluster.py
@@ -11,9 +11,10 @@
 from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
 from dask.widgets import get_template

-from ..core import Status
-from ..objects import SchedulerInfo
-from ..utils import (
+from distributed.core import Status
+from distributed.deploy.adaptive import Adaptive
+from distributed.objects import SchedulerInfo
+from distributed.utils import (
     Log,
     Logs,
     LoopRunner,
@@ -22,7 +23,6 @@
     format_dashboard_link,
     log_errors,
 )
-from .adaptive import Adaptive

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py
index 1180c93108..db74bf9201 100644
--- a/distributed/deploy/local.py
+++ b/distributed/deploy/local.py
@@ -7,12 +7,12 @@
 from dask.system import CPU_COUNT
 from dask.widgets import get_template

-from ..nanny import Nanny
-from ..scheduler import Scheduler
-from ..security import Security
-from ..worker import Worker, parse_memory_limit
-from .spec import SpecCluster
-from .utils import nprocesses_nthreads
+from distributed.deploy.spec import SpecCluster
+from distributed.deploy.utils import nprocesses_nthreads
+from distributed.nanny import Nanny
+from distributed.scheduler import Scheduler
+from distributed.security import Security
+from distributed.worker import Worker, parse_memory_limit

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py
index 4b6e8d894d..66ab526718 100644
--- a/distributed/deploy/old_ssh.py
+++ b/distributed/deploy/old_ssh.py
@@ -11,7 +11,7 @@
 from tlz import merge
 from tornado import gen

-from ..metrics import time
+from distributed.metrics import time

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 439b3ca471..3a9fa16a3e 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -16,12 +16,12 @@
 from dask.utils import parse_bytes, parse_timedelta
 from dask.widgets import get_template

-from ..core import CommClosedError, Status, rpc
-from ..scheduler import Scheduler
-from ..security import Security
-from ..utils import NoOpAwaitable, TimeoutError, import_term, silence_logging
-from .adaptive import Adaptive
-from .cluster import Cluster
+from distributed.core import CommClosedError, Status, rpc
+from distributed.deploy.adaptive import Adaptive
+from distributed.deploy.cluster import Cluster
+from distributed.scheduler import Scheduler
+from distributed.security import Security
+from distributed.utils import NoOpAwaitable, TimeoutError, import_term, silence_logging

 logger = logging.getLogger(__name__)
diff --git a/distributed/deploy/ssh.py b/distributed/deploy/ssh.py
index 8cb4d88e43..da8151f854 100644
--- a/distributed/deploy/ssh.py
+++ b/distributed/deploy/ssh.py
@@ -10,7 +10,7 @@
 import dask
 import dask.config

-from .spec import ProcessInterface, SpecCluster
+from distributed.deploy.spec import ProcessInterface, SpecCluster

 logger = logging.getLogger(__name__)
@@ -396,7 +396,7 @@ def SSHCluster(
     )

     if set(kwargs) & old_cluster_kwargs:
-        from .old_ssh import SSHCluster as OldSSHCluster
+        from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster

         warnings.warn(
             "Note that the SSHCluster API has been replaced. "
diff --git a/distributed/deploy/utils_test.py b/distributed/deploy/utils_test.py
index 53fba2dd46..08f36a32fe 100644
--- a/distributed/deploy/utils_test.py
+++ b/distributed/deploy/utils_test.py
@@ -4,7 +4,7 @@

 import pytest

-from ..client import Client
+from distributed.client import Client


 class ClusterTest:
diff --git a/distributed/diagnostics/__init__.py b/distributed/diagnostics/__init__.py
index d7b3337a69..6f6bb154bb 100644
--- a/distributed/diagnostics/__init__.py
+++ b/distributed/diagnostics/__init__.py
@@ -1,3 +1,3 @@
-from .graph_layout import GraphLayout
-from .memory_sampler import MemorySampler
-from .plugin import SchedulerPlugin
+from distributed.diagnostics.graph_layout import GraphLayout
+from distributed.diagnostics.memory_sampler import MemorySampler
+from distributed.diagnostics.plugin import SchedulerPlugin
diff --git a/distributed/diagnostics/eventstream.py b/distributed/diagnostics/eventstream.py
index 4ace48282e..88e6adfca1 100644
--- a/distributed/diagnostics/eventstream.py
+++ b/distributed/diagnostics/eventstream.py
@@ -1,8 +1,8 @@
 import logging

-from ..core import coerce_to_address, connect
-from ..worker import dumps_function
-from .plugin import SchedulerPlugin
+from distributed.core import coerce_to_address, connect
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.worker import dumps_function

 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py
index 5e48f93908..88bdbeddeb 100644
--- a/distributed/diagnostics/graph_layout.py
+++ b/distributed/diagnostics/graph_layout.py
@@ -1,6 +1,6 @@
 import uuid

-from .plugin import SchedulerPlugin
+from distributed.diagnostics.plugin import SchedulerPlugin


 class GraphLayout(SchedulerPlugin):
diff --git a/distributed/diagnostics/memory_sampler.py b/distributed/diagnostics/memory_sampler.py
index b29378df11..11acd79113 100644
--- a/distributed/diagnostics/memory_sampler.py
+++ b/distributed/diagnostics/memory_sampler.py
@@ -9,8 +9,8 @@

 if TYPE_CHECKING:
     # circular dependencies
-    from ..client import Client
-    from ..scheduler import Scheduler
+    from distributed.client import Client
+    from distributed.scheduler import Scheduler


 class MemorySampler:
@@ -80,7 +80,7 @@ def sample(
             Default: 0.5
         """
         if not client:
-            from ..client import get_client
+            from distributed.client import get_client

             client = get_client()
diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py
index 4a3ce06d6d..9fb5e5c369 100644
--- a/distributed/diagnostics/plugin.py
+++ b/distributed/diagnostics/plugin.py
@@ -255,7 +255,7 @@ def __init__(self, packages, pip_options=None, restart=False):
         self.pip_options = pip_options

     async def setup(self, worker):
-        from ..lock import Lock
+        from distributed.lock import Lock

         async with Lock(socket.gethostname()):  # don't clobber one installation
             logger.info("Pip installing the following packages: %s", self.packages)
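The `plugin.py` hunk above shows `PipInstall.setup` serializing package installation per host by holding a `distributed.Lock` keyed on the hostname. A minimal sketch of the same pattern from ordinary client code (assuming a reachable cluster):

```python
import socket

from distributed import Client, Lock

client = Client()  # assumes a scheduler is already running

# Only one holder of the hostname-keyed lock runs the critical section at
# a time, mirroring how PipInstall avoids clobbering an installation.
with Lock(socket.gethostname()):
    ...  # perform a host-wide side effect, e.g. install a package
```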
diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py
index a3d0dc5ca3..c620c25b8e 100644
--- a/distributed/diagnostics/progress.py
+++ b/distributed/diagnostics/progress.py
@@ -10,9 +10,9 @@
 from dask.base import tokenize
 from dask.utils import stringify

-from ..metrics import time
-from ..utils import key_split
-from .plugin import SchedulerPlugin
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.metrics import time
+from distributed.utils import key_split

 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py
index 440f7a5904..45f84b1c2c 100644
--- a/distributed/diagnostics/progress_stream.py
+++ b/distributed/diagnostics/progress_stream.py
@@ -2,10 +2,10 @@

 from tlz import merge, valmap

-from ..core import coerce_to_address, connect
-from ..utils import color_of, key_split
-from ..worker import dumps_function
-from .progress import AllProgress
+from distributed.core import coerce_to_address, connect
+from distributed.diagnostics.progress import AllProgress
+from distributed.utils import color_of, key_split
+from distributed.worker import dumps_function

 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py
index 02ab8420f4..4a8c91702c 100644
--- a/distributed/diagnostics/progressbar.py
+++ b/distributed/diagnostics/progressbar.py
@@ -10,11 +10,16 @@

 import dask

-from ..client import default_client, futures_of
-from ..core import CommClosedError, clean_exception, coerce_to_address, connect
-from ..protocol.pickle import dumps
-from ..utils import LoopRunner, is_kernel, key_split
-from .progress import MultiProgress, Progress, format_time
+from distributed.client import default_client, futures_of
+from distributed.core import (
+    CommClosedError,
+    clean_exception,
+    coerce_to_address,
+    connect,
+)
+from distributed.diagnostics.progress import MultiProgress, Progress, format_time
+from distributed.protocol.pickle import dumps
+from distributed.utils import LoopRunner, is_kernel, key_split

 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/task_stream.py b/distributed/diagnostics/task_stream.py
index 93921a89fd..2f956397e6 100644
--- a/distributed/diagnostics/task_stream.py
+++ b/distributed/diagnostics/task_stream.py
@@ -4,10 +4,10 @@
 import dask
 from dask.utils import format_time, parse_timedelta

-from ..metrics import time
-from ..utils import key_split
-from .plugin import SchedulerPlugin
-from .progress_stream import color_of
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.diagnostics.progress_stream import color_of
+from distributed.metrics import time
+from distributed.utils import key_split

 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/websocket.py b/distributed/diagnostics/websocket.py
index 84aed34360..99a0faca88 100644
--- a/distributed/diagnostics/websocket.py
+++ b/distributed/diagnostics/websocket.py
@@ -1,6 +1,6 @@
-from ..utils import key_split
-from .plugin import SchedulerPlugin
-from .task_stream import colors
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.diagnostics.task_stream import colors
+from distributed.utils import key_split


 class WebsocketPlugin(SchedulerPlugin):
diff --git a/distributed/diskutils.py b/distributed/diskutils.py
index fc694f5cb7..70bf8ea829 100644
--- a/distributed/diskutils.py
+++ b/distributed/diskutils.py
@@ -11,7 +11,7 @@

 import dask

-from . import locket
+from distributed import locket

 logger = logging.getLogger(__name__)
diff --git a/distributed/event.py b/distributed/event.py
index 2f9a7787af..0765e00315 100644
--- a/distributed/event.py
+++ b/distributed/event.py
@@ -6,9 +6,9 @@

 from dask.utils import parse_timedelta

-from .client import Client
-from .utils import TimeoutError, log_errors
-from .worker import get_worker
+from distributed.client import Client
+from distributed.utils import TimeoutError, log_errors
+from distributed.worker import get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/http/__init__.py b/distributed/http/__init__.py
index b41a454ed2..00349e3398 100644
--- a/distributed/http/__init__.py
+++ b/distributed/http/__init__.py
@@ -1 +1 @@
-from .utils import get_handlers
+from distributed.http.utils import get_handlers
diff --git a/distributed/http/scheduler/info.py b/distributed/http/scheduler/info.py
index 6d780c0457..40f44b79a2 100644
--- a/distributed/http/scheduler/info.py
+++ b/distributed/http/scheduler/info.py
@@ -12,10 +12,10 @@

 from dask.utils import format_bytes, format_time

-from ...diagnostics.websocket import WebsocketPlugin
-from ...metrics import time
-from ...utils import log_errors
-from ..utils import RequestHandler, redirect
+from distributed.diagnostics.websocket import WebsocketPlugin
+from distributed.http.utils import RequestHandler, redirect
+from distributed.metrics import time
+from distributed.utils import log_errors

 ns = {
     func.__name__: func
diff --git a/distributed/http/scheduler/json.py b/distributed/http/scheduler/json.py
index 758e2ba338..69b2510d86 100644
--- a/distributed/http/scheduler/json.py
+++ b/distributed/http/scheduler/json.py
@@ -1,7 +1,7 @@
 from __future__ import annotations

-from ...utils import log_errors
-from ..utils import RequestHandler
+from distributed.http.utils import RequestHandler
+from distributed.utils import log_errors


 class CountsJSON(RequestHandler):
diff --git a/distributed/http/scheduler/missing_bokeh.py b/distributed/http/scheduler/missing_bokeh.py
index a21d37e5dc..868216615e 100644
--- a/distributed/http/scheduler/missing_bokeh.py
+++ b/distributed/http/scheduler/missing_bokeh.py
@@ -1,7 +1,7 @@
 from __future__ import annotations

-from ...utils import log_errors
-from ..utils import RequestHandler, redirect
+from distributed.http.utils import RequestHandler, redirect
+from distributed.utils import log_errors


 class MissingBokeh(RequestHandler):
diff --git a/distributed/http/scheduler/prometheus/__init__.py b/distributed/http/scheduler/prometheus/__init__.py
index 9ec024ea56..9ccc283cc3 100644
--- a/distributed/http/scheduler/prometheus/__init__.py
+++ b/distributed/http/scheduler/prometheus/__init__.py
@@ -1,5 +1,5 @@
 from __future__ import annotations

-from .core import PrometheusHandler
+from distributed.http.scheduler.prometheus.core import PrometheusHandler

 routes: list[tuple] = [("/metrics", PrometheusHandler, {})]
diff --git a/distributed/http/scheduler/prometheus/core.py b/distributed/http/scheduler/prometheus/core.py
index 2a27494078..1e32663689 100644
--- a/distributed/http/scheduler/prometheus/core.py
+++ b/distributed/http/scheduler/prometheus/core.py
@@ -1,11 +1,10 @@
 import toolz

 from distributed.http.prometheus import PrometheusCollector
+from distributed.http.scheduler.prometheus.semaphore import SemaphoreMetricCollector
 from distributed.http.utils import RequestHandler
 from distributed.scheduler import ALL_TASK_STATES

-from .semaphore import SemaphoreMetricCollector
-

 class SchedulerMetricCollector(PrometheusCollector):
     def __init__(self, server):
diff --git a/distributed/http/utils.py b/distributed/http/utils.py
index f2ba90e9cd..e52b3c1ed7 100644
--- a/distributed/http/utils.py
+++ b/distributed/http/utils.py
@@ -6,7 +6,7 @@
 import toolz
 from tornado import web

-from ..utils import has_keyword
+from distributed.utils import has_keyword

 dirname = os.path.dirname(__file__)
diff --git a/distributed/http/worker/prometheus/__init__.py b/distributed/http/worker/prometheus/__init__.py
index 9ec024ea56..15ea78d9a3 100644
--- a/distributed/http/worker/prometheus/__init__.py
+++ b/distributed/http/worker/prometheus/__init__.py
@@ -1,5 +1,5 @@
 from __future__ import annotations

-from .core import PrometheusHandler
+from distributed.http.worker.prometheus.core import PrometheusHandler

 routes: list[tuple] = [("/metrics", PrometheusHandler, {})]
diff --git a/distributed/lock.py b/distributed/lock.py
index a4ed492632..5830e2de94 100644
--- a/distributed/lock.py
+++ b/distributed/lock.py
@@ -5,9 +5,9 @@

 from dask.utils import parse_timedelta

-from .client import Client
-from .utils import TimeoutError, log_errors
-from .worker import get_worker
+from distributed.client import Client
+from distributed.utils import TimeoutError, log_errors
+from distributed.worker import get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/locket.py b/distributed/locket.py
index 0333465889..73529ae03a 100644
--- a/distributed/locket.py
+++ b/distributed/locket.py
@@ -7,7 +7,7 @@
 import weakref
 from time import sleep

-from .metrics import time
+from distributed.metrics import time

 __all__ = ["lock_file"]
diff --git a/distributed/multi_lock.py b/distributed/multi_lock.py
index 7141a30025..31b2e6ebbd 100644
--- a/distributed/multi_lock.py
+++ b/distributed/multi_lock.py
@@ -8,9 +8,9 @@

 from dask.utils import parse_timedelta

-from .client import Client
-from .utils import TimeoutError, log_errors
-from .worker import get_worker
+from distributed.client import Client
+from distributed.utils import TimeoutError, log_errors
+from distributed.worker import get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/nanny.py b/distributed/nanny.py
index 65a2d30378..fd7a6882cd 100644
--- a/distributed/nanny.py
+++ b/distributed/nanny.py
@@ -23,18 +23,24 @@
 from dask.system import CPU_COUNT
 from dask.utils import parse_timedelta

-from . import preloading
-from .comm import get_address_host, unparse_host_port
-from .comm.addressing import address_from_user_args
-from .core import CommClosedError, RPCClosed, Status, coerce_to_address, error_message
-from .diagnostics.plugin import _get_plugin_name
-from .metrics import time
-from .node import ServerNode
-from .process import AsyncProcess
-from .proctitle import enable_proctitle_on_children
-from .protocol import pickle
-from .security import Security
-from .utils import (
+from distributed import preloading
+from distributed.comm import get_address_host, unparse_host_port
+from distributed.comm.addressing import address_from_user_args
+from distributed.core import (
+    CommClosedError,
+    RPCClosed,
+    Status,
+    coerce_to_address,
+    error_message,
+)
+from distributed.diagnostics.plugin import _get_plugin_name
+from distributed.metrics import time
+from distributed.node import ServerNode
+from distributed.process import AsyncProcess
+from distributed.proctitle import enable_proctitle_on_children
+from distributed.protocol import pickle
+from distributed.security import Security
+from distributed.utils import (
     TimeoutError,
     get_ip,
     json_load_robust,
@@ -43,10 +49,10 @@
     parse_ports,
     silence_logging,
 )
-from .worker import Worker, parse_memory_limit, run
+from distributed.worker import Worker, parse_memory_limit, run

 if TYPE_CHECKING:
-    from .diagnostics.plugin import NannyPlugin
+    from distributed.diagnostics.plugin import NannyPlugin

 logger = logging.getLogger(__name__)
diff --git a/distributed/node.py b/distributed/node.py
index b7b5b02031..6db2c7711e 100644
--- a/distributed/node.py
+++ b/distributed/node.py
@@ -8,11 +8,11 @@

 import dask

-from .comm import get_address_host, get_tcp_server_addresses
-from .core import Server
-from .http.routing import RoutingApplication
-from .utils import DequeHandler, clean_dashboard_address
-from .versions import get_versions
+from distributed.comm import get_address_host, get_tcp_server_addresses
+from distributed.core import Server
+from distributed.http.routing import RoutingApplication
+from distributed.utils import DequeHandler, clean_dashboard_address
+from distributed.versions import get_versions


 class ServerNode(Server):
diff --git a/distributed/preloading.py b/distributed/preloading.py
index 35cb628bdb..458642a01d 100644
--- a/distributed/preloading.py
+++ b/distributed/preloading.py
@@ -16,12 +16,12 @@

 from dask.utils import tmpfile

-from .core import Server
-from .utils import import_file
+from distributed.core import Server
+from distributed.utils import import_file

 if TYPE_CHECKING:
     # This has to be inside this guard to avoid a circular import
-    from .client import Client
+    from distributed.client import Client

 logger = logging.getLogger(__name__)
diff --git a/distributed/process.py b/distributed/process.py
index 6a11832479..4597f276ef 100644
--- a/distributed/process.py
+++ b/distributed/process.py
@@ -11,7 +11,7 @@

 import dask

-from .utils import TimeoutError, mp_context
+from distributed.utils import TimeoutError, mp_context

 logger = logging.getLogger(__name__)
diff --git a/distributed/profile.py b/distributed/profile.py
index 93bcc24d6f..bb832735e8 100644
--- a/distributed/profile.py
+++ b/distributed/profile.py
@@ -38,8 +38,8 @@

 from dask.utils import format_time, parse_timedelta

-from .metrics import time
-from .utils import color_of
+from distributed.metrics import time
+from distributed.utils import color_of


 def identifier(frame):
diff --git a/distributed/protocol/__init__.py b/distributed/protocol/__init__.py
index dc871d877c..817a54558b 100644
--- a/distributed/protocol/__init__.py
+++ b/distributed/protocol/__init__.py
@@ -1,10 +1,10 @@
 from contextlib import suppress
 from functools import partial

-from .compression import compressions, default_compression
-from .core import decompress, dumps, loads, maybe_compress, msgpack
-from .cuda import cuda_deserialize, cuda_serialize
-from .serialize import (
+from distributed.protocol.compression import compressions, default_compression
+from distributed.protocol.core import decompress, dumps, loads, maybe_compress, msgpack
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import (
     Serialize,
     Serialized,
     dask_deserialize,
@@ -25,43 +25,43 @@
 @dask_serialize.register_lazy("numpy")
 @dask_deserialize.register_lazy("numpy")
 def _register_numpy():
-    from . import numpy
+    from distributed.protocol import numpy


 @dask_serialize.register_lazy("scipy")
 @dask_deserialize.register_lazy("scipy")
 def _register_scipy():
-    from . import scipy
+    from distributed.protocol import scipy


 @dask_serialize.register_lazy("h5py")
 @dask_deserialize.register_lazy("h5py")
 def _register_h5py():
-    from . import h5py
+    from distributed.protocol import h5py


 @dask_serialize.register_lazy("netCDF4")
 @dask_deserialize.register_lazy("netCDF4")
 def _register_netcdf4():
-    from . import netcdf4
+    from distributed.protocol import netcdf4


 @dask_serialize.register_lazy("keras")
 @dask_deserialize.register_lazy("keras")
 def _register_keras():
-    from . import keras
+    from distributed.protocol import keras


 @dask_serialize.register_lazy("sparse")
 @dask_deserialize.register_lazy("sparse")
 def _register_sparse():
-    from . import sparse
+    from distributed.protocol import sparse


 @dask_serialize.register_lazy("pyarrow")
 @dask_deserialize.register_lazy("pyarrow")
 def _register_arrow():
-    from . import arrow
+    from distributed.protocol import arrow


 @dask_serialize.register_lazy("torch")
@@ -69,7 +69,7 @@ def _register_arrow():
 @dask_serialize.register_lazy("torchvision")
 @dask_deserialize.register_lazy("torchvision")
 def _register_torch():
-    from . import torch
+    from distributed.protocol import torch


 @cuda_serialize.register_lazy("cupy")
@@ -81,7 +81,7 @@ def _register_torch():
 @dask_serialize.register_lazy("cupyx")
 @dask_deserialize.register_lazy("cupyx")
 def _register_cupy():
-    from . import cupy
+    from distributed.protocol import cupy


 @cuda_serialize.register_lazy("numba")
@@ -89,7 +89,7 @@ def _register_cupy():
 @dask_serialize.register_lazy("numba")
 @dask_deserialize.register_lazy("numba")
 def _register_numba():
-    from . import numba
+    from distributed.protocol import numba


 @cuda_serialize.register_lazy("rmm")
@@ -97,7 +97,7 @@ def _register_numba():
 @dask_serialize.register_lazy("rmm")
 @dask_deserialize.register_lazy("rmm")
 def _register_rmm():
-    from . import rmm
+    from distributed.protocol import rmm


 @cuda_serialize.register_lazy("cudf")
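The `protocol/__init__.py` hunks above all follow one pattern: `register_lazy` defers importing a serializer module until an object from the named library is first (de)serialized, so optional dependencies such as `torch` or `cupy` are never imported eagerly. A minimal sketch of registering lazy hooks for a hypothetical third-party library `mylib`:

```python
from distributed.protocol.serialize import dask_deserialize, dask_serialize


@dask_serialize.register_lazy("mylib")
@dask_deserialize.register_lazy("mylib")
def _register_mylib():
    # Runs once, the first time a mylib object needs (de)serializing;
    # mylib_serializers is a hypothetical module that calls
    # dask_serialize.register(...) for mylib's types.
    import mylib_serializers
```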
diff --git a/distributed/protocol/arrow.py b/distributed/protocol/arrow.py
index 2850c47466..7b05f93c49 100644
--- a/distributed/protocol/arrow.py
+++ b/distributed/protocol/arrow.py
@@ -1,6 +1,6 @@
 import pyarrow

-from .serialize import dask_deserialize, dask_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize

 if pyarrow.__version__ < "0.10":
     raise ImportError(
diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py
index 37ef777319..3994df2db6 100644
--- a/distributed/protocol/compression.py
+++ b/distributed/protocol/compression.py
@@ -25,7 +25,7 @@
 except ImportError:
     blosc = False

-from ..utils import ensure_bytes
+from distributed.utils import ensure_bytes

 compressions: dict[
     str | None | Literal[False],
diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py
index 1be2d761e3..b10bfcba18 100644
--- a/distributed/protocol/core.py
+++ b/distributed/protocol/core.py
@@ -2,8 +2,8 @@

 import msgpack

-from .compression import decompress, maybe_compress
-from .serialize import (
+from distributed.protocol.compression import decompress, maybe_compress
+from distributed.protocol.serialize import (
     Serialize,
     Serialized,
     merge_and_deserialize,
@@ -11,7 +11,7 @@
     msgpack_encode_default,
     serialize_and_split,
 )
-from .utils import msgpack_opts
+from distributed.protocol.utils import msgpack_opts

 logger = logging.getLogger(__name__)
diff --git a/distributed/protocol/cuda.py b/distributed/protocol/cuda.py
index e21ea5f7f7..1f2d5c3228 100644
--- a/distributed/protocol/cuda.py
+++ b/distributed/protocol/cuda.py
@@ -1,8 +1,11 @@
 import dask
 from dask.utils import typename

-from . import pickle
-from .serialize import ObjectDictSerializer, register_serialization_family
+from distributed.protocol import pickle
+from distributed.protocol.serialize import (
+    ObjectDictSerializer,
+    register_serialization_family,
+)

 cuda_serialize = dask.utils.Dispatch("cuda_serialize")
 cuda_deserialize = dask.utils.Dispatch("cuda_deserialize")
diff --git a/distributed/protocol/cupy.py b/distributed/protocol/cupy.py
index eeeae68755..872b9b202f 100644
--- a/distributed/protocol/cupy.py
+++ b/distributed/protocol/cupy.py
@@ -5,13 +5,21 @@

 import cupy

-from .cuda import cuda_deserialize, cuda_serialize
-from .serialize import dask_deserialize, dask_serialize, register_generic
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import (
+    dask_deserialize,
+    dask_serialize,
+    register_generic,
+)

 try:
-    from .rmm import dask_deserialize_rmm_device_buffer as dask_deserialize_cuda_buffer
+    from distributed.protocol.rmm import (
+        dask_deserialize_rmm_device_buffer as dask_deserialize_cuda_buffer,
+    )
 except ImportError:
-    from .numba import dask_deserialize_numba_array as dask_deserialize_cuda_buffer
+    from distributed.protocol.numba import (
+        dask_deserialize_numba_array as dask_deserialize_cuda_buffer,
+    )


 @cuda_serialize.register(cupy.ndarray)
diff --git a/distributed/protocol/h5py.py b/distributed/protocol/h5py.py
index 8a47c7abdc..8d6df8856f 100644
--- a/distributed/protocol/h5py.py
+++ b/distributed/protocol/h5py.py
@@ -1,6 +1,6 @@
 import h5py

-from .serialize import dask_deserialize, dask_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize


 @dask_serialize.register(h5py.File)
diff --git a/distributed/protocol/keras.py b/distributed/protocol/keras.py
index c2c24e3992..560fac1a8c 100644
--- a/distributed/protocol/keras.py
+++ b/distributed/protocol/keras.py
@@ -1,6 +1,11 @@
 import keras

-from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+from distributed.protocol.serialize import (
+    dask_deserialize,
+    dask_serialize,
+    deserialize,
+    serialize,
+)


 @dask_serialize.register(keras.Model)
diff --git a/distributed/protocol/netcdf4.py b/distributed/protocol/netcdf4.py
index d3d0b1e2c0..9ed0d3554d 100644
--- a/distributed/protocol/netcdf4.py
+++ b/distributed/protocol/netcdf4.py
@@ -1,6 +1,11 @@
 import netCDF4

-from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+from distributed.protocol.serialize import (
+    dask_deserialize,
+    dask_serialize,
+    deserialize,
+    serialize,
+)


 @dask_serialize.register(netCDF4.Dataset)
diff --git a/distributed/protocol/numba.py b/distributed/protocol/numba.py
index 668f07e092..9756743829 100644
--- a/distributed/protocol/numba.py
+++ b/distributed/protocol/numba.py
@@ -3,11 +3,11 @@
 import numba.cuda
 import numpy as np

-from .cuda import cuda_deserialize, cuda_serialize
-from .serialize import dask_deserialize, dask_serialize
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize

 try:
-    from .rmm import dask_deserialize_rmm_device_buffer
+    from distributed.protocol.rmm import dask_deserialize_rmm_device_buffer
 except ImportError:
     dask_deserialize_rmm_device_buffer = None
diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py
index a416297afa..472f00271b 100644
--- a/distributed/protocol/numpy.py
+++ b/distributed/protocol/numpy.py
@@ -2,9 +2,9 @@

 import numpy as np

-from ..utils import log_errors
-from . import pickle
-from .serialize import dask_deserialize, dask_serialize
+from distributed.protocol import pickle
+from distributed.protocol.serialize import dask_deserialize, dask_serialize
+from distributed.utils import log_errors


 def itemsize(dt):
diff --git a/distributed/protocol/rmm.py b/distributed/protocol/rmm.py
index e25919c0fb..1c9ef820f7 100644
--- a/distributed/protocol/rmm.py
+++ b/distributed/protocol/rmm.py
@@ -3,8 +3,8 @@
 import numpy
 import rmm

-from .cuda import cuda_deserialize, cuda_serialize
-from .serialize import dask_deserialize, dask_serialize
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize

 # Used for RMM 0.11.0+ otherwise Numba serializers used
 if hasattr(rmm, "DeviceBuffer"):
diff --git a/distributed/protocol/scipy.py b/distributed/protocol/scipy.py
index 0ad0adc0a2..88ca9f621a 100644
--- a/distributed/protocol/scipy.py
+++ b/distributed/protocol/scipy.py
@@ -3,7 +3,11 @@
 """
 import scipy

-from .serialize import dask_deserialize, dask_serialize, register_generic
+from distributed.protocol.serialize import (
+    dask_deserialize,
+    dask_serialize,
+    register_generic,
+)

 register_generic(scipy.sparse.spmatrix, "dask", dask_serialize, dask_deserialize)
diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py
index 1611dc43dc..c8bfc9dce7 100644
--- a/distributed/protocol/serialize.py
+++ b/distributed/protocol/serialize.py
@@ -13,16 +13,16 @@
 from dask.base import normalize_token
 from dask.utils import typename

-from ..utils import ensure_bytes, has_keyword
-from . import pickle
-from .compression import decompress, maybe_compress
-from .utils import (
+from distributed.protocol import pickle
+from distributed.protocol.compression import decompress, maybe_compress
+from distributed.protocol.utils import (
     frame_split_size,
     merge_memoryviews,
     msgpack_opts,
     pack_frames_prelude,
     unpack_frames,
 )
+from distributed.utils import ensure_bytes, has_keyword

 dask_serialize = dask.utils.Dispatch("dask_serialize")
 dask_deserialize = dask.utils.Dispatch("dask_deserialize")
diff --git a/distributed/protocol/sparse.py b/distributed/protocol/sparse.py
index 42d625b0df..079bc5d33c 100644
--- a/distributed/protocol/sparse.py
+++ b/distributed/protocol/sparse.py
@@ -1,6 +1,11 @@
 import sparse

-from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+from distributed.protocol.serialize import (
+    dask_deserialize,
+    dask_serialize,
+    deserialize,
+    serialize,
+)


 @dask_serialize.register(sparse.COO)
diff --git a/distributed/protocol/torch.py b/distributed/protocol/torch.py
index f6315f6999..7bd9ff1f29 100644
--- a/distributed/protocol/torch.py
+++ b/distributed/protocol/torch.py
@@ -1,6 +1,6 @@
 import torch

-from .serialize import (
+from distributed.protocol.serialize import (
     dask_deserialize,
     dask_serialize,
     deserialize,
diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py
index 2846eb46bb..c7a10dce42 100644
--- a/distributed/protocol/utils.py
+++ b/distributed/protocol/utils.py
@@ -6,7 +6,7 @@

 import dask

-from ..utils import nbytes
+from distributed.utils import nbytes

 BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
diff --git a/distributed/publish.py b/distributed/publish.py
index 0310c826a1..6377251937 100644
--- a/distributed/publish.py
+++ b/distributed/publish.py
@@ -2,7 +2,7 @@

 from dask.utils import stringify

-from .utils import log_errors
+from distributed.utils import log_errors


 class PublishExtension:
diff --git a/distributed/pubsub.py b/distributed/pubsub.py
index 6d5ab7924e..f1cbc62e53 100644
--- a/distributed/pubsub.py
+++ b/distributed/pubsub.py
@@ -6,10 +6,10 @@

 from dask.utils import parse_timedelta

-from .core import CommClosedError
-from .metrics import time
-from .protocol.serialize import to_serialize
-from .utils import TimeoutError, sync
+from distributed.core import CommClosedError
+from distributed.metrics import time
+from distributed.protocol.serialize import to_serialize
+from distributed.utils import TimeoutError, sync

 logger = logging.getLogger(__name__)
diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py
index af99af2bcd..d8961da215 100644
--- a/distributed/pytest_resourceleaks.py
+++ b/distributed/pytest_resourceleaks.py
@@ -57,9 +57,9 @@ def test1():
 import psutil
 import pytest

-from .comm.tcp import BaseTCPConnector
-from .compatibility import WINDOWS
-from .metrics import time
+from distributed.comm.tcp import BaseTCPConnector
+from distributed.compatibility import WINDOWS
+from distributed.metrics import time


 def pytest_addoption(parser):
diff --git a/distributed/queues.py b/distributed/queues.py
index b7eb37cd93..c29c4f1ab2 100644
--- a/distributed/queues.py
+++ b/distributed/queues.py
@@ -5,8 +5,8 @@

 from dask.utils import parse_timedelta, stringify

-from .client import Client, Future
-from .worker import get_client, get_worker
+from distributed.client import Client, Future
+from distributed.worker import get_client, get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/recreate_tasks.py b/distributed/recreate_tasks.py
index ec596bc461..82b72092b4 100644
--- a/distributed/recreate_tasks.py
+++ b/distributed/recreate_tasks.py
@@ -2,10 +2,10 @@

 from dask.utils import stringify

-from .client import futures_of, wait
-from .utils import sync
-from .utils_comm import pack_data
-from .worker import _deserialize
+from distributed.client import futures_of, wait
+from distributed.utils import sync
+from distributed.utils_comm import pack_data
+from distributed.worker import _deserialize

 logger = logging.getLogger(__name__)
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index ce0622c0c2..9d7cb471f6 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -51,40 +51,37 @@
 from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta, tmpfile
 from dask.widgets import get_template

-from distributed import cluster_dump
-from distributed.utils import recursive_to_dict
-
-from . import preloading, profile
-from . import versions as version_module
-from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
-from .batched import BatchedSend
-from .comm import (
+from distributed import cluster_dump, preloading, profile
+from distributed import versions as version_module
+from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
+from distributed.batched import BatchedSend
+from distributed.comm import (
     Comm,
     get_address_host,
     normalize_address,
     resolve_address,
     unparse_host_port,
 )
-from .comm.addressing import addresses_from_user_args
-from .core import CommClosedError, Status, clean_exception, rpc, send_recv
-from .diagnostics.memory_sampler import MemorySamplerExtension
-from .diagnostics.plugin import SchedulerPlugin, _get_plugin_name
-from .event import EventExtension
-from .http import get_handlers
-from .lock import LockExtension
-from .metrics import time
-from .multi_lock import MultiLockExtension
-from .node import ServerNode
-from .proctitle import setproctitle
-from .protocol.pickle import dumps, loads
-from .publish import PublishExtension
-from .pubsub import PubSubSchedulerExtension
-from .queues import QueueExtension
-from .recreate_tasks import ReplayTaskScheduler
-from .security import Security
-from .semaphore import SemaphoreExtension
-from .stealing import WorkStealing
-from .utils import (
+from distributed.comm.addressing import addresses_from_user_args
+from distributed.core import CommClosedError, Status, clean_exception, rpc, send_recv
+from distributed.diagnostics.memory_sampler import MemorySamplerExtension
+from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
+from distributed.event import EventExtension
+from distributed.http import get_handlers
+from distributed.lock import LockExtension
+from distributed.metrics import time
+from distributed.multi_lock import MultiLockExtension
+from distributed.node import ServerNode
+from distributed.proctitle import setproctitle
+from distributed.protocol.pickle import dumps, loads
+from distributed.publish import PublishExtension
+from distributed.pubsub import PubSubSchedulerExtension
+from distributed.queues import QueueExtension
+from distributed.recreate_tasks import ReplayTaskScheduler
+from distributed.security import Security
+from distributed.semaphore import SemaphoreExtension
+from distributed.stealing import WorkStealing
+from distributed.utils import (
     All,
     TimeoutError,
     empty_context,
@@ -93,11 +90,16 @@
     key_split_group,
     log_errors,
     no_default,
+    recursive_to_dict,
     validate_key,
 )
-from
-from .utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
-from .variable import VariableExtension
+from distributed.utils_comm import (
+    gather_from_workers,
+    retry_operation,
+    scatter_to_workers,
+)
+from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
+from distributed.variable import VariableExtension

 try:
     from cython import compiled
@@ -7353,7 +7355,7 @@ def run_function(self, comm, function, args=(), kwargs=None, wait=True):
         --------
         Client.run_on_scheduler
         """
-        from .worker import run
+        from distributed.worker import run

         if not dask.config.get("distributed.scheduler.pickle"):
             raise ValueError(
@@ -7635,7 +7637,7 @@ def start_ipython(self):

         Returns Jupyter connection info dictionary.
         """
-        from ._ipython_utils import start_ipython
+        from distributed._ipython_utils import start_ipython

         if self._ipython_kernel is None:
             self._ipython_kernel = start_ipython(
@@ -7744,7 +7746,7 @@ async def performance_report(
                 self.get_profile(server=True, start=start),
             ]
         )
-        from . import profile
+        from distributed import profile

         def profile_to_figure(state):
             data = profile.plot_data(state)
@@ -7766,8 +7768,8 @@ def profile_to_figure(state):
         for k in sorted(timespent.keys()):
             tasks_timings += f"\n<li> {k} time: {format_time(timespent[k])} </li>"

-        from .dashboard.components.scheduler import task_stream_figure
-        from .diagnostics.task_stream import rectangles
+        from distributed.dashboard.components.scheduler import task_stream_figure
+        from distributed.diagnostics.task_stream import rectangles

         rects = rectangles(task_stream)
         source, task_stream = task_stream_figure(sizing_mode="stretch_both")
diff --git a/distributed/semaphore.py b/distributed/semaphore.py
index f0d255000c..9e7abd872c 100644
--- a/distributed/semaphore.py
+++ b/distributed/semaphore.py
@@ -10,11 +10,10 @@
 import dask
 from dask.utils import parse_timedelta

+from distributed.metrics import time
+from distributed.utils import SyncMethodMixin, log_errors
 from distributed.utils_comm import retry_operation
-
-from .metrics import time
-from .utils import SyncMethodMixin, log_errors
-from .worker import get_client, get_worker
+from distributed.worker import get_client, get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/shuffle/__init__.py b/distributed/shuffle/__init__.py
index d530f9b679..29d5610d37 100644
--- a/distributed/shuffle/__init__.py
+++ b/distributed/shuffle/__init__.py
@@ -1,5 +1,9 @@
-from .shuffle import rearrange_by_column_p2p
-from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension
+from distributed.shuffle.shuffle import rearrange_by_column_p2p
+from distributed.shuffle.shuffle_extension import (
+    ShuffleId,
+    ShuffleMetadata,
+    ShuffleWorkerExtension,
+)

 __all__ = [
     "rearrange_by_column_p2p",
diff --git a/distributed/shuffle/shuffle.py b/distributed/shuffle/shuffle.py
index 33fe1189a0..c832dd1c36 100644
--- a/distributed/shuffle/shuffle.py
+++ b/distributed/shuffle/shuffle.py
@@ -6,7 +6,11 @@
 from dask.delayed import Delayed, delayed
 from dask.highlevelgraph import HighLevelGraph

-from .shuffle_extension import NewShuffleMetadata, ShuffleId, ShuffleWorkerExtension
+from distributed.shuffle.shuffle_extension import (
+    NewShuffleMetadata,
+    ShuffleId,
+    ShuffleWorkerExtension,
+)

 if TYPE_CHECKING:
     import pandas as pd
diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py
index 42d6be98d3..3844ff3db4 100644
--- a/distributed/shuffle/tests/test_graph.py
+++ b/distributed/shuffle/tests/test_graph.py
@@ -14,11 +14,10 @@
 from dask.dataframe.shuffle import partitioning_index, rearrange_by_column_tasks
 from dask.utils_test import hlg_layer_topological

+from distributed.shuffle.shuffle import rearrange_by_column_p2p
+from distributed.shuffle.shuffle_extension import ShuffleWorkerExtension
 from distributed.utils_test import gen_cluster

-from ..shuffle import rearrange_by_column_p2p
-from ..shuffle_extension import ShuffleWorkerExtension
-
 if TYPE_CHECKING:
     from distributed import Client, Scheduler, Worker
diff --git a/distributed/shuffle/tests/test_shuffle_extension.py b/distributed/shuffle/tests/test_shuffle_extension.py
index ad53628067..ec50e59656 100644
--- a/distributed/shuffle/tests/test_shuffle_extension.py
+++ b/distributed/shuffle/tests/test_shuffle_extension.py
@@ -10,14 +10,13 @@
 pd = pytest.importorskip("pandas")
 dd = pytest.importorskip("dask.dataframe")

-from distributed.utils_test import gen_cluster
-
-from ..shuffle_extension import (
+from distributed.shuffle.shuffle_extension import (
     NewShuffleMetadata,
     ShuffleId,
     ShuffleMetadata,
     ShuffleWorkerExtension,
 )
+from distributed.utils_test import gen_cluster

 if TYPE_CHECKING:
     from distributed import Client, Future, Scheduler, Worker
diff --git a/distributed/spill.py b/distributed/spill.py
index b734d4bfe0..4d219e79f6 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -10,8 +10,8 @@
 import zict
 from packaging.version import parse as parse_version

-from .protocol import deserialize_bytes, serialize_bytelist
-from .sizeof import safe_sizeof
+from distributed.protocol import deserialize_bytes, serialize_bytelist
+from distributed.sizeof import safe_sizeof

 logger = logging.getLogger(__name__)

 has_zict_210 = parse_version(zict.__version__) > parse_version("2.0.0")
diff --git a/distributed/stealing.py b/distributed/stealing.py
index 37f53c891c..6f957ca2f4 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -14,10 +14,10 @@
 import dask
 from dask.utils import parse_timedelta

-from .comm.addressing import get_address_host
-from .core import CommClosedError, Status
-from .diagnostics.plugin import SchedulerPlugin
-from .utils import log_errors, recursive_to_dict
+from distributed.comm.addressing import get_address_host
+from distributed.core import CommClosedError, Status
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.utils import log_errors, recursive_to_dict

 # Stealing requires multiple network bounces and if successful also task
 # submission which may include code serialization. Therefore, be very
diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py
index 1d2a975239..062409ddfc 100644
--- a/distributed/system_monitor.py
+++ b/distributed/system_monitor.py
@@ -2,11 +2,11 @@

 import psutil

-from .compatibility import WINDOWS
-from .metrics import time
+from distributed.compatibility import WINDOWS
+from distributed.metrics import time

 try:
-    from .diagnostics import nvml
+    from distributed.diagnostics import nvml
 except Exception:
     nvml = None  # type: ignore
diff --git a/distributed/threadpoolexecutor.py b/distributed/threadpoolexecutor.py
index 2bd224ff4e..18a5819bb3 100644
--- a/distributed/threadpoolexecutor.py
+++ b/distributed/threadpoolexecutor.py
@@ -26,8 +26,8 @@
 import queue
 import threading

-from . import _concurrent_futures_thread as thread
-from .metrics import time
+from distributed import _concurrent_futures_thread as thread
+from distributed.metrics import time

 logger = logging.getLogger(__name__)
diff --git a/distributed/utils.py b/distributed/utils.py
index 23d6d56f08..eecb673473 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -48,8 +48,8 @@
 from dask.utils import parse_timedelta as _parse_timedelta
 from dask.widgets import get_template

-from .compatibility import WINDOWS
-from .metrics import time
+from distributed.compatibility import WINDOWS
+from distributed.metrics import time

 try:
     from dask.context import thread_state
@@ -77,7 +77,7 @@ def _initialize_mp_context():
         if "pkg_resources" in sys.modules:
             preload.append("pkg_resources")

-        from .versions import optional_packages, required_packages
+        from distributed.versions import optional_packages, required_packages

         for pkg, _ in required_packages + optional_packages:
             try:
@@ -689,7 +689,7 @@ def key_split_group(x) -> str:

 @contextmanager
 def log_errors(pdb=False):
-    from .comm import CommClosedError
+    from distributed.comm import CommClosedError

     try:
         yield
diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py
index 4acea7e8b1..9d2b6f7794 100644
--- a/distributed/utils_comm.py
+++ b/distributed/utils_comm.py
@@ -11,8 +11,8 @@
 from dask.optimization import SubgraphCallable
 from dask.utils import parse_timedelta, stringify

-from .core import rpc
-from .utils import All
+from distributed.core import rpc
+from distributed.utils import All

 logger = logging.getLogger(__name__)

@@ -33,7 +33,7 @@ async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=No
     gather
     _gather
     """
-    from .worker import get_data_from_worker
+    from distributed.worker import get_data_from_worker

     bad_addresses = set()
     missing_workers = set()
diff --git a/distributed/utils_perf.py b/distributed/utils_perf.py
index c81eb013c7..6643b738b9 100644
--- a/distributed/utils_perf.py
+++ b/distributed/utils_perf.py
@@ -5,8 +5,8 @@

 from dask.utils import format_bytes

-from .compatibility import PYPY
-from .metrics import thread_time
+from distributed.compatibility import PYPY
+from distributed.metrics import thread_time

 logger = _logger = logging.getLogger(__name__)
diff --git a/distributed/utils_test.py b/distributed/utils_test.py
index f6625e7418..3a4a31e726 100644
--- a/distributed/utils_test.py
+++ b/distributed/utils_test.py
@@ -44,24 +44,22 @@

 import dask

-from distributed.comm.tcp import TCP
-
-from . import system
-from . import versions as version_module
-from .client import Client, _global_clients, default_client
-from .comm import Comm
-from .comm.tcp import BaseTCPConnector
-from .compatibility import WINDOWS
-from .config import initialize_logging
-from .core import CommClosedError, ConnectionPool, Status, connect, rpc
-from .deploy import SpecCluster
-from .diagnostics.plugin import WorkerPlugin
-from .metrics import time
-from .nanny import Nanny
-from .node import ServerNode
-from .proctitle import enable_proctitle_on_children
-from .security import Security
-from .utils import (
+from distributed import system
+from distributed import versions as version_module
+from distributed.client import Client, _global_clients, default_client
+from distributed.comm import Comm
+from distributed.comm.tcp import TCP, BaseTCPConnector
+from distributed.compatibility import WINDOWS
+from distributed.config import initialize_logging
+from distributed.core import CommClosedError, ConnectionPool, Status, connect, rpc
+from distributed.deploy import SpecCluster
+from distributed.diagnostics.plugin import WorkerPlugin
+from distributed.metrics import time
+from distributed.nanny import Nanny
+from distributed.node import ServerNode
+from distributed.proctitle import enable_proctitle_on_children
+from distributed.security import Security
+from distributed.utils import (
     DequeHandler,
     TimeoutError,
     _offload_executor,
@@ -73,7 +71,7 @@
     reset_logger_locks,
     sync,
 )
-from .worker import Worker
+from distributed.worker import Worker

 try:
     import dask.array  # register config
@@ -1405,7 +1403,7 @@ def new_config(new_config):
     """
     Temporarily change configuration dictionary.
     """
-    from .config import defaults
+    from distributed.config import defaults

     config = dask.config.config
     orig_config = copy.deepcopy(config)
diff --git a/distributed/variable.py b/distributed/variable.py
index 5206dc6254..a27abc3ab8 100644
--- a/distributed/variable.py
+++ b/distributed/variable.py
@@ -8,10 +8,10 @@

 from dask.utils import parse_timedelta, stringify

-from .client import Client, Future
-from .metrics import time
-from .utils import TimeoutError, log_errors
-from .worker import get_client, get_worker
+from distributed.client import Client, Future
+from distributed.metrics import time
+from distributed.utils import TimeoutError, log_errors
+from distributed.worker import get_client, get_worker

 logger = logging.getLogger(__name__)
diff --git a/distributed/versions.py b/distributed/versions.py
index d1e51d4c9c..bb1cc55094 100644
--- a/distributed/versions.py
+++ b/distributed/versions.py
@@ -107,7 +107,7 @@ def get_package_info(


 def error_message(scheduler, workers, client, client_name="client"):
-    from .utils import asciitable
+    from distributed.utils import asciitable

     client = client.get("packages") if client else "UNKNOWN"
     scheduler = scheduler.get("packages") if scheduler else "UNKNOWN"
diff --git a/distributed/widgets/__init__.py b/distributed/widgets/__init__.py
index efdb7eccac..71efac71d0 100644
--- a/distributed/widgets/__init__.py
+++ b/distributed/widgets/__init__.py
@@ -2,7 +2,7 @@

 from dask.widgets import FILTERS, TEMPLATE_PATHS

-from ..utils import key_split
+from distributed.utils import key_split

 TEMPLATE_PATHS.append(
     os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
diff --git a/distributed/worker.py b/distributed/worker.py
index 589ab3aee5..402b58d2ca 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -45,13 +45,13 @@
     typename,
 )

-from . import comm, preloading, profile, system, utils
-from .batched import BatchedSend
-from .comm import Comm, connect, get_address_host
-from .comm.addressing import address_from_user_args, parse_address
-from .comm.utils import OFFLOAD_THRESHOLD
-from .compatibility import to_thread
-from .core import (
+from distributed import comm, preloading, profile, system, utils
+from distributed.batched import BatchedSend
+from distributed.comm import Comm, connect, get_address_host
+from distributed.comm.addressing import address_from_user_args, parse_address
+from distributed.comm.utils import OFFLOAD_THRESHOLD
+from distributed.compatibility import to_thread
+from distributed.core import (
     CommClosedError,
     Status,
     coerce_to_address,
@@ -59,21 +59,21 @@
     pingpong,
     send_recv,
 )
-from .diagnostics import nvml
-from .diagnostics.plugin import _get_plugin_name
-from .diskutils import WorkDir, WorkSpace
-from .http import get_handlers
-from .metrics import time
-from .node import ServerNode
-from .proctitle import setproctitle
-from .protocol import pickle, to_serialize
-from .pubsub import PubSubWorkerExtension
-from .security import Security
-from .shuffle import ShuffleWorkerExtension
-from .sizeof import safe_sizeof as sizeof
-from .threadpoolexecutor import ThreadPoolExecutor
-from .threadpoolexecutor import secede as tpe_secede
-from .utils import (
+from distributed.diagnostics import nvml
+from distributed.diagnostics.plugin import _get_plugin_name
+from distributed.diskutils import WorkDir, WorkSpace
+from distributed.http import get_handlers
+from distributed.metrics import time
+from distributed.node import ServerNode
+from distributed.proctitle import setproctitle
+from distributed.protocol import pickle, to_serialize
+from distributed.pubsub import PubSubWorkerExtension
+from distributed.security import Security
+from distributed.shuffle import ShuffleWorkerExtension
+from distributed.sizeof import safe_sizeof as sizeof
+from distributed.threadpoolexecutor import ThreadPoolExecutor
+from distributed.threadpoolexecutor import secede as tpe_secede
+from distributed.utils import (
     LRU,
     TimeoutError,
     _maybe_complex,
@@ -92,17 +92,21 @@
     thread_state,
     warn_on_duration,
 )
-from .utils_comm import gather_from_workers, pack_data, retry_operation
-from .utils_perf import ThrottledGC, disable_gc_diagnosis, enable_gc_diagnosis
-from .versions import get_versions
+from distributed.utils_comm import gather_from_workers, pack_data, retry_operation
+from distributed.utils_perf import (
+    ThrottledGC,
+    disable_gc_diagnosis,
+    enable_gc_diagnosis,
+)
+from distributed.versions import get_versions

 if TYPE_CHECKING:
     from typing_extensions import TypeAlias

-    from .actor import Actor
-    from .client import Client
-    from .diagnostics.plugin import WorkerPlugin
-    from .nanny import Nanny
+    from distributed.actor import Actor
+    from distributed.client import Client
+    from distributed.diagnostics.plugin import WorkerPlugin
+    from distributed.nanny import Nanny

 # {TaskState -> finish: str | (finish: str, *args)}
 Recs: TypeAlias = "dict[TaskState, str | tuple]"
@@ -924,7 +928,7 @@ def __init__(
         elif self.memory_limit and (
             self.memory_target_fraction or self.memory_spill_fraction
         ):
-            from .spill import SpillBuffer
+            from distributed.spill import SpillBuffer

             if self.memory_target_fraction:
                 target = int(
@@ -1440,7 +1444,7 @@ def start_ipython(self, comm):

         Returns Jupyter connection info dictionary.
         """
-        from ._ipython_utils import start_ipython
+        from distributed._ipython_utils import start_ipython

         if self._ipython_kernel is None:
             self._ipython_kernel = start_ipython(
@@ -1857,7 +1861,7 @@ async def get_data(
         if len(data) < len(keys):
             for k in set(keys) - set(data):
                 if k in self.actors:
-                    from .actor import Actor
+                    from distributed.actor import Actor

                     data[k] = Actor(type(self.actors[k]), self.address, k, worker=self)

@@ -3822,7 +3826,7 @@ def _prepare_args_for_execution(
             try:
                 data[k] = self.data[k]
             except KeyError:
-                from .actor import Actor  # TODO: create local actor
+                from distributed.actor import Actor  # TODO: create local actor

                 data[k] = Actor(type(self.actors[k]), self.address, k, self)

         args2 = pack_data(args, data, key_types=(bytes, str))
@@ -3888,7 +3892,7 @@ def check_pause(memory):
             check_pause(memory)
             # Dump data to disk if above 70%
             if self.memory_spill_fraction and frac > self.memory_spill_fraction:
-                from .spill import SpillBuffer
+                from distributed.spill import SpillBuffer

                 assert isinstance(self.data, SpillBuffer)

@@ -4283,7 +4287,7 @@ def _get_client(self, timeout: float | None = None) -> Client:
             timeout = parse_timedelta(timeout, "s")

         try:
-            from .client import default_client
+            from distributed.client import default_client

             client = default_client()
         except ValueError:  # no clients found, need to make a new one
@@ -4308,7 +4312,7 @@ def _get_client(self, timeout: float | None = None) -> Client:
             self._client = client

         if not self._client:
-            from .client import Client
+            from distributed.client import Client

             asynchronous = in_async_call(self.loop)
             self._client = Client(
@@ -4434,7 +4438,7 @@ def get_client(address=None, timeout=None, resolve_address=True) -> Client:
         if not address or worker.scheduler.address == address:
             return worker._get_client(timeout=timeout)

-    from .client import Client
+    from distributed.client import Client

     try:
         client = Client.current()  # TODO: assumes the same scheduler
diff --git a/distributed/worker_client.py b/distributed/worker_client.py
index 059d5dfaad..5a775b3819 100644
--- a/distributed/worker_client.py
+++ b/distributed/worker_client.py
@@ -4,9 +4,8 @@
 import dask

 from distributed.metrics import time
-
-from .threadpoolexecutor import rejoin, secede
-from .worker import get_client, get_worker, thread_state
+from distributed.threadpoolexecutor import rejoin, secede
+from distributed.worker import get_client, get_worker, thread_state


 @contextmanager