From 2bf3115740370dd5ddb82f32e4286043063a107c Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Wed, 17 Nov 2021 13:33:59 -0500
Subject: [PATCH 1/4] add first draft of constrained spill to disk

---
 distributed/distributed.yaml    |  3 +++
 distributed/spill.py            | 42 ++++++++++++++++++++++++++-------
 distributed/tests/test_spill.py | 33 ++++++++++++++++++++++++++
 distributed/worker.py           | 12 ++++++++++
 4 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 6949fdd56e..e5162a8844 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -145,6 +145,9 @@ distributed:
       pause: 0.80  # fraction at which we pause worker threads
       terminate: 0.95  # fraction at which we terminate the worker
 
+      # limit on how much data may be spilled to disk
+      spill-limit: False
+
   http:
     routes:
       - distributed.http.worker.prometheus

diff --git a/distributed/spill.py b/distributed/spill.py
index 2c849c2447..ef13a31b31 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -1,8 +1,12 @@
 from __future__ import annotations
 
 from collections.abc import Hashable, Mapping
-from functools import partial
-from typing import Any
+from ctypes import sizeof  # noqa: F401  TODO: do we use this?
+from functools import (  # noqa: F401  TODO: do we use total_ordering?
+    partial,
+    total_ordering,
+)
+from typing import Any, Literal
 
 from zict import Buffer, File, Func
 
@@ -18,17 +22,25 @@ class SpillBuffer(Buffer):
     spilled_by_key: dict[Hashable, int]
     spilled_total: int
 
-    def __init__(self, spill_directory: str, target: int):
+    def __init__(
+        self,
+        spill_directory: str,
+        target: int,
+        disk_limit: int | Literal[False] | None = None,
+    ):
         self.spilled_by_key = {}
         self.spilled_total = 0
-        storage = Func(
+
+        self.disk_limit = disk_limit
+        self.spilled_total_disk = 0  # TODO: maybe choose a different name
+        self.storage = Func(
             partial(serialize_bytelist, on_error="raise"),
             deserialize_bytes,
             File(spill_directory),
         )
         super().__init__(
             {},
-            storage,
+            self.storage,
             target,
             weight=self._weight,
             fast_to_slow_callbacks=[self._on_evict],
@@ -49,9 +61,19 @@ def disk(self) -> Mapping[Hashable, Any]:
         """
         return self.slow
 
-    @staticmethod
-    def _weight(key: Hashable, value: Any) -> int:
-        return safe_sizeof(value)
+    # @staticmethod
+    def _weight(self, key: Hashable, value: Any) -> int:
+        # disk_limit is False by default, so check that a limit was given;
+        # otherwise the second condition is always true.
+        # This takes the right branch, but records -1 in the tracking of what
+        # is in fast, so it is not really working yet.
+        if self.disk_limit and (
+            safe_sizeof(value) + self.spilled_total_disk > self.disk_limit
+        ):
+            print("spill-limit reached, keeping task in memory")
+            return -1  # this should keep the key in fast
+        else:
+            return safe_sizeof(value)
 
     def _on_evict(self, key: Hashable, value: Any) -> None:
         b = safe_sizeof(value)
@@ -71,6 +93,10 @@ def __setitem__(self, key: Hashable, value: Any) -> None:
             self.spilled_by_key[key] = b
             self.spilled_total += b
 
+            if self.disk_limit:
+                # track the total bytes spilled to disk when a limit is given
+                self.spilled_total_disk += len(self.storage.d.get(key))
+
     def __delitem__(self, key: Hashable) -> None:
         self.spilled_total -= self.spilled_by_key.pop(key, 0)
         super().__delitem__(key)

diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py
index d013b14115..753b972ea1 100644
--- a/distributed/tests/test_spill.py
+++ b/distributed/tests/test_spill.py
@@ -80,3 +80,36 @@ def test_spillbuffer(tmpdir):
     assert set(buf.disk) == {"d", "e"}
     assert buf.spilled_by_key == {"d": slarge, "e": slarge}
     assert buf.spilled_total == slarge * 2
+
+
+def test_spillbuffer_disk_limit(tmpdir):
+    buf = SpillBuffer(str(tmpdir), target=200, disk_limit=500)
+
+    # Convenience aliases
+    assert buf.memory is buf.fast
+    assert buf.disk is buf.slow
+
+    assert not buf.spilled_by_key
+    assert buf.spilled_total == 0
+    assert buf.spilled_total_disk == 0
+
+    a, b, c = "a" * 100, "b" * 200, "c" * 200
+
+    s = sizeof(b)
+
+    buf["a"] = a
+    assert not buf.disk
+    assert not buf.spilled_by_key
+    assert buf.spilled_total == buf.spilled_total_disk == 0
+    assert set(buf.memory) == {"a"}
+
+    buf["b"] = b
+    assert set(buf.disk) == {"b"}
+    assert buf.spilled_by_key == {"b": s}
+    assert buf.spilled_total == s
+    assert buf.spilled_total_disk == len(buf.storage.d.get("b"))
+
+    # add a key that goes over the disk limit; it should stay in fast
+    buf["c"] = c
+    assert set(buf.memory) == {"a", "c"}
+    # this works, but the count of what is in fast is off, since it sums the -1 from _weight

diff --git a/distributed/worker.py b/distributed/worker.py
index bbd707ae7b..e00e91e0fc 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -394,6 +394,9 @@ class Worker(ServerNode):
     memory_pause_fraction: float or False
         Fraction of memory at which we stop running new tasks
         (default: read from config key distributed.worker.memory.pause)
+    spill_limit: int, string or False  (TODO: exact type not decided yet)
+        Maximum number of bytes of data to spill to disk.
+        (default: read from config key distributed.worker.memory.spill-limit)
     executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], "offload"
         The executor(s) to use. Depending on the type, it has the following meanings:
             - Executor instance: The default executor.
@@ -512,6 +515,7 @@ class Worker(ServerNode):
     memory_target_fraction: float | Literal[False]
     memory_spill_fraction: float | Literal[False]
     memory_pause_fraction: float | Literal[False]
+    spill_limit: int | Literal[False]
     data: MutableMapping[str, Any]  # {task key: task payload}
     actors: dict[str, Actor | None]
     loop: IOLoop
@@ -563,6 +567,7 @@ def __init__(
         memory_target_fraction: float | Literal[False] | None = None,
         memory_spill_fraction: float | Literal[False] | None = None,
         memory_pause_fraction: float | Literal[False] | None = None,
+        spill_limit: str | Literal[False] | None = None,
         extensions: list[type] | None = None,
         metrics: Mapping[str, Callable[[Worker], Any]] = DEFAULT_METRICS,
         startup_information: Mapping[
@@ -811,6 +816,12 @@ def __init__(
             else dask.config.get("distributed.worker.memory.pause")
         )
 
+        self.spill_limit = (
+            parse_bytes(spill_limit)
+            if spill_limit is not None
+            else dask.config.get("distributed.worker.memory.spill-limit")
+        )
+
         if isinstance(data, MutableMapping):
             self.data = data
         elif callable(data):
@@ -829,6 +840,7 @@ def __init__(
                     * (self.memory_target_fraction or self.memory_spill_fraction)
                 )
                 or sys.maxsize,
+                disk_limit=self.spill_limit,
             )
         else:
             self.data = {}
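
Note on the worker.py hunk above: parse_bytes is only applied when spill_limit arrives as a constructor argument; a value read from the config key is used as-is. A small sketch of what dask.utils.parse_bytes produces for the kinds of values this patch expects (the concrete limits below are examples, not defaults):

    from dask.utils import parse_bytes

    # Human-readable strings become byte counts
    parse_bytes("500 MB")   # 500_000_000 (decimal units)
    parse_bytes("128 MiB")  # 134_217_728 (binary units)
    parse_bytes("5kB")      # 5_000

    # Caveat in this draft: only None falls through to the config lookup,
    # so an explicit spill_limit=False is handed to parse_bytes as well.
    # Since bool is a subclass of int, parse_bytes(False) returns 0 rather
    # than raising, and a disk_limit of 0 is falsy, so it happens to act
    # like "no limit" in SpillBuffer's checks.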
From d1d1019cb34a9e7765c38d521c2fa06e567ebab4 Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Wed, 17 Nov 2021 14:56:17 -0500
Subject: [PATCH 2/4] fix Literal import

---
 distributed/spill.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/distributed/spill.py b/distributed/spill.py
index ef13a31b31..4fd3050169 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -6,8 +6,9 @@
     partial,
     total_ordering,
 )
-from typing import Any, Literal
+from typing import Any
 
+from typing_extensions import Literal
 from zict import Buffer, File, Func
 
 from .protocol import deserialize_bytes, serialize_bytelist

From 54c02a312685717a1289fc49c66e8b248fc4ff7a Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 18 Nov 2021 10:14:43 -0500
Subject: [PATCH 3/4] add spill-limit to distributed schema and fix typo

---
 distributed/distributed-schema.yaml | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index f5b7b73a5e..08311b6399 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -501,10 +501,17 @@ properties:
                 description: >-
                   When the process memory reaches this level the nanny process will kill the worker
                   (if a nanny is present)
+
+              spill-limit:
+                oneOf:
+                  - {type: string}
+                  - {enum: [false]}
+                description: |
+                  A limit on how much data may be spilled to disk; once this limit is reached, we stop writing to disk.
 
           http:
             type: object
-            decription: Settings for Dask's embedded HTTP Server
+            description: Settings for Dask's embedded HTTP Server
             properties:
               routes:
                 type: array
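
With the schema entry in place, the limit can be set like any other dask config value. A minimal usage sketch (the "10 GB" value is illustrative; note that, as drafted in patch 1, a value coming from config is not passed through parse_bytes):

    import dask

    # Programmatic equivalent of setting spill-limit in distributed.yaml
    dask.config.set({"distributed.worker.memory.spill-limit": "10 GB"})

    # Patch 1 only calls parse_bytes on the constructor argument, so a worker
    # that falls back to this config key would receive the raw string:
    assert dask.config.get("distributed.worker.memory.spill-limit") == "10 GB"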
From 81b12da2dcc1a6b958b4e4db4682bade536d7d41 Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 18 Nov 2021 10:17:39 -0500
Subject: [PATCH 4/4] avoid super setitem, use custom version (still not working)

---
 distributed/spill.py | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/distributed/spill.py b/distributed/spill.py
index 4fd3050169..caf58251e1 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -62,19 +62,19 @@ def disk(self) -> Mapping[Hashable, Any]:
         """
         return self.slow
 
-    # @staticmethod
-    def _weight(self, key: Hashable, value: Any) -> int:
+    @staticmethod
+    def _weight(key: Hashable, value: Any) -> int:
         # disk_limit is False by default, so check that a limit was given;
         # otherwise the second condition is always true.
         # This takes the right branch, but records -1 in the tracking of what
         # is in fast, so it is not really working yet.
-        if self.disk_limit and (
-            safe_sizeof(value) + self.spilled_total_disk > self.disk_limit
-        ):
-            print("spill-limit reached, keeping task in memory")
-            return -1  # this should keep the key in fast
-        else:
-            return safe_sizeof(value)
+        # if self.disk_limit and (
+        #     safe_sizeof(value) + self.spilled_total_disk > self.disk_limit
+        # ):
+        #     print("spill-limit reached, keeping task in memory")
+        #     return -1  # this should keep the key in fast
+        # else:
+        return safe_sizeof(value)
 
     def _on_evict(self, key: Hashable, value: Any) -> None:
         b = safe_sizeof(value)
@@ -86,7 +86,21 @@ def _on_retrieve(self, key: Hashable, value: Any) -> None:
 
     def __setitem__(self, key: Hashable, value: Any) -> None:
         self.spilled_total -= self.spilled_by_key.pop(key, 0)
-        super().__setitem__(key, value)
+        # super().__setitem__(key, value)
+
+        if self.weight(key, value) <= self.n or (
+            self.disk_limit
+            and (safe_sizeof(value) + self.spilled_total_disk > self.disk_limit)
+        ):
+            print("im here")
+            if key in self.slow:
+                del self.slow[key]
+            self.fast[key] = value
+        else:
+            if key in self.fast:
+                del self.fast[key]
+            self.slow[key] = value
+
         if key in self.slow:
             # value is individually larger than target so it went directly to slow.
             # _on_evict was not called.
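
The decision that patch 4's __setitem__ is driving at is easier to see without the zict plumbing. The sketch below models it with plain dicts: keep a key in fast when it fits under the target or when spilling it would exceed the disk limit, otherwise move it to slow and account for the spilled bytes. Everything here is an illustrative stand-in (DiskLimitedBuffer for SpillBuffer, len for safe_sizeof, dicts for Buffer.fast and Buffer.slow), and unlike zict's Buffer it does not LRU-evict previously stored keys:

    from __future__ import annotations

    from collections.abc import Hashable
    from typing import Any


    class DiskLimitedBuffer:
        """Toy model of the spill decision in patch 4 (illustrative only)."""

        def __init__(self, target: int, disk_limit: int | None = None):
            self.fast: dict[Hashable, Any] = {}  # stands in for Buffer.fast (memory)
            self.slow: dict[Hashable, Any] = {}  # stands in for Buffer.slow (disk)
            self.target = target                 # stands in for Buffer.n
            self.disk_limit = disk_limit
            self.spilled_total_disk = 0

        def __setitem__(self, key: Hashable, value: Any) -> None:
            w = len(value)  # stand-in for safe_sizeof(value)
            over_disk_limit = (
                self.disk_limit is not None
                and self.spilled_total_disk + w > self.disk_limit
            )
            if w <= self.target or over_disk_limit:
                # Fits in memory, or spilling would exceed the disk limit:
                # keep (or bring back) the key in fast
                self.slow.pop(key, None)
                self.fast[key] = value
            else:
                # Spill: move the key to slow and account for the bytes on disk
                self.fast.pop(key, None)
                self.slow[key] = value
                self.spilled_total_disk += w


    buf = DiskLimitedBuffer(target=200, disk_limit=500)
    buf["a"] = b"a" * 100  # fits under the target -> stays in fast
    buf["b"] = b"b" * 300  # over the target -> spilled, 300 bytes on disk
    buf["c"] = b"c" * 300  # 300 more would exceed 500 -> kept in fast
    assert set(buf.fast) == {"a", "c"}
    assert set(buf.slow) == {"b"}

A complete implementation would also decrement spilled_total_disk on deletion (patch 1's __delitem__ only does this for spilled_total) and keep the fast-side weight bookkeeping consistent, which is exactly the part the return -1 trick in _weight gets wrong.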