From 0e8f2f1a89a94cf76eefa703bbd0f16d9cf86b87 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 24 Mar 2022 04:43:45 -0500 Subject: [PATCH 1/4] Set lower tick frequency in tests (#5977) --- distributed/tests/test_core.py | 4 ++-- distributed/utils_test.py | 19 ++++++++++++------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 05b7133b1a..c2e9cc180c 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -761,7 +761,7 @@ async def test_counters(): assert c["op"].components[0] == {"identity": 2, "div": 1} -@gen_cluster() +@gen_cluster(config={"distributed.admin.tick.interval": "20 ms"}) async def test_ticks(s, a, b): pytest.importorskip("crick") await asyncio.sleep(0.1) @@ -770,7 +770,7 @@ async def test_ticks(s, a, b): assert 0.01 < c.components[0].quantile(0.5) < 0.5 -@gen_cluster() +@gen_cluster(config={"distributed.admin.tick.interval": "20 ms"}) async def test_tick_logging(s, a, b): pytest.importorskip("crick") from distributed import core diff --git a/distributed/utils_test.py b/distributed/utils_test.py index c1cbfdca23..3e558a0b55 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1770,13 +1770,18 @@ def clean(threads=not WINDOWS, instances=True, timeout=1, processes=True): with check_active_rpc(loop, timeout): reset_config() - dask.config.set({"distributed.comm.timeouts.connect": "5s"}) - # Restore default logging levels - # XXX use pytest hooks/fixtures instead? - for name, level in logging_levels.items(): - logging.getLogger(name).setLevel(level) - - yield loop + with dask.config.set( + { + "distributed.comm.timeouts.connect": "5s", + "distributed.admin.tick.interval": "500 ms", + } + ): + # Restore default logging levels + # XXX use pytest hooks/fixtures instead? + for name, level in logging_levels.items(): + logging.getLogger(name).setLevel(level) + + yield loop @pytest.fixture From 9d3064abe2b5c65eb00a092bd4e08a213f670320 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Thu, 24 Mar 2022 11:18:34 +0100 Subject: [PATCH 2/4] Remove cache in iscoroutinefunction to avoid holding on to refs (#5985) --- distributed/utils.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index afe048c2ac..64203488d1 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1124,21 +1124,8 @@ def color_of(x, palette=palette): return palette[n % len(palette)] -def _iscoroutinefunction(f): - return inspect.iscoroutinefunction(f) or gen.is_coroutine_function(f) - - -@functools.lru_cache(None) -def _iscoroutinefunction_cached(f): - return _iscoroutinefunction(f) - - def iscoroutinefunction(f): - # Attempt to use lru_cache version and fall back to non-cached version if needed - try: - return _iscoroutinefunction_cached(f) - except TypeError: # unhashable type - return _iscoroutinefunction(f) + return inspect.iscoroutinefunction(f) or gen.is_coroutine_function(f) @contextmanager From 0b10b031c0d3958d389ca45ffa5736d596093202 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 24 Mar 2022 10:35:36 +0000 Subject: [PATCH 3/4] update celery and other outdated 3rd party URLs (#5988) --- docs/source/related-work.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/related-work.rst b/docs/source/related-work.rst index 9e20f2471f..11aa254d6b 100644 --- a/docs/source/related-work.rst +++ b/docs/source/related-work.rst @@ -72,14 +72,14 @@ There are a couple of older projects that often get mentioned * Pyro_: Remote objects / RPC .. _Luigi: https://luigi.readthedocs.io/en/latest/ -.. _MPI4Py: http://mpi4py.readthedocs.io/en/stable/ +.. _MPI4Py: https://mpi4py.readthedocs.io/en/stable/ .. _PyZMQ: https://github.com/zeromq/pyzmq -.. _Celery: http://www.celeryproject.org/ +.. _Celery: https://docs.celeryq.dev/ .. _`IPython Parallel`: https://ipyparallel.readthedocs.io/en/latest/ .. _Scoop: https://github.com/soravux/scoop/ .. _`concurrent.futures`: https://docs.python.org/3/library/concurrent.futures.html -.. _Dispy: http://dispy.sourceforge.net/ -.. _Pyro: https://pythonhosted.org/Pyro4/ +.. _Dispy: https://dispy.org/ +.. _Pyro: https://pyro4.readthedocs.io/ Relationship ------------ From 5c7d5550aa535ad525a842ce7796badec7ff9453 Mon Sep 17 00:00:00 2001 From: Duncan McGregor Date: Thu, 24 Mar 2022 12:29:11 +0000 Subject: [PATCH 4/4] Remove duplication from stealing (#5787) --- distributed/stealing.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index 6f957ca2f4..8fb7e14a19 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -415,10 +415,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): if not idle: break - if _has_restrictions(ts): - thieves = [ws for ws in idle if _can_steal(ws, ts, sat)] - else: - thieves = idle + thieves = _potential_thieves_for(ts, idle) if not thieves: break thief = thieves[i % len(thieves)] @@ -451,10 +448,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier): continue i += 1 - if _has_restrictions(ts): - thieves = [ws for ws in idle if _can_steal(ws, ts, sat)] - else: - thieves = idle + thieves = _potential_thieves_for(ts, idle) if not thieves: continue thief = thieves[i % len(thieves)] @@ -492,18 +486,16 @@ def story(self, *keys): return out -def _has_restrictions(ts): - """Determine whether the given task has restrictions and whether these - restrictions are strict. - """ - return not ts.loose_restrictions and ( - ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions - ) +def _potential_thieves_for(ts, idle): + """Return the list of workers from ``idle`` that could steal ``ts``.""" + if _has_restrictions(ts): + return [ws for ws in idle if _can_steal(ws, ts)] + else: + return idle -def _can_steal(thief, ts, victim): - """Determine whether worker ``thief`` can steal task ``ts`` from worker - ``victim``. +def _can_steal(thief, ts): + """Determine whether worker ``thief`` can steal task ``ts``. Assumes that `ts` has some restrictions. """ @@ -529,4 +521,13 @@ def _can_steal(thief, ts, victim): return True +def _has_restrictions(ts): + """Determine whether the given task has restrictions and whether these + restrictions are strict. + """ + return not ts.loose_restrictions and ( + ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions + ) + + fast_tasks = {"split-shuffle"}