Constrained spill #5543

Merged
merged 127 commits into from
Feb 16, 2022
Commits
cd07540
Constrain spill to disk to predefined capacity
crusaderky Nov 19, 2021
67d067e
this works except when key > target and max spill
ncclementi Nov 23, 2021
d12e3d4
Update distributed/spill.py
ncclementi Nov 24, 2021
25f3e63
use len for sieze of bytes
ncclementi Nov 24, 2021
6e545d0
Merge branch 'constrained_spill' of github.com:ncclementi/distributed…
ncclementi Nov 24, 2021
4e95614
modify spill buffer to use new lru
ncclementi Nov 29, 2021
0fd78a8
remove comment use for notes
ncclementi Nov 29, 2021
87f1675
add max-spill to config
ncclementi Nov 29, 2021
42a34c4
add max-spill to worker
ncclementi Nov 29, 2021
9bc1659
use False as default for max_spill on spillbuffer
ncclementi Nov 29, 2021
d27070a
adapt test_spill to use new SpillBuffer
ncclementi Nov 29, 2021
93b3f7b
add tests for spill buffer with max_spill limit
ncclementi Nov 29, 2021
0f1c728
add not to trigger warning properly on zict version
ncclementi Nov 29, 2021
344b974
extend SpillBuffer docstring to mention max_spill
ncclementi Nov 29, 2021
4980086
use consitently slow/fast
ncclementi Dec 1, 2021
5f9226a
Update max_spill annotation on worker
ncclementi Dec 1, 2021
410e7d4
update max_spill reading
ncclementi Dec 1, 2021
0a82455
Update max-spill in distributed-schema.yaml
ncclementi Dec 1, 2021
c723d00
Update max spill definition/comment
ncclementi Dec 1, 2021
291adfc
fix import for type checking
ncclementi Dec 1, 2021
ab04c6d
Merge branch 'constrained_spill' of github.com:ncclementi/distributed…
ncclementi Dec 1, 2021
bcb4d3a
add comment to fix zict version warning later
ncclementi Dec 1, 2021
225a153
add comment to fix zict version later
ncclementi Dec 1, 2021
ed7edf4
remove unnecessary comments
ncclementi Dec 1, 2021
1390bc4
add test cases for overwrite key in fast/slow bigger than max_spill
ncclementi Dec 1, 2021
b9bf50a
add comment
ncclementi Dec 1, 2021
977fdd1
add spill logging warning catching to tests
ncclementi Dec 1, 2021
edaaec2
use count str method to check for spill raises
ncclementi Dec 1, 2021
8c5ba04
use async context manager in test_spill_to_disk
ncclementi Dec 2, 2021
5870c1b
add spill disk constrained worker test
ncclementi Dec 2, 2021
351c8be
include possibility of max_spill = 0
ncclementi Dec 3, 2021
f5a7038
modify test for zict > 2.0 where we keep keys when exception happens
ncclementi Dec 3, 2021
68aa83f
fix extra line of code
ncclementi Dec 3, 2021
5c7fcea
fix version check on spill.py
ncclementi Dec 3, 2021
84a43c5
pin zict temporarily to zict slow_raise branch
ncclementi Dec 3, 2021
6744431
attempt to fix test_fail_write_to_disk (broken)
ncclementi Dec 15, 2021
52fd06c
attempt to test bad key
ncclementi Dec 16, 2021
4f62f69
wrong limit size
ncclementi Dec 16, 2021
f796811
remove breakpoint
ncclementi Dec 16, 2021
009fc74
Update distributed/spill.py
ncclementi Dec 17, 2021
e167e5b
Update distributed/spill.py
ncclementi Dec 17, 2021
0fb62e9
Update distributed/spill.py
ncclementi Dec 17, 2021
8608d7b
Update distributed/spill.py
ncclementi Dec 17, 2021
798a97a
Update distributed/tests/test_spill.py
ncclementi Dec 17, 2021
844d02a
Update distributed/tests/test_spill.py
ncclementi Dec 17, 2021
8b674fd
Update distributed/tests/test_spill.py
ncclementi Dec 17, 2021
52b46bd
Apply suggestions from code review
ncclementi Dec 17, 2021
0fcd218
include review comments
ncclementi Dec 17, 2021
a9bf0f9
fix exception raising/catching related to comms
ncclementi Jan 11, 2022
9b2c217
fix typos
ncclementi Jan 11, 2022
d9642b1
add tests
ncclementi Jan 11, 2022
f60f9e8
add skipif windows and modify comment
ncclementi Jan 12, 2022
f10798d
fix OSError catching when writing to disk directly
ncclementi Jan 13, 2022
d38088f
remove prints
ncclementi Jan 13, 2022
7cf9415
Merge branch 'main' of github.com:dask/distributed into constrained_s…
ncclementi Jan 14, 2022
77fcf1c
fix typos in distributed-schema.yaml
ncclementi Jan 14, 2022
f561f31
fix try except when adding to dict
ncclementi Jan 18, 2022
65815c1
Apply suggestions from code review
ncclementi Jan 18, 2022
fab2c34
go back to loosversion
ncclementi Jan 18, 2022
34851e9
fix some asserts according to new logs
ncclementi Jan 18, 2022
ff7ed20
limit logging by trcking logged keys
ncclementi Jan 18, 2022
3db086a
add comment
ncclementi Jan 18, 2022
44ecf6a
better typing
ncclementi Jan 18, 2022
363a646
add minimum loggin time interval and cleanup
ncclementi Jan 21, 2022
545ede8
replace LooseVersion for parse_version
ncclementi Jan 21, 2022
7c181c1
Update spill docstring
ncclementi Feb 2, 2022
81af9ca
fix logging tracking
ncclementi Feb 2, 2022
2227414
update logging tracking
ncclementi Feb 2, 2022
fb0984a
update logging tracking
ncclementi Feb 2, 2022
1534d3a
fix condition based on new variable name
ncclementi Feb 2, 2022
3a15ab3
when pickle error log
ncclementi Feb 2, 2022
026b19a
add checks
ncclementi Feb 2, 2022
cdc6075
clean Slow
ncclementi Feb 2, 2022
30ababd
use importorskip for zict
ncclementi Feb 2, 2022
8bb8e5a
simplify decorator for zict version
ncclementi Feb 2, 2022
7f43b31
update comments test_spill
ncclementi Feb 2, 2022
a7d1df6
update comment
ncclementi Feb 3, 2022
2f25e3a
update comment
ncclementi Feb 3, 2022
98eec51
fix typo itroduced in suggestions
ncclementi Feb 3, 2022
f96b5d5
avoid skipif code duplication
ncclementi Feb 3, 2022
9a2159f
fix type: keys are converted from hashable to str when in transit
ncclementi Feb 3, 2022
112aab4
undo a type changed by mistake
ncclementi Feb 3, 2022
d7915bb
use better exception in pytest raise
ncclementi Feb 3, 2022
8a42f01
add missing case test_spill
ncclementi Feb 3, 2022
f2596bb
add asserts for fast and slow in fail to serialize test
ncclementi Feb 3, 2022
c6d90e3
use os instead of subprocess call to modify tmpdir permissions
ncclementi Feb 3, 2022
8fba457
use a better size to avoid confusion
ncclementi Feb 7, 2022
8ab7ee7
add missing case on worker test constrained spill
ncclementi Feb 7, 2022
6616991
use better exception
ncclementi Feb 7, 2022
c875000
re-write Bad class
ncclementi Feb 7, 2022
b2f3bf3
handle key eviction when no target
ncclementi Feb 9, 2022
4254ebe
add test for spill buffer evict method
ncclementi Feb 9, 2022
0a5a458
add test_fail_to_write_disk_evict
ncclementi Feb 10, 2022
bce8f0c
add docs to spill buffer setitem and evict
ncclementi Feb 10, 2022
1322dd4
Merge branch 'main' of github.com:dask/distributed into constrained_s…
ncclementi Feb 10, 2022
f1fd40e
Apply suggestions from code review to sppill.py
ncclementi Feb 10, 2022
4675f7a
Apply suggestion in test_spill
ncclementi Feb 10, 2022
ccaffaa
Apply more suggestion in test_spill.py
ncclementi Feb 10, 2022
f478089
Apply more suggestion in worker and test_worker.py
ncclementi Feb 10, 2022
c446c29
fix merging issue
ncclementi Feb 10, 2022
6da3fd8
fix test assertion
ncclementi Feb 10, 2022
4c861b3
add test and problems
ncclementi Feb 10, 2022
407d609
Update distributed/tests/test_spill.py
crusaderky Feb 11, 2022
36fa72a
Update distributed/tests/test_spill.py
crusaderky Feb 11, 2022
838b513
Update distributed/tests/test_worker.py
crusaderky Feb 11, 2022
0e85804
Merge branch 'main' of github.com:dask/distributed into constrained_s…
ncclementi Feb 11, 2022
1283044
Apply suggestions from code review
ncclementi Feb 11, 2022
ee41e1c
Merge branch 'constrained_spill' of github.com:ncclementi/distributed…
ncclementi Feb 11, 2022
645782c
Code review
crusaderky Feb 11, 2022
60dabf5
lint
crusaderky Feb 11, 2022
bfefc27
fix flaky test
crusaderky Feb 11, 2022
2299a74
Merge pull request #1 from crusaderky/constrained_spill_2
ncclementi Feb 11, 2022
25d0ff8
Update distributed/spill.py
crusaderky Feb 14, 2022
a5e45cb
Update distributed/tests/test_worker.py
crusaderky Feb 14, 2022
19e16ad
Update distributed/tests/test_worker.py
crusaderky Feb 14, 2022
55a2b49
Further code review
crusaderky Feb 14, 2022
9d68819
Merge pull request #3 from crusaderky/constrained_spill_2
ncclementi Feb 14, 2022
d01d139
Update continuous_integration/environment-3.7.yaml
crusaderky Feb 14, 2022
4591390
Update continuous_integration/environment-3.8.yaml
crusaderky Feb 14, 2022
998354c
Merge branch 'constrained_spill' of github.com:ncclementi/distributed…
ncclementi Feb 14, 2022
d9014e5
Merge branch 'main' of github.com:dask/distributed into constrained_s…
ncclementi Feb 14, 2022
5dd0fe2
compatibility with zict 2.0
ncclementi Feb 14, 2022
0cbba2d
Cleaner implementation of requires zict 210
ncclementi Feb 15, 2022
a7342c4
Update distributed/spill.py
crusaderky Feb 15, 2022
2e55210
Update distributed/spill.py
crusaderky Feb 15, 2022
2bca7d5
Update distributed/spill.py
crusaderky Feb 15, 2022
e27c7e3
Update continuous_integration/environment-3.9.yaml
crusaderky Feb 16, 2022
31 changes: 18 additions & 13 deletions distributed/spill.py
@@ -17,6 +17,7 @@
 from .sizeof import safe_sizeof

 logger = logging.getLogger(__name__)
+has_zict_210 = parse_version(zict.__version__) > parse_version("2.0.0")


 class SpillBuffer(zict.Buffer):
@@ -48,9 +49,7 @@ def __init__(
         min_log_interval: float = 2,
     ):

-        if max_spill is not False and parse_version(zict.__version__) <= parse_version(
-            "2.0.0"
-        ):
+        if max_spill and not has_zict_210:
             raise ValueError("zict > 2.0.0 required to set max_weight")

         super().__init__(
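
The hunks above replace repeated inline version comparisons with one module-level flag that is computed once and reused. A minimal sketch of that pattern follows. It assumes a simplified stand-in for `parse_version` that handles only dotted integer versions, and the helper names `supports_max_spill` and `check_max_spill` are hypothetical, introduced only for illustration; the real module compares `zict.__version__` directly.

```python
from __future__ import annotations


def parse_version(text: str) -> tuple[int, ...]:
    """Simplified stand-in for a real version parser (assumption):
    handles only dotted integer versions such as "2.1.0"."""
    return tuple(int(part) for part in text.split("."))


def supports_max_spill(lib_version: str) -> bool:
    # Same comparison as the flag in the diff: strictly newer than 2.0.0.
    return parse_version(lib_version) > parse_version("2.0.0")


def check_max_spill(max_spill: int | bool, lib_version: str) -> None:
    """Reject a truthy max_spill when the library version is too old."""
    if max_spill and not supports_max_spill(lib_version):
        raise ValueError("zict > 2.0.0 required to set max_weight")
```

In this sketch the truthiness test lets both `max_spill=False` and `max_spill=0` skip the check, whereas an `is not False` test would reject `0` on an old version.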
@@ -102,10 +101,11 @@ def handle_errors(self, key: str | None):
                 # This happens only when the key is individually larger than target.
                 # The exception will be caught by Worker and logged; the status of
                 # the task will be set to error.
-                if parse_version(zict.__version__) <= parse_version("2.0.0"):
-                    pass
-                else:
+                if has_zict_210:
                     del self[key]
+                else:
+                    assert key not in self.fast
+                    assert key not in self.slow
                 raise orig_e
             else:
                 # The key we just inserted is smaller than target, but it caused
@@ -142,10 +142,11 @@ def __setitem__(self, key: str, value: Any) -> None:
                 super().__setitem__(key, value)
                 self.logged_pickle_errors.discard(key)
         except HandledError:
-            if parse_version(zict.__version__) <= parse_version("2.0.0"):
-                pass
-            else:
+            if has_zict_210:
                 assert key in self.fast
+            else:
+                assert key not in self.fast
+                logger.error("Key %s lost. Please upgrade to zict >= 2.1.0", key)
             assert key not in self.slow

     def evict(self) -> int:
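
The two hunks above rely on a context manager that logs a failure and then raises `HandledError`, which `__setitem__` swallows. A rough sketch of that control flow is shown below under heavy simplification. `TinyBuffer`, its `_spill` method, and the locally defined `MaxSpillExceeded` are hypothetical stand-ins; here every key is spilled immediately and sizes are plain `len()` values, which is not how the real buffer decides what to evict.

```python
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger(__name__)


class MaxSpillExceeded(Exception):
    """Hypothetical: raised when the slow store would exceed its limit."""


class HandledError(Exception):
    """Signals that the underlying failure was already logged."""


class TinyBuffer:
    """Hypothetical stand-in: every key is spilled to `slow` if it fits."""

    def __init__(self, max_spill: int) -> None:
        self.max_spill = max_spill
        self.fast: dict[str, bytes] = {}
        self.slow: dict[str, bytes] = {}

    def _spill(self, key: str) -> None:
        used = sum(len(v) for v in self.slow.values())
        if used + len(self.fast[key]) > self.max_spill:
            raise MaxSpillExceeded(key)
        self.slow[key] = self.fast.pop(key)

    @contextmanager
    def handle_errors(self, key: str) -> Iterator[None]:
        try:
            yield
        except MaxSpillExceeded:
            # Log here, then tell the caller the error needs no more handling.
            logger.warning("Spill limit reached; key %s stays in memory", key)
            raise HandledError()

    def __setitem__(self, key: str, value: bytes) -> None:
        self.slow.pop(key, None)
        self.fast[key] = value
        try:
            with self.handle_errors(key):
                self._spill(key)
        except HandledError:
            # Nothing was lost: the key is still in fast and not in slow.
            assert key in self.fast
            assert key not in self.slow
```

The caller of `buf[key] = value` sees a normal return in both cases; only the log records that the spill limit was hit.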
@@ -231,10 +232,14 @@ def __setitem__(self, key: str, value: Any) -> None:

         pickled_size = sum(len(frame) for frame in pickled)

-        # Thanks to Buffer.__setitem__, we never update existing keys in slow,
-        # but always delete them and reinsert them.
-        assert key not in self.d
-        assert key not in self.weight_by_key
+        if has_zict_210:
+            # Thanks to Buffer.__setitem__, we never update existing
+            # keys in slow, but always delete them and reinsert them.
+            assert key not in self.d
+            assert key not in self.weight_by_key
+        else:
+            self.d.pop(key, 0)
+            self.total_weight -= self.weight_by_key.pop(key, PickledSize(0, 0))
Member, Author:

@crusaderky it seems this is breaking the linting. What is PickledSize here? It's not defined.

Collaborator:

My bad, it's from #5805.

         if (
             self.max_weight is not False
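
The fallback branch in the last `__setitem__` hunk (the one that updates `weight_by_key`) exists so that overwriting a key does not count its weight twice. A small sketch of that bookkeeping follows. `WeightedStore` and its `strict` flag are hypothetical, and weights are plain integers here because the definition of `PickledSize` is not shown in this diff.

```python
from __future__ import annotations


class WeightedStore:
    """Hypothetical dict-backed store that tracks the total size of its values."""

    def __init__(self, strict: bool) -> None:
        # strict=True mirrors the has_zict_210 branch: the caller guarantees
        # that existing keys are deleted before being reinserted.
        self.strict = strict
        self.d: dict[str, bytes] = {}
        self.weight_by_key: dict[str, int] = {}
        self.total_weight = 0

    def __setitem__(self, key: str, value: bytes) -> None:
        if self.strict:
            assert key not in self.d
            assert key not in self.weight_by_key
        else:
            # Overwrite path: drop the old value and subtract its weight
            # so that total_weight does not count the key twice.
            self.d.pop(key, None)
            self.total_weight -= self.weight_by_key.pop(key, 0)
        self.d[key] = value
        self.weight_by_key[key] = len(value)
        self.total_weight += len(value)

    def __delitem__(self, key: str) -> None:
        del self.d[key]
        self.total_weight -= self.weight_by_key.pop(key)
```

With `strict=False`, assigning the same key twice leaves `total_weight` equal to the size of the latest value; with `strict=True`, the same result requires an explicit `del` before the second assignment.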