Skip to content

Commit

Permalink
keep depentents and dependencies immutable after initial create
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Jan 7, 2021
1 parent f0e4323 commit cdd303f
Showing 1 changed file with 2 additions and 24 deletions.
26 changes: 2 additions & 24 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1499,14 +1499,6 @@ def add_task(
dep_ts.state = "waiting"
logger.debug(f"New dep {dep_ts}")
self.log.append((dependency, "new-dep", dep_ts.state))
if dep_ts.key in self.data:
dep_ts.state = "memory"
logger.debug(
"Encountered dependency %s which was in data but not tracked in tasks"
% dep_ts.key
)
if self.validate:
self.validate_task_memory(dep_ts)
else:
dep_ts = self.tasks[dependency]
logger.debug(f"Known dep {dep_ts}")
Expand Down Expand Up @@ -1766,11 +1758,6 @@ def transition_executing_done(self, ts, value=no_value, report=True):
ts.state = "error"
out = "error"

# Don't release the dependency keys, but do remove them from `dependents`
for dependency in ts.dependencies:
dependency.dependents.discard(ts)
ts.dependencies.clear()

if report and self.batched_stream and self.status == Status.running:
self.send_task_state_to_scheduler(ts)
else:
Expand Down Expand Up @@ -2294,23 +2281,14 @@ def release_key(self, key, reason=None, cause=None, report=True):
else:
self.log.append((key, "release-key"))

if key in self.data and not ts.dependents:
if key in self.data:
try:
del self.data[key]
except FileNotFoundError:
logger.error("Tried to delete %s but no file found", exc_info=True)
if key in self.actors and not ts.dependents:
if key in self.actors:
del self.actors[key]

# for any dependencies of key we are releasing remove task as dependent
for dependency in ts.dependencies:
dependency.dependents.discard(ts)
if not dependency.dependents and dependency.state in (
"waiting",
"flight",
):
self.release_key(dependency.key)

for worker in ts.who_has:
self.has_what[worker].discard(ts.key)

Expand Down

0 comments on commit cdd303f

Please sign in to comment.