Skip to content

Commit

Permalink
Remove nbytes_in_memory (#4930)
Browse files Browse the repository at this point in the history
This doesn't appear to be necessary
  • Loading branch information
mrocklin authored Jun 18, 2021
1 parent edc1238 commit d9bc3c6
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 24 deletions.
19 changes: 0 additions & 19 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,6 @@ def __repr__(self):
+ ">"
)

@property
def nbytes_in_memory(self):
tg: TaskGroup
return sum([tg._nbytes_in_memory for tg in self._groups])

@property
def nbytes_total(self):
tg: TaskGroup
Expand Down Expand Up @@ -922,10 +917,6 @@ class TaskGroup:
The total number of bytes that this task group has produced
.. attribute:: nbytes_in_memory: int
The number of bytes currently stored by this TaskGroup
.. attribute:: duration: float
The total amount of time spent on all tasks in this TaskGroup
Expand All @@ -944,7 +935,6 @@ class TaskGroup:
_states: dict
_dependencies: set
_nbytes_total: Py_ssize_t
_nbytes_in_memory: Py_ssize_t
_duration: double
_types: set
_start: double
Expand All @@ -958,7 +948,6 @@ def __init__(self, name: str):
self._states["forgotten"] = 0
self._dependencies = set()
self._nbytes_total = 0
self._nbytes_in_memory = 0
self._duration = 0
self._types = set()
self._start = 0.0
Expand All @@ -985,10 +974,6 @@ def dependencies(self):
def nbytes_total(self):
return self._nbytes_total

@property
def nbytes_in_memory(self):
return self._nbytes_in_memory

@property
def duration(self):
return self._duration
Expand Down Expand Up @@ -1559,7 +1544,6 @@ def set_nbytes(self, nbytes: Py_ssize_t):
if old_nbytes >= 0:
diff -= old_nbytes
self._group._nbytes_total += diff
self._group._nbytes_in_memory += diff
ws: WorkerState
for ws in self._who_has:
ws._nbytes += diff
Expand Down Expand Up @@ -2646,7 +2630,6 @@ def transition_memory_released(self, key, safe: bint = False):
for ws in ts._who_has:
del ws._has_what[ts]
ws._nbytes -= ts_nbytes
ts._group._nbytes_in_memory -= ts_nbytes
worker_msgs[ws._address] = [worker_msg]

ts._who_has.clear()
Expand Down Expand Up @@ -7336,8 +7319,6 @@ def _propagate_forgotten(
ts._waiting_on.clear()

ts_nbytes: Py_ssize_t = ts.get_nbytes()
if ts._who_has:
ts._group._nbytes_in_memory -= ts_nbytes

ws: WorkerState
for ws in ts._who_has:
Expand Down
5 changes: 0 additions & 5 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,6 @@ async def test_task_groups(c, s, a, b):
# these must be true since in this simple case there is a 1to1 mapping
# between prefix and group
assert tg.duration == tp.duration
assert tg.nbytes_in_memory == tp.nbytes_in_memory
assert tg.nbytes_total == tp.nbytes_total
# It should map down to individual tasks
assert tg.nbytes_total == sum(
Expand All @@ -1908,16 +1907,13 @@ async def test_task_groups(c, s, a, b):
if ts.group is tg and ts.state == "memory"
]
)
assert tg.nbytes_in_memory == in_memory_ts

tg = s.task_groups[y.name]
assert tg.states["memory"] == 5

assert s.task_groups[y.name].dependencies == {s.task_groups[x.name]}

await c.replicate(y)
# TODO: Are we supposed to track replicated memory here? See also Scheduler.add_keys
assert tg.nbytes_in_memory == y.nbytes
assert "array" in str(tg.types)
assert "array" in str(tp.types)

Expand All @@ -1926,7 +1922,6 @@ async def test_task_groups(c, s, a, b):
while s.tasks:
await asyncio.sleep(0.01)

assert tg.nbytes_in_memory == 0
assert tg.states["forgotten"] == 5
# Ensure TaskGroup is removed once all tasks are in forgotten state
assert tg.name not in s.task_groups
Expand Down

0 comments on commit d9bc3c6

Please sign in to comment.