From 3c7b5b1f26371a829f03ee2280790fad5eb9d1e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sat, 23 May 2020 17:47:37 +0200 Subject: [PATCH] Don't send empty dependencies (#3423) --- distributed/client.py | 1 + distributed/diagnostics/graph_layout.py | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 8773311194..36234ea2e9 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2574,6 +2574,7 @@ def _graph_to_futures( dependencies = { tokey(k): [tokey(dep) for dep in deps] for k, deps in dependencies.items() + if deps } for k, deps in future_dependencies.items(): if deps: diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index a348d2e04e..ab36bd6978 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -36,16 +36,21 @@ def __init__(self, scheduler): } priority = {k: ts.priority for k, ts in scheduler.tasks.items()} self.update_graph( - self.scheduler, dependencies=dependencies, priority=priority + self.scheduler, + tasks=self.scheduler.tasks, + dependencies=dependencies, + priority=priority, ) - def update_graph(self, scheduler, dependencies=None, priority=None, **kwargs): - stack = sorted(dependencies, key=lambda k: priority.get(k, 0), reverse=True) + def update_graph( + self, scheduler, dependencies=None, priority=None, tasks=None, **kwargs + ): + stack = sorted(tasks, key=lambda k: priority.get(k, 0), reverse=True) while stack: key = stack.pop() if key in self.x or key not in scheduler.tasks: continue - deps = dependencies[key] + deps = dependencies.get(key, ()) if deps: if not all(dep in self.y for dep in deps): stack.append(key)