[RFC]: Allow in-memory buffering of p2p shuffle results #7618
base: main
Conversation
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

21 files +1, 21 suites +1, 10h 42m 19s ⏱️ +47m 21s. For more details on these failures, see this check. Results for commit f4f05fb. ± Comparison against base commit dae3e87.

♻️ This comment has been updated with latest results.
I'm currently not too eager to introduce another hierarchical spill system. I'm worried this gets a bit too complicated and in the way of refactoring the buffer interface. For instance, we've recently been talking about how we could elevate the interface of the buffers to not accept bytes directly but rather accept shard types, i.e. dataframes or arrays directly. This would require us to refactor how we offload compute, etc.

Long story short: by not subclassing more, I think this will all be easier.

I was thinking about introducing something like a "min buffer size" (similar to what `Buffer.max_message_size` currently does; the name is a relic since this started as a comm-only buffer).
distributed/distributed/shuffle/_buffer.py
Lines 140 to 167 in 310fc95
```python
while True:
    async with self._shards_available:
        await self._shards_available.wait_for(_continue)
        if self._inputs_done and not self.shards:
            break
        part_id = max(self.sizes, key=self.sizes.__getitem__)
        if self.max_message_size > 0:
            size = 0
            shards: _List[ShardType] = _List()
            while size < self.max_message_size:
                try:
                    shard = self.shards[part_id].pop()
                    shards.append(shard)
                    s = sizeof(shard)
                    size += s
                    self.sizes[part_id] -= s
                except IndexError:
                    break
                finally:
                    if not self.shards[part_id]:
                        del self.shards[part_id]
                        assert not self.sizes[part_id]
                        del self.sizes[part_id]
        else:
            shards = self.shards.pop(part_id)
            size = self.sizes.pop(part_id)
        self._shards_available.notify_all()
    await self.process(part_id, shards, size)
```
The code linked here runs in the background and is responsible for taking in-memory buffers and spilling them to disk. On the receiving side, this happens effectively immediately, as soon as new shards are available.

A possibility to keep more in memory is to modify this loop logic and the `_continue` condition
distributed/distributed/shuffle/_buffer.py
Lines 137 to 138 in 310fc95
```python
def _continue() -> bool:
    return bool(self.shards or self._inputs_done)
```
with a min size or lower threshold.

This would effectively cause the buffer to write to disk only once this threshold is reached; i.e., we could set it to 50% of worker memory and only utilize disk once we're above that threshold.
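A minimal sketch of what such a threshold could look like, assuming the `Buffer` attributes shown above (`shards`, `_inputs_done`, `bytes_memory`) and a hypothetical `min_size` parameter that does not exist in the current code:

```python
from types import SimpleNamespace


def make_continue(buffer, min_size: int):
    """Build a _continue predicate with a low-water threshold (sketch).

    `min_size` is a hypothetical new setting: shards stay in memory until
    the buffer holds at least this many bytes, unless inputs are done (in
    which case everything remaining must still be drained).
    """

    def _continue() -> bool:
        return bool(
            buffer._inputs_done
            or (buffer.shards and buffer.bytes_memory >= min_size)
        )

    return _continue


# Example: with 10 bytes buffered and a 100-byte threshold, the background
# task keeps waiting instead of spilling.
buf = SimpleNamespace(_inputs_done=False, shards={"a": [1]}, bytes_memory=10)
cont = make_continue(buf, min_size=100)
print(cont())  # False: below threshold, inputs not done
buf.bytes_memory = 200
print(cont())  # True: threshold exceeded, start spilling
```

This keeps the existing drain loop untouched; only the wake-up condition changes.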
The downside of this approach is that we currently still require `flush` to be called before reading again, see
distributed/distributed/shuffle/_buffer.py
Lines 224 to 241 in 310fc95
```python
async def flush(self) -> None:
    """Wait until all writes are finished.

    This closes the buffer such that no new writes are allowed
    """
    async with self._flush_lock:
        self._accepts_input = False
        async with self._shards_available:
            self._shards_available.notify_all()
            await self._shards_available.wait_for(
                lambda: not self.shards or self._exception or self._inputs_done
            )
            self._inputs_done = True
            self._shards_available.notify_all()
        await asyncio.gather(*self._tasks)
        if not self._exception:
            assert not self.bytes_memory, (type(self), self.bytes_memory)
```
To get rid of this flush (that can be a second step, of course) we'd need to adjust `read` to read from both disk and memory
distributed/distributed/shuffle/_disk.py
Lines 77 to 97 in 310fc95
```python
def read(self, id: int | str) -> bytes:
    """Read a complete file back into memory"""
    self.raise_on_exception()
    if not self._inputs_done:
        raise RuntimeError("Tried to read from file before done.")
    try:
        with self.time("read"):
            with open(
                self.directory / str(id), mode="rb", buffering=100_000_000
            ) as f:
                data = f.read()
                size = f.tell()
    except FileNotFoundError:
        raise KeyError(id)
    if data:
        self.bytes_read += size
        return data
    else:
        raise KeyError(id)
```
This is at least how we thought we'd make disk optional. cc @hendrikmakait

I realize that the above plan is not easy and rather complex. You mentioned that it is likely easy to put the disk buffer behind the memory buffer. If that's the case, I'm fine with experimenting in this direction. I'm slightly worried that the behavior of the buffers is harder to control with a multi-layered system.
```python
# TODO: This needs rethinking because it is run on the worker
# where the config may be out of sync.
```
I'm fine with this limitation. There are many config options that need to be set on the worker, e.g. everything around memory management.
I'll have a play around with these two different ideas. My thought with the disk-behind-memory buffer had been that you would fill the memory buffer up to some size and then, if you hit the limit, block and write to disk in one go (rather than trickling out). But it sounds like the approach you're suggesting would achieve something similar.

I had a go at this alternate approach, with a draft here: #7664. This is actually quite a minimal change (just to the `DiskShardsBuffer`).
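The "fill up, then block and write in one go" idea can be sketched as follows. This is a toy model, not the PR's code; `StagingBuffer`, `limit`, and the `spill` callable are all hypothetical names for this illustration:

```python
class StagingBuffer:
    """Sketch: stage writes in memory; once `limit` bytes accumulate,
    drain the whole staging area to disk in one blocking pass."""

    def __init__(self, limit: int, spill):
        self.limit = limit        # max bytes to stage before spilling
        self.spill = spill        # callable(id, bytes): writes one shard
        self.staged: dict[str, bytes] = {}
        self.size = 0

    def write(self, id, data: bytes) -> None:
        key = str(id)
        self.staged[key] = self.staged.get(key, b"") + data
        self.size += len(data)
        if self.size >= self.limit:
            # Hit the limit: flush everything at once instead of trickling.
            for k, payload in self.staged.items():
                self.spill(k, payload)
            self.staged.clear()
            self.size = 0


# Example with a 10-byte limit: the first write stays in memory, the
# second pushes the total over the limit and triggers a full spill.
spilled: dict[str, bytes] = {}
b = StagingBuffer(10, lambda k, v: spilled.__setitem__(k, v))
b.write(1, b"aaaa")       # staged, nothing spilled yet
b.write(2, b"bbbbbbb")    # 11 bytes total -> everything spilled
print(sorted(spilled))    # ['1', '2']
```

Whether this behaves better than adjusting `_continue` likely comes down to how the two interact with the memory limiter, as discussed above.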
Will be used to implement in-memory buffering of the disk writes (in the case where one can afford to keep the entire shuffle in memory).

This just says "I know my shuffle will fit in memory, please don't touch the disk", gated behind setting the config option `"distributed.shuffle.p2p.stage_in_memory"` to `True`. This needs to be done on workers, but everything else should be agnostic to whether workers have set it or not (it does not need to be a collective decision).
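For reference, the worker-side setting would look like this in dask YAML config (the option name comes from the diff in this PR; the surrounding structure follows dask's standard config layout):

```yaml
distributed:
  shuffle:
    p2p:
      stage_in_memory: true  # proposed in this PR; off by default
```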
Force-pushed from 5ddd597 to 9cd3c14 (compare).
@hendrikmakait I ported this on top of the

Thanks, @wence-! I'll take this for a spin on Coiled with a few benchmarks to see its effect on clusters with low-bandwidth disk I/O.

I should do some slightly less minimal testing before you burn too much compute...

Benchmarks show a mild performance improvement when skipping disk, but not as large as expected. It looks like we're held back by GIL contention.
```python
self._disk_buffer: FileShardsBuffer
# TODO: This needs rethinking because it is run on the worker
# where the config may be out of sync.
if config.get("distributed.shuffle.p2p.stage_in_memory", False):
```
I think users would not want to have to choose between always writing to disk or staging in memory. Can we expose this as a keyword argument or annotation instead?
```python
    directory=directory,
    memory_limiter=memory_limiter_disk,
)
if config.get("distributed.shuffle.p2p.stage_in_memory", False):
```
Instead of configuring this at shuffle execution time on the worker, we should instead configure this at graph creation time. This would allow us to specify this on a per-shuffle basis.
```python
key = str(id)
if key not in self._memory_buffers:
    raise FileNotFoundError(f"Shard with {id=} is unknown")
buf = self._memory_buffers[str(id)]
```
```diff
- buf = self._memory_buffers[str(id)]
+ buf = self._memory_buffers.pop(str(id))
```
This will remove the `BinaryIO` object from the `MemoryShardsBuffer` upon reading, to prevent memory duplication after unpacking. From what I understand, reading should be an exactly-once operation.
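A small self-contained model of the pop-on-read behaviour suggested here. The class and attribute names mirror the snippet above, but the `write` method and overall shape are assumptions for illustration:

```python
import io


class MemoryShardsBuffer:
    """Toy model: reading a shard removes its BinaryIO object so the
    bytes are not held twice once the caller has unpacked them."""

    def __init__(self):
        self._memory_buffers: dict[str, io.BytesIO] = {}

    def write(self, id, data: bytes) -> None:
        self._memory_buffers[str(id)] = io.BytesIO(data)

    def read(self, id) -> bytes:
        key = str(id)
        if key not in self._memory_buffers:
            raise FileNotFoundError(f"Shard with {id=} is unknown")
        # pop (not index) so the buffer drops its reference: exactly-once.
        buf = self._memory_buffers.pop(key)
        return buf.getvalue()


b = MemoryShardsBuffer()
b.write(1, b"abc")
print(b.read(1))  # b'abc'; a second read(1) now raises FileNotFoundError
```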
@wence-: It's been a while since the last activity on this PR. Have you made any progress in the meantime, in particular on testing? Disk-less P2P has become an important feature request for Coiled. If you don't have the capacity to finish this up in the next couple of days, I will pick it up next week to ensure progress on P2P. Sounds good?

Apologies, I was pulled onto other things and did not note that this was parked. Please do pick this up (or feel free to drop it if you have a better design!)
This is WIP to avoid the use of disk buffers. Opening for comments to see whether this is a reasonable approach.

I think it should be possible to hook disk buffers on the back of the in-memory ones relatively easily, though I have not done so yet.

cc: @fjetter, @hendrikmakait