Skip to content

Commit

Permalink
Avoid during-iteration scheduler plugin changes (#5259)
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis authored Aug 24, 2021
1 parent 09a89c5 commit dc47186
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
ts._dependents = dependents
ts._dependencies = dependencies
parent._tasks[ts._key] = ts
for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.transition(key, start, finish2, *args, **kwargs)
except Exception:
Expand Down Expand Up @@ -3949,7 +3949,9 @@ def del_scheduler_file():
for preload in self.preloads:
await preload.start()

await asyncio.gather(*[plugin.start(self) for plugin in self.plugins.values()])
await asyncio.gather(
*[plugin.start(self) for plugin in list(self.plugins.values())]
)

self.start_periodic_callbacks()

Expand Down Expand Up @@ -3988,7 +3990,9 @@ async def close(self, comm=None, fast=False, close_workers=False):
else:
break

await asyncio.gather(*[plugin.close() for plugin in self.plugins.values()])
await asyncio.gather(
*[plugin.close() for plugin in list(self.plugins.values())]
)

for pc in self.periodic_callbacks.values():
pc.stop()
Expand Down Expand Up @@ -4239,7 +4243,7 @@ async def add_worker(
if ws._nthreads > len(ws._processing):
parent._idle[ws._address] = ws

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
result = plugin.add_worker(scheduler=self, worker=address)
if inspect.isawaitable(result):
Expand Down Expand Up @@ -4653,7 +4657,7 @@ def update_graph(
recommendations[ts._key] = "erred"
break

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.update_graph(
self,
Expand Down Expand Up @@ -4916,7 +4920,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True):

self.transitions(recommendations)

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
result = plugin.remove_worker(scheduler=self, worker=address)
if inspect.isawaitable(result):
Expand Down Expand Up @@ -5219,7 +5223,7 @@ async def add_client(self, comm, client=None, versions=None):
self.log_event(["all", client], {"action": "add-client", "client": client})
parent._clients[client] = ClientState(client, versions=versions)

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.add_client(scheduler=self, client=client)
except Exception as e:
Expand Down Expand Up @@ -5274,7 +5278,7 @@ def remove_client(self, client=None):
)
del parent._clients[client]

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.remove_client(scheduler=self, client=client)
except Exception as e:
Expand Down Expand Up @@ -5511,8 +5515,8 @@ def remove_plugin(self, plugin=None, name=None):
self.plugins.pop(plugin.name)
else:
# TODO: Remove this block of code once removing plugins by value is disabled
if plugin in self.plugins.values():
if sum(plugin is p for p in self.plugins.values()) > 1:
if plugin in list(self.plugins.values()):
if sum(plugin is p for p in list(self.plugins.values())) > 1:
raise ValueError(
f"Multiple instances of {plugin} were found in the current scheduler "
"plugins, we cannot remove this plugin."
Expand Down Expand Up @@ -5755,7 +5759,7 @@ async def restart(self, client=None, timeout=30):

self.clear_task_state()

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.restart(self)
except Exception as e:
Expand Down Expand Up @@ -7023,7 +7027,7 @@ def start_task_metadata(self, comm=None, name=None):
def stop_task_metadata(self, comm=None, name=None):
plugins = [
p
for p in self.plugins.values()
for p in list(self.plugins.values())
if isinstance(p, CollectTaskMetaDataPlugin) and p.name == name
]
if len(plugins) != 1:
Expand Down

0 comments on commit dc47186

Please sign in to comment.