Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

absolufy-imports - No relative - PEP8 #5924

Merged
merged 1 commit into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
repos:
- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
- id: absolufy-imports
name: absolufy-imports
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
Expand Down
56 changes: 32 additions & 24 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from . import config # isort:skip; load distributed configuration first
from . import widgets # isort:skip; load distributed widgets second
from distributed import config # isort:skip; load distributed configuration first
from distributed import widgets # isort:skip; load distributed widgets second


import dask
from dask.config import config # type: ignore

from ._version import get_versions
from .actor import Actor, ActorFuture, BaseActorFuture
from .client import (
from distributed._version import get_versions
from distributed.actor import Actor, ActorFuture, BaseActorFuture
from distributed.client import (
Client,
CompatibleExecutor,
Executor,
Expand All @@ -21,9 +21,9 @@
performance_report,
wait,
)
from .core import Status, connect, rpc
from .deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
from .diagnostics.plugin import (
from distributed.core import Status, connect, rpc
from distributed.deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
from distributed.diagnostics.plugin import (
Environ,
NannyPlugin,
PipInstall,
Expand All @@ -32,21 +32,29 @@
UploadFile,
WorkerPlugin,
)
from .diagnostics.progressbar import progress
from .event import Event
from .lock import Lock
from .multi_lock import MultiLock
from .nanny import Nanny
from .pubsub import Pub, Sub
from .queues import Queue
from .scheduler import Scheduler
from .security import Security
from .semaphore import Semaphore
from .threadpoolexecutor import rejoin
from .utils import CancelledError, TimeoutError, sync
from .variable import Variable
from .worker import Reschedule, Worker, get_client, get_worker, print, secede, warn
from .worker_client import local_client, worker_client
from distributed.diagnostics.progressbar import progress
from distributed.event import Event
from distributed.lock import Lock
from distributed.multi_lock import MultiLock
from distributed.nanny import Nanny
from distributed.pubsub import Pub, Sub
from distributed.queues import Queue
from distributed.scheduler import Scheduler
from distributed.security import Security
from distributed.semaphore import Semaphore
from distributed.threadpoolexecutor import rejoin
from distributed.utils import CancelledError, TimeoutError, sync
from distributed.variable import Variable
from distributed.worker import (
Reschedule,
Worker,
get_client,
get_worker,
print,
secede,
warn,
)
from distributed.worker_client import local_client, worker_client


def __getattr__(name):
Expand All @@ -59,7 +67,7 @@ def __getattr__(name):
return __version__

if name == "__git_revision__":
from ._version import get_versions
from distributed._version import get_versions

__git_revision__ = get_versions()["full-revisionid"]
return __git_revision__
Expand Down
10 changes: 5 additions & 5 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import dask
from dask.utils import parse_timedelta

from .core import Status
from .metrics import time
from .utils import import_term, log_errors
from distributed.core import Status
from distributed.metrics import time
from distributed.utils import import_term, log_errors

if TYPE_CHECKING: # pragma: nocover
from .client import Client
from .scheduler import Scheduler, TaskState, WorkerState
from distributed.client import Client
from distributed.scheduler import Scheduler, TaskState, WorkerState

# Main logger. This is reasonably terse also at DEBUG level.
logger = logging.getLogger(__name__)
Expand Down
10 changes: 5 additions & 5 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

from tornado.ioloop import IOLoop

from .client import Future
from .protocol import to_serialize
from .utils import iscoroutinefunction, sync, thread_state
from .utils_comm import WrappedKey
from .worker import get_client, get_worker
from distributed.client import Future
from distributed.protocol import to_serialize
from distributed.utils import iscoroutinefunction, sync, thread_state
from distributed.utils_comm import WrappedKey
from distributed.worker import get_client, get_worker

_T = TypeVar("_T")

Expand Down
4 changes: 2 additions & 2 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import dask
from dask.utils import parse_timedelta

from .core import CommClosedError
from .metrics import time
from distributed.core import CommClosedError
from distributed.metrics import time

logger = logging.getLogger(__name__)

Expand Down
4 changes: 2 additions & 2 deletions distributed/cfexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from dask.utils import parse_timedelta

from .metrics import time
from .utils import TimeoutError, sync
from distributed.metrics import time
from distributed.utils import TimeoutError, sync


@gen.coroutine
Expand Down
65 changes: 35 additions & 30 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@
from tornado import gen
from tornado.ioloop import PeriodicCallback

from . import cluster_dump, preloading
from . import versions as version_module # type: ignore
from .batched import BatchedSend
from .cfexecutor import ClientExecutor
from .core import (
from distributed import cluster_dump, preloading
from distributed import versions as version_module # type: ignore
from distributed.batched import BatchedSend
from distributed.cfexecutor import ClientExecutor
from distributed.core import (
CommClosedError,
ConnectionPool,
PooledRPCCall,
Expand All @@ -65,17 +65,22 @@
connect,
rpc,
)
from .diagnostics.plugin import NannyPlugin, UploadFile, WorkerPlugin, _get_plugin_name
from .metrics import time
from .objects import HasWhat, SchedulerInfo, WhoHas
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .publish import Datasets
from .pubsub import PubSubClientExtension
from .security import Security
from .sizeof import sizeof
from .threadpoolexecutor import rejoin
from .utils import (
from distributed.diagnostics.plugin import (
NannyPlugin,
UploadFile,
WorkerPlugin,
_get_plugin_name,
)
from distributed.metrics import time
from distributed.objects import HasWhat, SchedulerInfo, WhoHas
from distributed.protocol import to_serialize
from distributed.protocol.pickle import dumps, loads
from distributed.publish import Datasets
from distributed.pubsub import PubSubClientExtension
from distributed.security import Security
from distributed.sizeof import sizeof
from distributed.threadpoolexecutor import rejoin
from distributed.utils import (
All,
Any,
CancelledError,
Expand All @@ -91,15 +96,15 @@
sync,
thread_state,
)
from .utils_comm import (
from distributed.utils_comm import (
WrappedKey,
gather_from_workers,
pack_data,
retry_operation,
scatter_to_workers,
unpack_remotedata,
)
from .worker import get_client, get_worker, secede
from distributed.worker import get_client, get_worker, secede

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1016,7 +1021,7 @@ def dashboard_link(self):
return format_dashboard_link(host, port)

def _get_scheduler_info(self):
from .scheduler import Scheduler
from distributed.scheduler import Scheduler

if (
self.cluster
Expand Down Expand Up @@ -1152,7 +1157,7 @@ async def _start(self, timeout=no_default, **kwargs):
except (ValueError, KeyError): # JSON file not yet flushed
await asyncio.sleep(0.01)
elif self._start_arg is None:
from .deploy import LocalCluster
from distributed.deploy import LocalCluster

try:
self.cluster = await LocalCluster(
Expand Down Expand Up @@ -3783,7 +3788,7 @@ async def _profile(
plot = True

if plot:
from . import profile
from distributed import profile

data = profile.plot_data(state)
figure, source = profile.plot_figure(data, sizing_mode="stretch_both")
Expand Down Expand Up @@ -4332,17 +4337,17 @@ def start_ipython_workers(
magic_names = [magic_names]

if "IPython" in sys.modules:
from ._ipython_utils import register_remote_magic
from distributed._ipython_utils import register_remote_magic

register_remote_magic()
if magic_names:
from ._ipython_utils import register_worker_magic
from distributed._ipython_utils import register_worker_magic

for worker, magic_name in zip(workers, magic_names):
connection_info = info_dict[worker]
register_worker_magic(connection_info, magic_name)
if qtconsole:
from ._ipython_utils import connect_qtconsole
from distributed._ipython_utils import connect_qtconsole

for worker, connection_info in info_dict.items():
name = "dask-" + worker.replace(":", "-").replace("/", "-")
Expand Down Expand Up @@ -4397,11 +4402,11 @@ def start_ipython_scheduler(
else:
magic_name = None
if magic_name:
from ._ipython_utils import register_worker_magic
from distributed._ipython_utils import register_worker_magic

register_worker_magic(info, magic_name)
if qtconsole:
from ._ipython_utils import connect_qtconsole
from distributed._ipython_utils import connect_qtconsole

connect_qtconsole(info, name="dask-scheduler", extra_args=qtconsole_args)
return info
Expand Down Expand Up @@ -4517,10 +4522,10 @@ async def _get_task_stream(
):
msgs = await self.scheduler.get_task_stream(start=start, stop=stop, count=count)
if plot:
from .diagnostics.task_stream import rectangles
from distributed.diagnostics.task_stream import rectangles

rects = rectangles(msgs)
from .dashboard.components.scheduler import task_stream_figure
from distributed.dashboard.components.scheduler import task_stream_figure

source, figure = task_stream_figure(sizing_mode="stretch_both")
source.data.update(rects)
Expand Down Expand Up @@ -4755,7 +4760,7 @@ def unregister_worker_plugin(self, name, nanny=None):
@property
def amm(self):
"""Convenience accessors for the :doc:`active_memory_manager`"""
from .active_memory_manager import AMMClientProxy
from distributed.active_memory_manager import AMMClientProxy

return AMMClientProxy(self)

Expand Down Expand Up @@ -4980,7 +4985,7 @@ def update(self, futures):
"""Add multiple futures to the collection.

The added futures will emit from the iterator once they finish"""
from .actor import BaseActorFuture
from distributed.actor import BaseActorFuture

with self.lock:
for f in futures:
Expand Down
16 changes: 8 additions & 8 deletions distributed/comm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .addressing import (
from distributed.comm.addressing import (
get_address_host,
get_address_host_port,
get_local_address_for,
Expand All @@ -9,25 +9,25 @@
unparse_address,
unparse_host_port,
)
from .core import Comm, CommClosedError, connect, listen
from .registry import backends
from .utils import get_tcp_server_address, get_tcp_server_addresses
from distributed.comm.core import Comm, CommClosedError, connect, listen
from distributed.comm.registry import backends
from distributed.comm.utils import get_tcp_server_address, get_tcp_server_addresses


def _register_transports():
import dask.config

from . import inproc, ws
from distributed.comm import inproc, ws

tcp_backend = dask.config.get("distributed.comm.tcp.backend")

if tcp_backend == "asyncio":
from . import asyncio_tcp
from distributed.comm import asyncio_tcp

backends["tcp"] = asyncio_tcp.TCPBackend()
backends["tls"] = asyncio_tcp.TLSBackend()
elif tcp_backend == "tornado":
from . import tcp
from distributed.comm import tcp

backends["tcp"] = tcp.TCPBackend()
backends["tls"] = tcp.TLSBackend()
Expand All @@ -38,7 +38,7 @@ def _register_transports():
)

try:
from . import ucx
from distributed.comm import ucx
except ImportError:
pass

Expand Down
4 changes: 2 additions & 2 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import dask

from ..utils import get_ip_interface
from . import registry
from distributed.comm import registry
from distributed.utils import get_ip_interface


def parse_address(addr: str, strict: bool = False) -> tuple[str, str]:
Expand Down
15 changes: 10 additions & 5 deletions distributed/comm/asyncio_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

import dask

from ..utils import ensure_ip, get_ip, get_ipv6
from .addressing import parse_host_port, unparse_host_port
from .core import Comm, CommClosedError, Connector, Listener
from .registry import Backend
from .utils import ensure_concrete_host, from_frames, host_array, to_frames
from distributed.comm.addressing import parse_host_port, unparse_host_port
from distributed.comm.core import Comm, CommClosedError, Connector, Listener
from distributed.comm.registry import Backend
from distributed.comm.utils import (
ensure_concrete_host,
from_frames,
host_array,
to_frames,
)
from distributed.utils import ensure_ip, get_ip, get_ipv6

logger = logging.getLogger(__name__)

Expand Down
Loading