From dc471862b7efc8473583e7e96b6fcaf8d5c7c527 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 24 Aug 2021 12:47:58 -0700 Subject: [PATCH] Avoid during-iteration scheduler plugin changes (#5259) --- distributed/scheduler.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 63a7191cb3..f89e486356 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2245,7 +2245,7 @@ def _transition(self, key, finish: str, *args, **kwargs): ts._dependents = dependents ts._dependencies = dependencies parent._tasks[ts._key] = ts - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: plugin.transition(key, start, finish2, *args, **kwargs) except Exception: @@ -3949,7 +3949,9 @@ def del_scheduler_file(): for preload in self.preloads: await preload.start() - await asyncio.gather(*[plugin.start(self) for plugin in self.plugins.values()]) + await asyncio.gather( + *[plugin.start(self) for plugin in list(self.plugins.values())] + ) self.start_periodic_callbacks() @@ -3988,7 +3990,9 @@ async def close(self, comm=None, fast=False, close_workers=False): else: break - await asyncio.gather(*[plugin.close() for plugin in self.plugins.values()]) + await asyncio.gather( + *[plugin.close() for plugin in list(self.plugins.values())] + ) for pc in self.periodic_callbacks.values(): pc.stop() @@ -4239,7 +4243,7 @@ async def add_worker( if ws._nthreads > len(ws._processing): parent._idle[ws._address] = ws - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: result = plugin.add_worker(scheduler=self, worker=address) if inspect.isawaitable(result): @@ -4653,7 +4657,7 @@ def update_graph( recommendations[ts._key] = "erred" break - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: plugin.update_graph( self, @@ -4916,7 +4920,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): self.transitions(recommendations) - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: result = plugin.remove_worker(scheduler=self, worker=address) if inspect.isawaitable(result): @@ -5219,7 +5223,7 @@ async def add_client(self, comm, client=None, versions=None): self.log_event(["all", client], {"action": "add-client", "client": client}) parent._clients[client] = ClientState(client, versions=versions) - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: plugin.add_client(scheduler=self, client=client) except Exception as e: @@ -5274,7 +5278,7 @@ def remove_client(self, client=None): ) del parent._clients[client] - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: plugin.remove_client(scheduler=self, client=client) except Exception as e: @@ -5511,8 +5515,8 @@ def remove_plugin(self, plugin=None, name=None): self.plugins.pop(plugin.name) else: # TODO: Remove this block of code once removing plugins by value is disabled - if plugin in self.plugins.values(): - if sum(plugin is p for p in self.plugins.values()) > 1: + if plugin in list(self.plugins.values()): + if sum(plugin is p for p in list(self.plugins.values())) > 1: raise ValueError( f"Multiple instances of {plugin} were found in the current scheduler " "plugins, we cannot remove this plugin." @@ -5755,7 +5759,7 @@ async def restart(self, client=None, timeout=30): self.clear_task_state() - for plugin in self.plugins.values(): + for plugin in list(self.plugins.values()): try: plugin.restart(self) except Exception as e: @@ -7023,7 +7027,7 @@ def start_task_metadata(self, comm=None, name=None): def stop_task_metadata(self, comm=None, name=None): plugins = [ p - for p in self.plugins.values() + for p in list(self.plugins.values()) if isinstance(p, CollectTaskMetaDataPlugin) and p.name == name ] if len(plugins) != 1: