diff --git a/distributed/worker.py b/distributed/worker.py index 010a5477ca..d964b5073c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1538,7 +1538,10 @@ def add_task( if self.validate: if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) - self.validate_state() + assert all(self.tasks[dep.key] for dep in ts.dependencies) + for dependency in ts.dependencies: + self.validate_task(dependency) + self.validate_task(ts) except Exception as e: logger.exception(e) if LOG_PDB: @@ -2498,7 +2501,6 @@ async def ensure_computing(self): continue if self.meets_resource_constraints(key): self.constrained.popleft() - # FIXME: Should this be part of the transition step? # Ensure task is deserialized prior to execution try: ts.runspec = await self._maybe_deserialize_task(ts) @@ -2518,7 +2520,6 @@ async def ensure_computing(self): elif ts.key in self.data: self.transition(ts, "memory") elif ts.state in READY: - # FIXME: Should this be part of the transition step? # Ensure task is deserialized prior to execution try: ts.runspec = await self._maybe_deserialize_task(ts) @@ -2929,8 +2930,6 @@ def validate_task_flight(self, ts): def validate_task(self, ts): assert ts.key in self.tasks - for dependency in ts.dependencies: - dependency in self.tasks try: if ts.state == "memory": self.validate_task_memory(ts)