[RFC]: Allow in-memory buffering of p2p shuffle results #7618

Draft · wants to merge 5 commits into main from wence/fea/p2p-buffer-config
Conversation

@wence- (Contributor) commented Mar 6, 2023

This is WIP to avoid the use of disk buffers. Opening for comments to see whether this is a reasonable approach.

I think it should be possible to hook disk buffers onto the back of the in-memory ones relatively easily, though I have not done so yet.

cc: @fjetter, @hendrikmakait

@github-actions bot (Contributor) commented Mar 6, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

21 files +1   21 suites +1   10h 42m 19s ⏱️ +47m 21s
3 783 tests ±0   3 673 ✔️ ±0   107 💤 ±0   3 ±0
36 571 runs +1 381   34 769 ✔️ +1 303   1 799 💤 +78   3 ±0

For more details on these failures, see this check.

Results for commit f4f05fb. ± Comparison against base commit dae3e87.

♻️ This comment has been updated with latest results.

@fjetter (Member) left a comment

I'm currently not too eager to introduce another hierarchical spill system. I'm worried this gets a bit too complicated and in the way of refactoring the buffer interface. For instance, we've recently been talking more about how we could elevate the interface of the buffers to not accept bytes directly but rather accept shard types, i.e. dataframes or arrays, directly. This would require us to refactor how we offload compute, etc.

Long story short, by not subclassing more I think this will all be easier.

I was thinking about introducing something like a "min buffer size" (similar to what Buffer.max_message_size currently does; the name is a relic since this started as a comm-only buffer).

while True:
    async with self._shards_available:
        await self._shards_available.wait_for(_continue)
        if self._inputs_done and not self.shards:
            break
        part_id = max(self.sizes, key=self.sizes.__getitem__)
        if self.max_message_size > 0:
            size = 0
            shards: _List[ShardType] = _List()
            while size < self.max_message_size:
                try:
                    shard = self.shards[part_id].pop()
                    shards.append(shard)
                    s = sizeof(shard)
                    size += s
                    self.sizes[part_id] -= s
                except IndexError:
                    break
                finally:
                    if not self.shards[part_id]:
                        del self.shards[part_id]
                        assert not self.sizes[part_id]
                        del self.sizes[part_id]
        else:
            shards = self.shards.pop(part_id)
            size = self.sizes.pop(part_id)
        self._shards_available.notify_all()
    await self.process(part_id, shards, size)

The code linked here runs in the background and is responsible for taking in-memory shards and spilling them to disk. On the receiving side, this happens effectively immediately, as soon as new shards are available.
A possibility to keep more in memory is to modify this loop logic and the _continue condition

def _continue() -> bool:
    return bool(self.shards or self._inputs_done)

with a min size or lower threshold

This would effectively cause the buffer to write to disk only once the threshold is reached, e.g. we could set it to 50% of worker memory and only touch disk once we're above that.
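For illustration, a minimal sketch of what such a threshold-aware condition could look like; `min_buffer_size` would be a new attribute (the hypothetical lower threshold), while `bytes_memory` is the counter the buffer already tracks:

def _continue() -> bool:
    # Always wake up once inputs are done or an error occurred, so the
    # loop can drain remaining shards and exit.
    if self._inputs_done or self._exception:
        return True
    # Otherwise only start spilling once the in-memory shards exceed the
    # lower threshold; below it, everything stays in memory.
    return self.bytes_memory > self.min_buffer_size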

The downside of this approach is that we currently still require flush to be called before reading again, see

async def flush(self) -> None:
    """Wait until all writes are finished.

    This closes the buffer such that no new writes are allowed
    """
    async with self._flush_lock:
        self._accepts_input = False
        async with self._shards_available:
            self._shards_available.notify_all()
            await self._shards_available.wait_for(
                lambda: not self.shards or self._exception or self._inputs_done
            )
            self._inputs_done = True
            self._shards_available.notify_all()
        await asyncio.gather(*self._tasks)
        if not self._exception:
            assert not self.bytes_memory, (type(self), self.bytes_memory)

To get rid of this flush (that can be a second step, of course), we'd need to adjust read to read from both disk and memory

def read(self, id: int | str) -> bytes:
    """Read a complete file back into memory"""
    self.raise_on_exception()
    if not self._inputs_done:
        raise RuntimeError("Tried to read from file before done.")
    try:
        with self.time("read"):
            with open(
                self.directory / str(id), mode="rb", buffering=100_000_000
            ) as f:
                data = f.read()
                size = f.tell()
    except FileNotFoundError:
        raise KeyError(id)
    if data:
        self.bytes_read += size
        return data
    else:
        raise KeyError(id)

This is at least how we thought we'd get disk to be optional. cc @hendrikmakait
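As a rough sketch (not actual code in distributed), such a combined read might look like this; `self._in_memory` is an assumed mapping of shards that were never spilled, and the disk path mirrors the method above:

def read(self, id: int | str) -> bytes:
    """Sketch: read a partition back from both memory and disk."""
    self.raise_on_exception()
    if not self._inputs_done:
        raise RuntimeError("Tried to read from buffer before done.")
    pieces: list[bytes] = []
    # Shards that were never spilled (`self._in_memory` is hypothetical).
    if id in self._in_memory:
        pieces.extend(self._in_memory.pop(id))
    # Shards that the background task already wrote out.
    try:
        with open(self.directory / str(id), mode="rb") as f:
            pieces.append(f.read())
    except FileNotFoundError:
        pass
    if not pieces:
        raise KeyError(id)
    data = b"".join(pieces)
    self.bytes_read += len(data)
    return data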


I realize that the above plan is not easy and rather complex. You mentioned it would likely be easy to put the disk buffer behind the memory buffer. If that's the case, I'm fine experimenting in this direction. I'm slightly worried that the behavior of the buffers is harder to control with a multi-layered system.

Comment on lines 87 to 88
# TODO: This needs rethinking because it is run on the worker
# where the config may be out of sync.
Member:

I'm fine with this limitation. There are many config options that need to be set on the worker, e.g. everything around memory management.

@wence- (Contributor, Author) commented Mar 7, 2023

> I realize that the above plan is not easy and rather complex. You mentioned it would likely be easy to put the disk buffer behind the memory buffer. If that's the case, I'm fine experimenting in this direction. I'm slightly worried that the behavior of the buffers is harder to control with a multi-layered system.

I'll have a play around with these two different ideas. My thought with the disk behind memory buffer had been that you would fill up to some size in the memory buffer and then block and write in one go to disk (rather than trickle out) if you hit the limit. But it sounds like the approach you're suggesting would achieve something similar.
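Roughly, the layering I have in mind looks like the sketch below; none of these names exist in distributed, it is only meant to illustrate the idea:

# Illustrative sketch only: a memory buffer that spills to a backing disk
# buffer in one go once a size limit is hit.
class MemoryThenDiskBuffer:
    def __init__(self, disk_buffer, limit: int) -> None:
        self._disk = disk_buffer  # e.g. the existing disk shards buffer
        self._limit = limit       # bytes we allow to accumulate in memory
        self._shards: dict[str, list[bytes]] = {}
        self._size = 0

    async def write(self, data: dict[str, list[bytes]]) -> None:
        for part_id, shards in data.items():
            self._shards.setdefault(part_id, []).extend(shards)
            self._size += sum(map(len, shards))
        if self._size > self._limit:
            # Block and write everything accumulated so far to disk in one
            # go, rather than trickling shards out continuously.
            await self._disk.write(self._shards)
            self._shards.clear()
            self._size = 0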

@wence- (Contributor, Author) commented Mar 17, 2023

I had a go at this alternative approach, with a draft here: #7664. This is actually quite a minimal change (just to the DiskShardsBuffer).

@hendrikmakait self-requested a review on July 27, 2023 09:03
Will be used to implement in-memory buffering of the disk writes (in
the case where one can afford to keep the entire shuffle in memory).
This just says "I know my shuffle will fit in memory, please don't
touch the disk", gated behind setting the config option
`"distributed.shuffle.p2p.stage_in_memory"` to `True`. This needs to
be done on workers, but it should be the case that everything else is
agnostic to whether workers have set it or not (it does not need to be
a collective decision).
@wence- force-pushed the wence/fea/p2p-buffer-config branch from 5ddd597 to 9cd3c14 on August 15, 2023 13:23
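For reference, opting in on a worker with the config key from this branch would look something like the sketch below (the key is not available in released distributed; setting the corresponding DASK_* environment variable on each worker would work the same way):

# Sketch: enable in-memory staging of p2p shuffle shards on this worker.
import dask

dask.config.set({"distributed.shuffle.p2p.stage_in_memory": True})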
@wence- (Contributor, Author) commented Aug 15, 2023

@hendrikmakait I ported this on top of the _core.py refactor, and think it is ready for a look as one "all or nothing" approach to avoiding disk.

@hendrikmakait (Member) commented:
Thanks, @wence-! I'll take this for a spin on Coiled with a few benchmarks to see its effect on clusters with low-bandwidth disk I/O.

@wence- (Contributor, Author) commented Aug 15, 2023

> Thanks, @wence-! I'll take this for a spin on Coiled with a few benchmarks to see its effect on clusters with low-bandwidth disk I/O.

I should do some slightly less minimal testing before you burn too much compute...

@hendrikmakait (Member) commented:
Benchmarks show a mild performance improvement when skipping disk but not as large as expected. It looks like we're held back by GIL contention.

self._disk_buffer: FileShardsBuffer
# TODO: This needs rethinking because it is run on the worker
# where the config may be out of sync.
if config.get("distributed.shuffle.p2p.stage_in_memory", False):
Member:

I think users would not want to have to choose between always writing to disk or staging in memory. Can we expose this as a keyword argument or annotation instead?

directory=directory,
memory_limiter=memory_limiter_disk,
)
if config.get("distributed.shuffle.p2p.stage_in_memory", False):
Member:

Instead of configuring this at shuffle execution time on the worker, we should instead configure this at graph creation time. This would allow us to specify this on a per-shuffle basis.
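For example (purely hypothetical, nothing below exists today), a per-shuffle opt-in at graph-construction time could look like an annotation:

# Hypothetical usage: the annotation key and its plumbing do not exist; this
# only illustrates configuring the behaviour when the graph is built.
import dask
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"id": range(100), "x": 1.0}), npartitions=10)
with dask.annotate(p2p_stage_in_memory=True):
    shuffled = df.shuffle("id", shuffle="p2p")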

key = str(id)
if key not in self._memory_buffers:
    raise FileNotFoundError(f"Shard with {id=} is unknown")
buf = self._memory_buffers[str(id)]
Member:

Suggested change:
- buf = self._memory_buffers[str(id)]
+ buf = self._memory_buffers.pop(str(id))

This will remove the BinaryIO object from the MemoryShardsBuffer upon reading to prevent memory duplication after unpacking. From what I understand, reading should be an exactly-once operation.
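Spelled out, the suggestion would amount to something like the sketch below; the surrounding method body is reconstructed for illustration (it assumes the buffered value is an io.BytesIO) and is not quoted from the PR:

def read(self, id: int | str) -> bytes:
    """Sketch of the suggested exactly-once read from memory."""
    key = str(id)
    if key not in self._memory_buffers:
        raise FileNotFoundError(f"Shard with {id=} is unknown")
    # pop() drops the buffer's reference so the payload is not kept alive
    # after the caller has unpacked it.
    buf = self._memory_buffers.pop(key)
    return buf.getvalue()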

@hendrikmakait (Member) commented:

@wence-: It's been a while since the last activity on this PR. Have you made any progress in the meantime - in particular on testing?

Disk-less P2P has become an important feature request for Coiled. If you don't have the capacity to finish this up in the next couple of days, I will pick this up next week to ensure progress on P2P. Sounds good?

@wence- (Contributor, Author) commented Oct 16, 2023

> @wence-: It's been a while since the last activity on this PR. Have you made any progress in the meantime - in particular on testing?
>
> Disk-less P2P has become an important feature request for Coiled. If you don't have the capacity to finish this up in the next couple of days, I will pick this up next week to ensure progress on P2P. Sounds good?

Apologies, I was pulled onto other things and did not update to note this was parked. Please do pick this up (or feel free to drop it if you have a better design!)

Labels: feature (Something is missing), shuffle
Projects: none yet
3 participants