diff --git a/distributed/scheduler.py b/distributed/scheduler.py index fe8d1c61f8..77a9be2f6d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4834,7 +4834,7 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState): if exec_time > 2 * duration: total_duration = 2 * exec_time ws._processing[ts] = total_duration - return ws._processing[ts] + return total_duration def transition_waiting_processing(self, key): try: @@ -6139,23 +6139,27 @@ def reevaluate_occupancy(self, worker_index=0): def _reevaluate_occupancy_worker(self, ws: WorkerState): """ See reevaluate_occupancy """ - old = ws._occupancy - - new = 0 - nbytes = 0 + old: double = ws._occupancy + new: double = 0 + diff: double + ts: TaskState + est: double for ts in ws._processing: - new += self.set_duration_estimate(ts, ws) + est = self.set_duration_estimate(ts, ws) + new += est ws._occupancy = new - self.total_occupancy += new - old + diff = new - old + self.total_occupancy += diff self.check_idle_saturated(ws) # significant increase in duration - if (new > old * 1.3) and ("stealing" in self.extensions): - steal = self.extensions["stealing"] - for ts in ws._processing: - steal.remove_key_from_stealable(ts) - steal.put_key_in_stealable(ts) + if new > old * 1.3: + steal = self.extensions.get("stealing") + if steal is not None: + for ts in ws._processing: + steal.remove_key_from_stealable(ts) + steal.put_key_in_stealable(ts) async def check_worker_ttl(self): ws: WorkerState