From d984074c8cb968ad9e98cd14e1340a87f7034398 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 16 Oct 2023 19:53:54 +0200 Subject: [PATCH 1/4] Unlimited limiter --- distributed/dashboard/components/scheduler.py | 4 +- distributed/shuffle/_buffer.py | 6 +-- distributed/shuffle/_limiter.py | 37 ++++++++++++----- distributed/shuffle/tests/test_limiter.py | 40 ++++++++++++++----- 4 files changed, 64 insertions(+), 23 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index d25c56f65e..85d03ff6aa 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -4459,7 +4459,9 @@ def update(self): for prefix in ["comm", "disk"]: data[f"{prefix}_total"].append(d[prefix]["total"]) data[f"{prefix}_memory"].append(d[prefix]["memory"]) - data[f"{prefix}_memory_limit"].append(d[prefix]["memory_limit"]) + data[f"{prefix}_memory_limit"].append( + d[prefix]["memory_limit"] or 0 + ) data[f"{prefix}_buckets"].append(d[prefix]["buckets"]) data[f"{prefix}_avg_duration"].append( d[prefix]["diagnostics"].get("avg_duration", 0) diff --git a/distributed/shuffle/_buffer.py b/distributed/shuffle/_buffer.py index e329d54dcc..279eff53d6 100644 --- a/distributed/shuffle/_buffer.py +++ b/distributed/shuffle/_buffer.py @@ -46,7 +46,7 @@ class ShardsBuffer(Generic[ShardType]): shards: defaultdict[str, _List[ShardType]] sizes: defaultdict[str, int] concurrency_limit: int - memory_limiter: ResourceLimiter | None + memory_limiter: ResourceLimiter diagnostics: dict[str, float] max_message_size: int @@ -74,7 +74,7 @@ def __init__( self._exception = None self.concurrency_limit = concurrency_limit self._inputs_done = False - self.memory_limiter = memory_limiter + self.memory_limiter = memory_limiter or ResourceLimiter() self.diagnostics: dict[str, float] = defaultdict(float) self._tasks = [ asyncio.create_task(self._background_task()) @@ -97,7 +97,7 @@ def heartbeat(self) -> 
dict[str, Any]: "written": self.bytes_written, "read": self.bytes_read, "diagnostics": self.diagnostics, - "memory_limit": self.memory_limiter._maxvalue if self.memory_limiter else 0, + "memory_limit": self.memory_limiter.limit, } async def process(self, id: str, shards: list[ShardType], size: int) -> None: diff --git a/distributed/shuffle/_limiter.py b/distributed/shuffle/_limiter.py index f3591b53f7..79c8e60351 100644 --- a/distributed/shuffle/_limiter.py +++ b/distributed/shuffle/_limiter.py @@ -9,7 +9,7 @@ class ResourceLimiter: """Limit an abstract resource This allows us to track usage of an abstract resource. If the usage of this - resources goes beyond a defined maxvalue, we can block further execution + resources goes beyond a defined limit, we can block further execution Example:: @@ -18,12 +18,20 @@ class ResourceLimiter: limiter.increase(2) limiter.decrease(1) - # This will block since we're still not below maxvalue + # This will block since we're still not below limit await limiter.wait_for_available() """ - def __init__(self, maxvalue: int) -> None: - self._maxvalue = maxvalue + limit: int | None + time_blocked_total: float + time_blocked_avg: float + + _acquired: int + _condition: asyncio.Condition + _waiters: int + + def __init__(self, limit: int | None = None) -> None: + self.limit = limit self._acquired = 0 self._condition = asyncio.Condition() self._waiters = 0 @@ -31,26 +39,35 @@ def __init__(self, maxvalue: int) -> None: self.time_blocked_avg = 0.0 def __repr__(self) -> str: - return f"<ResourceLimiter maxvalue: {self._maxvalue} available: {self.available()}>" + return f"<ResourceLimiter limit: {self.limit} available: {self.available}>" - def available(self) -> int: + @property + def available(self) -> int | None: """How far can the value be increased before blocking""" - return max(0, self._maxvalue - self._acquired) + if self.limit is None: + return None + return max(0, self.limit - self._acquired) + + @property + def full(self) -> bool: + """Return True if the limit has been reached""" + return self.available is None or bool(self.available) + @property def free(self) -> bool: 
"""Return True if nothing has been acquired / the limiter is in a neutral state""" return self._acquired == 0 async def wait_for_available(self) -> None: - """Block until the counter drops below maxvalue""" + """Block until the counter drops below limit""" start = time() duration = 0.0 try: - if self.available(): + if not self.full: return async with self._condition: self._waiters += 1 - await self._condition.wait_for(self.available) + await self._condition.wait_for(lambda: not self.full) self._waiters -= 1 duration = time() - start finally: diff --git a/distributed/shuffle/tests/test_limiter.py b/distributed/shuffle/tests/test_limiter.py index da035f9ba7..3b8dd4a3d1 100644 --- a/distributed/shuffle/tests/test_limiter.py +++ b/distributed/shuffle/tests/test_limiter.py @@ -16,33 +16,33 @@ async def test_limiter_basic(): assert isinstance(repr(res), str) res.increase(2) - assert res.available() == 3 + assert res.available == 3 res.increase(3) - assert not res.available() + assert not res.available # This is too much res.increase(1) - assert not res.available() + assert not res.available with pytest.raises(asyncio.TimeoutError): await wait_for(res.wait_for_available(), 0.1) await res.decrease(1) - assert not res.available() + assert not res.available with pytest.raises(asyncio.TimeoutError): await wait_for(res.wait_for_available(), 0.1) await res.decrease(1) - assert res.available() == 1 + assert res.available == 1 await res.wait_for_available() res.increase(1) - assert not res.available() + assert not res.available with pytest.raises(asyncio.TimeoutError): await wait_for(res.wait_for_available(), 0.1) await res.decrease(5) - assert res.available() == 5 + assert res.available == 5 with pytest.raises(RuntimeError, match="more"): await res.decrease(1) @@ -52,10 +52,32 @@ async def test_limiter_basic(): await wait_for(res.wait_for_available(), 0.1) await res.decrease(3) - assert not res.available() + assert not res.available await res.decrease(5) - assert res.available() 
== 3 + assert res.available == 3 + + +@gen_test() +async def test_unlimited_limiter(): + res = ResourceLimiter() + + assert res.free + assert res.available is None + assert not res.full + + res.increase(3) + assert not res.free + assert res.available is None + assert not res.full + + res.increase(2**40) + assert not res.free + assert res.available is None + assert not res.full + + await res.wait_for_available() + assert res.time_blocked_total == 0 @gen_test() From 433b82a3d5e9ac0379ad30c79a49aaae4d687976 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 16 Oct 2023 19:56:51 +0200 Subject: [PATCH 2/4] Make limiter required --- distributed/shuffle/_buffer.py | 4 ++-- distributed/shuffle/_comms.py | 13 ++++++------- distributed/shuffle/_disk.py | 4 ++-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/distributed/shuffle/_buffer.py b/distributed/shuffle/_buffer.py index 279eff53d6..2a37d6c380 100644 --- a/distributed/shuffle/_buffer.py +++ b/distributed/shuffle/_buffer.py @@ -64,7 +64,7 @@ class ShardsBuffer(Generic[ShardType]): def __init__( self, - memory_limiter: ResourceLimiter | None, + memory_limiter: ResourceLimiter, concurrency_limit: int = 2, max_message_size: int = -1, ) -> None: @@ -74,7 +74,7 @@ def __init__( self._exception = None self.concurrency_limit = concurrency_limit self._inputs_done = False - self.memory_limiter = memory_limiter or ResourceLimiter() + self.memory_limiter = memory_limiter self.diagnostics: dict[str, float] = defaultdict(float) self._tasks = [ asyncio.create_task(self._background_task()) diff --git a/distributed/shuffle/_comms.py b/distributed/shuffle/_comms.py index 255d433d88..020313debe 100644 --- a/distributed/shuffle/_comms.py +++ b/distributed/shuffle/_comms.py @@ -39,12 +39,11 @@ class CommShardsBuffer(ShardsBuffer): How to send a list of shards to a worker Expects an address of the target worker (string) and a payload of shards (list of bytes) to send to that worker - memory_limiter : ResourceLimiter, 
optional - Limiter for memory usage (in bytes), or None if no limiting - should be applied. If the incoming data that has yet to be - processed exceeds this limit, then the buffer will block until - below the threshold. See :meth:`.write` for the implementation - of this scheme. + memory_limiter : ResourceLimiter + Limiter for memory usage (in bytes). If the incoming data that + has yet to be processed exceeds this limit, then the buffer will + block until below the threshold. See :meth:`.write` for the + implementation of this scheme. concurrency_limit : int Number of background tasks to run. """ @@ -54,7 +53,7 @@ class CommShardsBuffer(ShardsBuffer): def __init__( self, send: Callable[[str, list[tuple[Any, bytes]]], Awaitable[None]], - memory_limiter: ResourceLimiter | None = None, + memory_limiter: ResourceLimiter, concurrency_limit: int = 10, ): super().__init__( diff --git a/distributed/shuffle/_disk.py b/distributed/shuffle/_disk.py index b4878cd1ea..da7aa1ea35 100644 --- a/distributed/shuffle/_disk.py +++ b/distributed/shuffle/_disk.py @@ -110,7 +110,7 @@ class DiskShardsBuffer(ShardsBuffer): ---------- directory : str or pathlib.Path Where to write and read data. Ideally points to fast disk. - memory_limiter : ResourceLimiter, optional + memory_limiter : ResourceLimiter Limiter for in-memory buffering (at most this much data) before writes to disk occur. 
If the incoming data that has yet to be processed exceeds this limit, then the buffer will block @@ -122,7 +122,7 @@ def __init__( self, directory: str | pathlib.Path, read: Callable[[pathlib.Path], tuple[Any, int]], - memory_limiter: ResourceLimiter | None = None, + memory_limiter: ResourceLimiter, ): super().__init__( memory_limiter=memory_limiter, From 2f32a2327c2127e94c482dfd285eee488c71f34a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 17 Oct 2023 12:36:54 +0200 Subject: [PATCH 3/4] Typing --- distributed/shuffle/_limiter.py | 17 +++++++++------- distributed/shuffle/tests/test_buffer.py | 4 ++-- distributed/shuffle/tests/test_comm_buffer.py | 14 +++++++------ distributed/shuffle/tests/test_disk_buffer.py | 20 +++++++++++++------ distributed/shuffle/tests/test_limiter.py | 8 ++++---- 5 files changed, 38 insertions(+), 25 deletions(-) diff --git a/distributed/shuffle/_limiter.py b/distributed/shuffle/_limiter.py index 79c8e60351..35b30267f4 100644 --- a/distributed/shuffle/_limiter.py +++ b/distributed/shuffle/_limiter.py @@ -1,11 +1,14 @@ from __future__ import annotations import asyncio +from typing import Generic, TypeVar from distributed.metrics import time +_T = TypeVar("_T", int, None) -class ResourceLimiter: + +class ResourceLimiter(Generic[_T]): """Limit an abstract resource This allows us to track usage of an abstract resource. 
If the usage of this @@ -22,7 +25,7 @@ class ResourceLimiter: await limiter.wait_for_available() """ - limit: int | None + limit: _T time_blocked_total: float time_blocked_avg: float @@ -30,7 +33,7 @@ class ResourceLimiter: _condition: asyncio.Condition _waiters: int - def __init__(self, limit: int | None = None) -> None: + def __init__(self, limit: _T): self.limit = limit self._acquired = 0 self._condition = asyncio.Condition() @@ -42,19 +45,19 @@ def __repr__(self) -> str: return f"<ResourceLimiter limit: {self.limit} available: {self.available}>" @property - def available(self) -> int | None: + def available(self) -> _T: """How far can the value be increased before blocking""" if self.limit is None: - return None + return self.limit return max(0, self.limit - self._acquired) @property def full(self) -> bool: """Return True if the limit has been reached""" - return self.available is None or bool(self.available) + return self.available is not None and not self.available @property - def free(self) -> bool: + def empty(self) -> bool: """Return True if nothing has been acquired / the limiter is in a neutral state""" return self._acquired == 0 diff --git a/distributed/shuffle/tests/test_buffer.py b/distributed/shuffle/tests/test_buffer.py index 53e862d829..a3ad4b07ea 100644 --- a/distributed/shuffle/tests/test_buffer.py +++ b/distributed/shuffle/tests/test_buffer.py @@ -70,7 +70,7 @@ async def test_memory_limit(big_payloads): many_small = [asyncio.create_task(buf.write(small_payload)) for _ in range(11)] assert buf.memory_limiter - while buf.memory_limiter.available(): + while buf.memory_limiter.available: await asyncio.sleep(0.1) new_put = asyncio.create_task(buf.write(small_payload)) @@ -80,7 +80,7 @@ async def test_memory_limit(big_payloads): many_small = asyncio.gather(*many_small) await new_put - while not buf.memory_limiter.free(): + while not buf.memory_limiter.empty: await asyncio.sleep(0.1) buf.allow_process.clear() big_tasks = [ diff --git a/distributed/shuffle/tests/test_comm_buffer.py 
b/distributed/shuffle/tests/test_comm_buffer.py index f10d945a14..36896c547d 100644 --- a/distributed/shuffle/tests/test_comm_buffer.py +++ b/distributed/shuffle/tests/test_comm_buffer.py @@ -20,7 +20,7 @@ async def test_basic(tmp_path): async def send(address, shards): d[address].extend(shards) - mc = CommShardsBuffer(send=send) + mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None)) await mc.write({"x": b"0" * 1000, "y": b"1" * 500}) await mc.write({"x": b"0" * 1000, "y": b"1" * 500}) @@ -37,7 +37,7 @@ async def test_exceptions(tmp_path): async def send(address, shards): raise Exception(123) - mc = CommShardsBuffer(send=send) + mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None)) await mc.write({"x": b"0" * 1000, "y": b"1" * 500}) while not mc._exception: @@ -63,7 +63,9 @@ async def send(address, shards): d[address].extend(shards) sending_first.set() - mc = CommShardsBuffer(send=send, concurrency_limit=1) + mc = CommShardsBuffer( + send=send, concurrency_limit=1, memory_limiter=ResourceLimiter(None) + ) await mc.write({"x": b"0", "y": b"1"}) await mc.write({"x": b"0", "y": b"1"}) flush_task = asyncio.create_task(mc.flush()) @@ -96,7 +98,7 @@ async def send(address, shards): send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB")) ) payload = { - x: gen_bytes(frac, comm_buffer.memory_limiter._maxvalue) for x in range(nshards) + x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards) } async with comm_buffer as mc: @@ -113,7 +115,7 @@ async def send(address, shards): assert len(d) == 10 assert ( sum(map(len, d[0])) - == len(gen_bytes(frac, comm_buffer.memory_limiter._maxvalue)) * nputs + == len(gen_bytes(frac, comm_buffer.memory_limiter.limit)) * nputs ) @@ -137,7 +139,7 @@ async def send(address, shards): send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB")) ) payload = { - x: gen_bytes(frac, comm_buffer.memory_limiter._maxvalue) for x in range(nshards) + x: gen_bytes(frac, 
comm_buffer.memory_limiter.limit) for x in range(nshards) } async with comm_buffer as mc: diff --git a/distributed/shuffle/tests/test_disk_buffer.py b/distributed/shuffle/tests/test_disk_buffer.py index 19192de582..40347a90b9 100644 --- a/distributed/shuffle/tests/test_disk_buffer.py +++ b/distributed/shuffle/tests/test_disk_buffer.py @@ -8,6 +8,7 @@ import pytest from distributed.shuffle._disk import DiskShardsBuffer +from distributed.shuffle._limiter import ResourceLimiter from distributed.utils_test import gen_test @@ -20,7 +21,9 @@ def read_bytes(path: Path) -> tuple[bytes, int]: @gen_test() async def test_basic(tmp_path): - async with DiskShardsBuffer(directory=tmp_path, read=read_bytes) as mf: + async with DiskShardsBuffer( + directory=tmp_path, read=read_bytes, memory_limiter=ResourceLimiter(None) + ) as mf: await mf.write({"x": b"0" * 1000, "y": b"1" * 500}) await mf.write({"x": b"0" * 1000, "y": b"1" * 500}) @@ -41,7 +44,9 @@ async def test_basic(tmp_path): @gen_test() async def test_read_before_flush(tmp_path): payload = {"1": b"foo"} - async with DiskShardsBuffer(directory=tmp_path, read=read_bytes) as mf: + async with DiskShardsBuffer( + directory=tmp_path, read=read_bytes, memory_limiter=ResourceLimiter(None) + ) as mf: with pytest.raises(RuntimeError): mf.read(1) @@ -59,7 +64,9 @@ async def test_read_before_flush(tmp_path): @pytest.mark.parametrize("count", [2, 100, 1000]) @gen_test() async def test_many(tmp_path, count): - async with DiskShardsBuffer(directory=tmp_path, read=read_bytes) as mf: + async with DiskShardsBuffer( + directory=tmp_path, read=read_bytes, memory_limiter=ResourceLimiter(None) + ) as mf: d = {i: str(i).encode() * 100 for i in range(count)} for _ in range(10): @@ -84,7 +91,9 @@ async def _process(self, *args: Any, **kwargs: Any) -> None: @gen_test() async def test_exceptions(tmp_path): - async with BrokenDiskShardsBuffer(directory=tmp_path, read=read_bytes) as mf: + async with BrokenDiskShardsBuffer( + directory=tmp_path, 
read=read_bytes, memory_limiter=ResourceLimiter(None) + ) as mf: await mf.write({"x": [b"0" * 1000], "y": [b"1" * 500]}) while not mf._exception: @@ -114,8 +123,7 @@ async def test_high_pressure_flush_with_exception(tmp_path): payload = {f"shard-{ix}": [f"shard-{ix}".encode() * 100] for ix in range(100)} async with EventuallyBrokenDiskShardsBuffer( - directory=tmp_path, - read=read_bytes, + directory=tmp_path, read=read_bytes, memory_limiter=ResourceLimiter(None) ) as mf: tasks = [] for _ in range(10): diff --git a/distributed/shuffle/tests/test_limiter.py b/distributed/shuffle/tests/test_limiter.py index 3b8dd4a3d1..46253423d3 100644 --- a/distributed/shuffle/tests/test_limiter.py +++ b/distributed/shuffle/tests/test_limiter.py @@ -60,19 +60,19 @@ async def test_limiter_basic(): @gen_test() async def test_unlimited_limiter(): - res = ResourceLimiter() + res = ResourceLimiter(None) - assert res.free + assert res.empty assert res.available is None assert not res.full res.increase(3) - assert not res.free + assert not res.empty assert res.available is None assert not res.full res.increase(2**40) - assert not res.free + assert not res.empty assert res.available is None assert not res.full From 0245170c334edcbf982244f70ff7f59400f6ef24 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 17 Oct 2023 13:01:28 +0200 Subject: [PATCH 4/4] Simplify --- distributed/shuffle/_buffer.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/shuffle/_buffer.py b/distributed/shuffle/_buffer.py index 2a37d6c380..fd09705d5b 100644 --- a/distributed/shuffle/_buffer.py +++ b/distributed/shuffle/_buffer.py @@ -119,8 +119,7 @@ async def process(self, id: str, shards: list[ShardType], size: int) -> None: "avg_duration" ] + 0.02 * (stop - start) finally: - if self.memory_limiter: - await self.memory_limiter.decrease(size) + await self.memory_limiter.decrease(size) self.bytes_memory -= size async def _process(self, id: str, shards: list[ShardType]) -> 
None: @@ -198,15 +197,13 @@ async def write(self, data: dict[str, ShardType]) -> None: self.bytes_memory += total_batch_size self.bytes_total += total_batch_size - if self.memory_limiter: - self.memory_limiter.increase(total_batch_size) + self.memory_limiter.increase(total_batch_size) async with self._shards_available: for worker, shard in data.items(): self.shards[worker].append(shard) self.sizes[worker] += sizes[worker] self._shards_available.notify() - if self.memory_limiter: - await self.memory_limiter.wait_for_available() + await self.memory_limiter.wait_for_available() del data assert total_batch_size