Skip to content

Commit

Permalink
Reinstate the dependency tracking
Browse files (browse the repository at this point in the history)
  • Loading branch information
fjetter committed Jan 7, 2021
1 parent 50d8ccb commit 391118f
Showing 1 changed file with 19 additions and 29 deletions.
48 changes: 19 additions & 29 deletions distributed/worker.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1514,15 +1514,12 @@ def add_task(

if self.address in workers and dep_ts.state != "memory":
logger.debug(
f"Who has claims Worker {self.name} would own data of {dependency} but this is untrue."
f"Who has claims Worker {self.name} would own data of {dependency} but this is false."
)
if dep_ts.state not in "memory":
if dep_ts.runspec is None:
logger.info(f"Need to fetch {dep_ts}")
ts.waiting_for_data.add(dep_ts.key)
self.waiting_for_data_count += 1
else:
logger.info(f"Runspec for dep {dep_ts} known.")

if dep_ts.state not in ("memory",) and dep_ts.runspec is None:
ts.waiting_for_data.add(dep_ts.key)
self.waiting_for_data_count += 1

ts.dependencies.add(dep_ts)
dep_ts.dependents.add(ts)
Expand All @@ -1536,8 +1533,6 @@ def add_task(
if ts.waiting_for_data:
if ts.key not in self.data_needed:
self.data_needed.append(ts.key)
else:
logger.debug("Key %s already tracked in data_needed" % ts.key)
else:
self.transition(ts, "ready")
if self.validate:
Expand Down Expand Up @@ -2148,25 +2143,13 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
if d not in self.tasks:
logger.debug("Task %s already forgotten." % d)
continue
assert isinstance(d, str)

ts = self.tasks[d]
# There is a race condition where the worker we're fetching
# the dependencies from is dead and the task is rescheduled.
# If the task is rescheduled on this worker it may already
# be in memory once the connection breaks up. In this case,
# data is empty and we do not need to transition anything
if ts.state == "memory":
continue

if ts.key in data:
self.transition(ts, "memory", value=data[d])
elif ts.state == "ready":
if ts.runspec is not None:
pass
# This is encountered if the key is missing but was
# already reassigned for computation on this worker
else:
raise RuntimeError("Encountered unexpected state")
else:

if ts.state != "memory":
self.transition(ts, "waiting", worker=worker)
missing.add(ts)

Expand All @@ -2179,8 +2162,6 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
assert isinstance(cause, str)
assert cause in self.tasks
self.data_needed.append(cause)
else:
logger.debug("Key %s already in data_needed" % cause)

if self.validate:
self.validate_state()
Expand Down Expand Up @@ -2309,7 +2290,7 @@ def release_key(self, key, reason=None, cause=None, report=True):
self.log.append((key, "release-key", {"cause": cause}))
else:
self.log.append((key, "release-key"))
# FIXME: What happens to data entries without corresponding TaskState?

if key in self.data and not ts.dependents:
try:
del self.data[key]
Expand All @@ -2318,6 +2299,15 @@ def release_key(self, key, reason=None, cause=None, report=True):
if key in self.actors and not ts.dependents:
del self.actors[key]

# for any dependencies of key we are releasing remove task as dependent
for dependency in ts.dependencies:
dependency.dependents.discard(ts)
if not dependency.dependents and dependency.state in (
"waiting",
"flight",
):
self.release_key(dependency.key)

for worker in ts.who_has:
self.has_what[worker].discard(ts.key)

Expand Down

0 comments on commit 391118f

Please sign in to comment.