From 5812314e7a3081d8e95808c12724f035c6a01ffd Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 7 Jan 2021 17:50:45 +0000 Subject: [PATCH] Add nprocs auto option to dask-worker CLI (#4377) --- distributed/cli/dask_worker.py | 11 +++++-- distributed/cli/tests/test_dask_worker.py | 9 ++++++ distributed/deploy/local.py | 30 +---------------- distributed/deploy/tests/test_deploy_utils.py | 14 ++++++++ distributed/deploy/tests/test_local.py | 15 +-------- distributed/deploy/utils.py | 32 +++++++++++++++++++ 6 files changed, 66 insertions(+), 45 deletions(-) create mode 100644 distributed/deploy/tests/test_deploy_utils.py create mode 100644 distributed/deploy/utils.py diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 140d0d1ab8f..3d60ce35603 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -14,6 +14,7 @@ from distributed import Nanny from distributed.cli.utils import check_python_3, install_signal_handlers from distributed.comm import get_address_host_port +from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, @@ -124,11 +125,12 @@ @click.option("--nthreads", type=int, default=0, help="Number of threads per process.") @click.option( "--nprocs", - type=int, + type=str, default=1, show_default=True, help="Number of worker processes to launch. " - "If negative, then (CPU_COUNT + 1 + nprocs) is used.", + "If negative, then (CPU_COUNT + 1 + nprocs) is used. " + "Set to 'auto' to set nprocs and nthreads dynamically based on CPU_COUNT", ) @click.option( "--name", @@ -292,6 +294,11 @@ def main( if v is not None } + if nprocs == "auto": + nprocs, nthreads = nprocesses_nthreads() + else: + nprocs = int(nprocs) + if nprocs < 0: nprocs = CPU_COUNT + 1 + nprocs diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index f877ff9209b..0055c38e6dd 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -12,6 +12,7 @@ import distributed.cli.dask_worker from distributed import Client, Scheduler +from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import sync, tmpfile, parse_ports from distributed.utils_test import popen, terminate_process, wait_for_port @@ -246,6 +247,14 @@ def test_nprocs_negative(loop): c.wait_for_workers(cpu_count(), timeout="10 seconds") +def test_nprocs_auto(loop): + with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]) as worker: + with Client("tcp://127.0.0.1:8786", loop=loop) as c: + procs, _ = nprocesses_nthreads() + c.wait_for_workers(procs, timeout="10 seconds") + + def test_nprocs_expands_name(loop): with popen(["dask-scheduler", "--no-dashboard"]) as sched: with popen( diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index f06892a1820..fa83ffae607 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -4,11 +4,11 @@ import warnings import weakref -from dask.utils import factors from dask.system import CPU_COUNT import toolz from .spec import SpecCluster +from .utils import nprocesses_nthreads from ..nanny import Nanny from ..scheduler import Scheduler from ..security import Security @@ -243,34 +243,6 @@ def start_worker(self, *args, **kwargs): ) -def nprocesses_nthreads(n=CPU_COUNT): - """ - The default breakdown of processes and threads for a given number of cores - - Parameters - ---------- - n: int - Number of available cores - - Examples - -------- - >>> nprocesses_nthreads(4) - (4, 1) - >>> nprocesses_nthreads(32) - (8, 4) - - Returns - ------- - nprocesses, nthreads - """ - if n <= 4: - processes = n - else: - processes = min(f for f in factors(n) if f >= math.sqrt(n)) - threads = n // processes - return (processes, threads) - - clusters_to_close = weakref.WeakSet() diff --git a/distributed/deploy/tests/test_deploy_utils.py b/distributed/deploy/tests/test_deploy_utils.py new file mode 100644 index 00000000000..871aca3fd30 --- /dev/null +++ b/distributed/deploy/tests/test_deploy_utils.py @@ -0,0 +1,14 @@ +from distributed.deploy.utils import nprocesses_nthreads + + +def test_default_process_thread_breakdown(): + assert nprocesses_nthreads(1) == (1, 1) + assert nprocesses_nthreads(4) == (4, 1) + assert nprocesses_nthreads(5) == (5, 1) + assert nprocesses_nthreads(8) == (4, 2) + assert nprocesses_nthreads(12) in ((6, 2), (4, 3)) + assert nprocesses_nthreads(20) == (5, 4) + assert nprocesses_nthreads(24) in ((6, 4), (8, 3)) + assert nprocesses_nthreads(32) == (8, 4) + assert nprocesses_nthreads(40) in ((8, 5), (10, 4)) + assert nprocesses_nthreads(80) in ((10, 8), (16, 5)) diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 2b63f28b10f..4f88d1fc3d6 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -17,7 +17,7 @@ from dask.system import CPU_COUNT from distributed import Client, Worker, Nanny, get_client from distributed.core import Status -from distributed.deploy.local import LocalCluster, nprocesses_nthreads +from distributed.deploy.local import LocalCluster from distributed.metrics import time from distributed.system import MEMORY_LIMIT from distributed.utils_test import ( # noqa: F401 @@ -815,19 +815,6 @@ def test_local_tls_restart(loop): assert workers_before != workers_after -def test_default_process_thread_breakdown(): - assert nprocesses_nthreads(1) == (1, 1) - assert nprocesses_nthreads(4) == (4, 1) - assert nprocesses_nthreads(5) == (5, 1) - assert nprocesses_nthreads(8) == (4, 2) - assert nprocesses_nthreads(12) in ((6, 2), (4, 3)) - assert nprocesses_nthreads(20) == (5, 4) - assert nprocesses_nthreads(24) in ((6, 4), (8, 3)) - assert nprocesses_nthreads(32) == (8, 4) - assert nprocesses_nthreads(40) in ((8, 5), (10, 4)) - assert nprocesses_nthreads(80) in ((10, 8), (16, 5)) - - def test_asynchronous_property(loop): with LocalCluster( 4, diff --git a/distributed/deploy/utils.py b/distributed/deploy/utils.py new file mode 100644 index 00000000000..4bfa32419bc --- /dev/null +++ b/distributed/deploy/utils.py @@ -0,0 +1,32 @@ +import math + +from dask.system import CPU_COUNT +from dask.utils import factors + + +def nprocesses_nthreads(n=CPU_COUNT): + """ + The default breakdown of processes and threads for a given number of cores + + Parameters + ---------- + n: int + Number of available cores + + Examples + -------- + >>> nprocesses_nthreads(4) + (4, 1) + >>> nprocesses_nthreads(32) + (8, 4) + + Returns + ------- + nprocesses, nthreads + """ + if n <= 4: + processes = n + else: + processes = min(f for f in factors(n) if f >= math.sqrt(n)) + threads = n // processes + return (processes, threads)