Skip to content

Commit

Permalink
Type annotation of _reevaluate_occupancy_worker (#4398)
Browse files Browse the repository at this point in the history
* Return `total_duration` in `set_duration_estimate`

There is no need to retrieve this value when we already have it. So just
go ahead and return the variable directly.

* Group `old` and `new` variables

* Drop unused variable `nbytes`

* Use `.get(...)` to grab stealing extension

This way if it is `None`, we can check that and handle it much faster
(both in Python and Cython/C).

* Type iterated variable

* Assign `new - old` to `diff`

Hopefully should order computation to avoid a round off error.

* Type occupancy related variables as `double`

This should speed up mathematical operations with these variables.
  • Loading branch information
jakirkham authored Jan 7, 2021
1 parent 49e19c0 commit 8f33b9e
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4834,7 +4834,7 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState):
if exec_time > 2 * duration:
total_duration = 2 * exec_time
ws._processing[ts] = total_duration
return ws._processing[ts]
return total_duration

def transition_waiting_processing(self, key):
try:
Expand Down Expand Up @@ -6139,23 +6139,27 @@ def reevaluate_occupancy(self, worker_index=0):

def _reevaluate_occupancy_worker(self, ws: WorkerState):
""" See reevaluate_occupancy """
old = ws._occupancy

new = 0
nbytes = 0
old: double = ws._occupancy
new: double = 0
diff: double
ts: TaskState
est: double
for ts in ws._processing:
new += self.set_duration_estimate(ts, ws)
est = self.set_duration_estimate(ts, ws)
new += est

ws._occupancy = new
self.total_occupancy += new - old
diff = new - old
self.total_occupancy += diff
self.check_idle_saturated(ws)

# significant increase in duration
if (new > old * 1.3) and ("stealing" in self.extensions):
steal = self.extensions["stealing"]
for ts in ws._processing:
steal.remove_key_from_stealable(ts)
steal.put_key_in_stealable(ts)
if new > old * 1.3:
steal = self.extensions.get("stealing")
if steal is not None:
for ts in ws._processing:
steal.remove_key_from_stealable(ts)
steal.put_key_in_stealable(ts)

async def check_worker_ttl(self):
ws: WorkerState
Expand Down

0 comments on commit 8f33b9e

Please sign in to comment.