Skip to content

Commit

Permalink
Add nprocs auto option to dask-worker CLI (#4377)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Jan 7, 2021
1 parent 8f33b9e commit 5812314
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 45 deletions.
11 changes: 9 additions & 2 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from distributed import Nanny
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.comm import get_address_host_port
from distributed.deploy.utils import nprocesses_nthreads
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
Expand Down Expand Up @@ -124,11 +125,12 @@
@click.option("--nthreads", type=int, default=0, help="Number of threads per process.")
@click.option(
"--nprocs",
type=int,
type=str,
default=1,
show_default=True,
help="Number of worker processes to launch. "
"If negative, then (CPU_COUNT + 1 + nprocs) is used.",
"If negative, then (CPU_COUNT + 1 + nprocs) is used. "
"Set to 'auto' to set nprocs and nthreads dynamically based on CPU_COUNT",
)
@click.option(
"--name",
Expand Down Expand Up @@ -292,6 +294,11 @@ def main(
if v is not None
}

if nprocs == "auto":
nprocs, nthreads = nprocesses_nthreads()
else:
nprocs = int(nprocs)

if nprocs < 0:
nprocs = CPU_COUNT + 1 + nprocs

Expand Down
9 changes: 9 additions & 0 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import distributed.cli.dask_worker
from distributed import Client, Scheduler
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils import sync, tmpfile, parse_ports
from distributed.utils_test import popen, terminate_process, wait_for_port
Expand Down Expand Up @@ -246,6 +247,14 @@ def test_nprocs_negative(loop):
c.wait_for_workers(cpu_count(), timeout="10 seconds")


def test_nprocs_auto(loop):
    """Launch ``dask-worker --nprocs=auto`` and wait for the expected workers.

    The expected worker count is the process count returned by
    ``nprocesses_nthreads()`` called with its default argument.
    """
    expected_workers, _ = nprocesses_nthreads()
    scheduler_cmd = ["dask-scheduler", "--no-dashboard"]
    worker_cmd = ["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]
    with popen(scheduler_cmd) as sched, popen(worker_cmd) as worker:
        with Client("tcp://127.0.0.1:8786", loop=loop) as client:
            client.wait_for_workers(expected_workers, timeout="10 seconds")


def test_nprocs_expands_name(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(
Expand Down
30 changes: 1 addition & 29 deletions distributed/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import warnings
import weakref

from dask.utils import factors
from dask.system import CPU_COUNT
import toolz

from .spec import SpecCluster
from .utils import nprocesses_nthreads
from ..nanny import Nanny
from ..scheduler import Scheduler
from ..security import Security
Expand Down Expand Up @@ -243,34 +243,6 @@ def start_worker(self, *args, **kwargs):
)


def nprocesses_nthreads(n=CPU_COUNT):
    """
    The default breakdown of processes and threads for a given number of cores

    For ``n <= 4`` the process count is ``n`` itself. Otherwise it is the
    smallest factor of ``n`` that is at least ``sqrt(n)``. The thread count
    is ``n // nprocesses``.

    Parameters
    ----------
    n : int
        Number of available cores. Defaults to ``CPU_COUNT``.

    Examples
    --------
    >>> nprocesses_nthreads(4)
    (4, 1)
    >>> nprocesses_nthreads(32)
    (8, 4)

    Returns
    -------
    nprocesses, nthreads
    """
    nprocesses = (
        n if n <= 4 else min(f for f in factors(n) if f >= math.sqrt(n))
    )
    nthreads = n // nprocesses
    return (nprocesses, nthreads)


clusters_to_close = weakref.WeakSet()


Expand Down
14 changes: 14 additions & 0 deletions distributed/deploy/tests/test_deploy_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from distributed.deploy.utils import nprocesses_nthreads


def test_default_process_thread_breakdown():
    """Check the (nprocesses, nthreads) split for a range of core counts."""
    # Core count -> every (nprocesses, nthreads) result accepted for it.
    accepted = {
        1: ((1, 1),),
        4: ((4, 1),),
        5: ((5, 1),),
        8: ((4, 2),),
        12: ((6, 2), (4, 3)),
        20: ((5, 4),),
        24: ((6, 4), (8, 3)),
        32: ((8, 4),),
        40: ((8, 5), (10, 4)),
        80: ((10, 8), (16, 5)),
    }
    for cores, splits in accepted.items():
        assert nprocesses_nthreads(cores) in splits
15 changes: 1 addition & 14 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dask.system import CPU_COUNT
from distributed import Client, Worker, Nanny, get_client
from distributed.core import Status
from distributed.deploy.local import LocalCluster, nprocesses_nthreads
from distributed.deploy.local import LocalCluster
from distributed.metrics import time
from distributed.system import MEMORY_LIMIT
from distributed.utils_test import ( # noqa: F401
Expand Down Expand Up @@ -815,19 +815,6 @@ def test_local_tls_restart(loop):
assert workers_before != workers_after


def test_default_process_thread_breakdown():
    """Check the (nprocesses, nthreads) split for a range of core counts."""
    # Core counts with exactly one accepted result.
    exact = [
        (1, (1, 1)),
        (4, (4, 1)),
        (5, (5, 1)),
        (8, (4, 2)),
        (20, (5, 4)),
        (32, (8, 4)),
    ]
    # Core counts where either of two results is accepted.
    either = [
        (12, ((6, 2), (4, 3))),
        (24, ((6, 4), (8, 3))),
        (40, ((8, 5), (10, 4))),
        (80, ((10, 8), (16, 5))),
    ]
    for cores, expected in exact:
        assert nprocesses_nthreads(cores) == expected
    for cores, options in either:
        assert nprocesses_nthreads(cores) in options


def test_asynchronous_property(loop):
with LocalCluster(
4,
Expand Down
32 changes: 32 additions & 0 deletions distributed/deploy/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import math

from dask.system import CPU_COUNT
from dask.utils import factors


def nprocesses_nthreads(n=CPU_COUNT):
    """
    Split a core count into a default number of processes and threads.

    With more than four cores, the process count is the smallest factor of
    ``n`` that is at least ``sqrt(n)``; otherwise it is ``n`` itself. The
    thread count is ``n // nprocesses``.

    Parameters
    ----------
    n : int
        Number of available cores. Defaults to ``CPU_COUNT``.

    Returns
    -------
    nprocesses, nthreads

    Examples
    --------
    >>> nprocesses_nthreads(4)
    (4, 1)
    >>> nprocesses_nthreads(32)
    (8, 4)
    """
    if n > 4:
        divisors = factors(n)
        # Smallest divisor at or above sqrt(n) keeps processes >= threads.
        nprocesses = min(d for d in divisors if d >= math.sqrt(n))
    else:
        nprocesses = n
    return (nprocesses, n // nprocesses)

0 comments on commit 5812314

Please sign in to comment.