From 391118f5ae1fd0ef5fbecb079e7b9070861878f6 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 7 Jan 2021 16:06:04 +0100
Subject: [PATCH] Reinstate the dependency tracking

---
 distributed/worker.py | 48 +++++++++++++++++--------------------------
 1 file changed, 19 insertions(+), 29 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 7e2577f097..010a5477ca 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1514,15 +1514,12 @@ def add_task(
 
             if self.address in workers and dep_ts.state != "memory":
                 logger.debug(
-                    f"Who has claims Worker {self.name} would own data of {dependency} but this is untrue."
+                    f"who_has claims worker {self.name} holds data for {dependency}, but this is not the case."
                 )
-            if dep_ts.state not in "memory":
-                if dep_ts.runspec is None:
-                    logger.info(f"Need to fetch {dep_ts}")
-                    ts.waiting_for_data.add(dep_ts.key)
-                    self.waiting_for_data_count += 1
-                else:
-                    logger.info(f"Runspec for dep {dep_ts} known.")
+
+            if dep_ts.state not in ("memory",) and dep_ts.runspec is None:
+                ts.waiting_for_data.add(dep_ts.key)
+                self.waiting_for_data_count += 1
 
             ts.dependencies.add(dep_ts)
             dep_ts.dependents.add(ts)
@@ -1536,8 +1533,6 @@ def add_task(
         if ts.waiting_for_data:
             if ts.key not in self.data_needed:
                 self.data_needed.append(ts.key)
-            else:
-                logger.debug("Key %s already tracked in data_needed" % ts.key)
         else:
             self.transition(ts, "ready")
         if self.validate:
@@ -2148,25 +2143,13 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
             if d not in self.tasks:
                 logger.debug("Task %s already forgotten." % d)
                 continue
-            assert isinstance(d, str)
+
             ts = self.tasks[d]
-            # There is a race condition where the worker we're fetching
-            # the dependencies from is dead and the task is rescheduled.
-            # If the task is rescheduled on this worker it may already
-            # be in memory once the connection breaks up. In this case,
-            # data is empty and we do not need to transition anything
-            if ts.state == "memory":
-                continue
+
             if ts.key in data:
                 self.transition(ts, "memory", value=data[d])
-            elif ts.state == "ready":
-                if ts.runspec is not None:
-                    pass
-                    # This is encountered if the key is missing but was
-                    # already reassigned for computation on this worker
-                else:
-                    raise RuntimeError("Encountered unexpected state")
-            else:
+
+            if ts.state != "memory":
                 self.transition(ts, "waiting", worker=worker)
                 missing.add(ts)
 
@@ -2179,8 +2162,6 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
             assert isinstance(cause, str)
             assert cause in self.tasks
             self.data_needed.append(cause)
-        else:
-            logger.debug("Key %s already in data_needed" % cause)
 
         if self.validate:
             self.validate_state()
@@ -2309,7 +2290,7 @@ def release_key(self, key, reason=None, cause=None, report=True):
             self.log.append((key, "release-key", {"cause": cause}))
         else:
             self.log.append((key, "release-key"))
-        # FIXME: What happens to data entries without corresponding TaskState?
+
         if key in self.data and not ts.dependents:
            try:
                del self.data[key]
@@ -2318,6 +2299,15 @@ def release_key(self, key, reason=None, cause=None, report=True):
 
         if key in self.actors and not ts.dependents:
             del self.actors[key]
 
+        # For any dependency of the key being released, remove this task as a dependent
+        for dependency in ts.dependencies:
+            dependency.dependents.discard(ts)
+            if not dependency.dependents and dependency.state in (
+                "waiting",
+                "flight",
+            ):
+                self.release_key(dependency.key)
+
         for worker in ts.who_has:
             self.has_what[worker].discard(ts.key)