Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail P2PShuffle gracefully upon worker failure #7326

Merged
merged 95 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 93 commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
46860d5
Minimal checks on closed shuffle
hendrikmakait Nov 16, 2022
43b4cf7
Close Shuffle and WorkerExtension
hendrikmakait Nov 16, 2022
c86bc04
Drop unnecessary
hendrikmakait Nov 16, 2022
f8b59d6
Fail shuffle when worker is removed
hendrikmakait Nov 17, 2022
5c24302
Do not offload if closed
hendrikmakait Nov 17, 2022
f6a2248
Serialize exception
hendrikmakait Nov 17, 2022
cac380d
Avoid test deadlocking on wait_for_state
hendrikmakait Nov 17, 2022
8c07589
Use handle_task_erred
hendrikmakait Nov 17, 2022
012c690
Add test for input-only worker
hendrikmakait Nov 17, 2022
7634a60
Improve exception
hendrikmakait Nov 17, 2022
1af605e
Remember erred shuffles
hendrikmakait Nov 17, 2022
d7db4ba
Clean up shuffle state on workers
hendrikmakait Nov 17, 2022
7fdfe60
Make tests event-based
hendrikmakait Nov 18, 2022
44452c5
Improve tests
hendrikmakait Nov 18, 2022
2e1cf51
Add tests
hendrikmakait Nov 18, 2022
184823c
Additional (deadlocking) test
hendrikmakait Nov 18, 2022
c7b84c4
raise-protected methods
hendrikmakait Nov 18, 2022
a6a9445
Refactor offloading of repartitioning
hendrikmakait Nov 18, 2022
59bc3b1
Improve tests and drop reschedule
hendrikmakait Nov 18, 2022
acfbf30
Clean up
hendrikmakait Nov 18, 2022
8e97c7c
Clean up scheduler and adjust tests
hendrikmakait Nov 21, 2022
5f92727
Idempotency
hendrikmakait Nov 21, 2022
b36862f
Remove race
hendrikmakait Nov 21, 2022
bc16a47
Fail on all participating workers
hendrikmakait Nov 21, 2022
40e5b4b
Clean up participating workers
hendrikmakait Nov 21, 2022
646c721
Properly wait for cleanup
hendrikmakait Nov 21, 2022
da6f160
Remember completed shuffle should workers fail down the line
hendrikmakait Nov 21, 2022
f9c4db3
Revert completed_shuffles
hendrikmakait Nov 21, 2022
6e34ac3
Remove warnings
hendrikmakait Nov 21, 2022
871ddb7
Additional test
hendrikmakait Nov 21, 2022
fea0c7d
Fix tests on Windows
hendrikmakait Nov 21, 2022
0ed9f61
Do not try to transition barrier to erred (it wont work)
hendrikmakait Nov 22, 2022
3a50c7e
Fix deadlock (WIP)
hendrikmakait Nov 22, 2022
fd37f3b
Add transition no-worker -> erred
hendrikmakait Nov 22, 2022
f13cead
Transitions tasks to erred
hendrikmakait Nov 22, 2022
c567651
Improve error messages
hendrikmakait Nov 22, 2022
987e3a3
Improve barrier tests
hendrikmakait Nov 22, 2022
3281152
Test deadlock on last shuffle task
hendrikmakait Nov 22, 2022
e48fe17
Drop unnecessary test
hendrikmakait Nov 22, 2022
bed5c98
TODO
hendrikmakait Nov 22, 2022
23408c3
Fix test_closed_worker_during_barrier
hendrikmakait Nov 23, 2022
3339026
Add test
hendrikmakait Nov 23, 2022
49e3a81
Relax test
hendrikmakait Nov 23, 2022
e83aceb
Remove comment
hendrikmakait Nov 23, 2022
f1c1478
Add docstring
hendrikmakait Nov 23, 2022
426c4fc
Add seed
hendrikmakait Nov 23, 2022
c851887
Relax test
hendrikmakait Nov 23, 2022
f81fff0
Remove comparison
hendrikmakait Nov 23, 2022
79c2834
Improve test runtime and remove slow markers
hendrikmakait Nov 23, 2022
57ccc17
Use raises_with_cause
hendrikmakait Nov 23, 2022
e842e02
Improve docstring
hendrikmakait Nov 23, 2022
e0482f1
Cleaner exception propagation
hendrikmakait Nov 23, 2022
f6efc70
Ensure that fail waits for close
hendrikmakait Nov 23, 2022
96d6aed
Proper shuffle_closed_events
hendrikmakait Nov 24, 2022
5cb3be5
Privatizing
hendrikmakait Nov 24, 2022
8462d93
Merge branch 'main' into close-shuffle
hendrikmakait Nov 24, 2022
a0a6881
Fixes after merge
hendrikmakait Nov 24, 2022
608dea5
Fix docstring
hendrikmakait Nov 24, 2022
307023c
Simplify
hendrikmakait Nov 24, 2022
cf3fce4
Adjust tests
hendrikmakait Nov 25, 2022
7a8f24d
Remove superfluous copy
hendrikmakait Nov 28, 2022
2f5e676
No raise_if_closed on worker extension
hendrikmakait Nov 28, 2022
720b841
Replace idempotency in ShuffleWorkerExtension.close with assertion
hendrikmakait Nov 28, 2022
e392b32
Merge branch 'main' into close-shuffle
hendrikmakait Nov 29, 2022
c9dc954
Attempt to fix shuffle resilience
fjetter Nov 29, 2022
a3229f7
WIP: Finish alternative approach
hendrikmakait Dec 1, 2022
e31b566
Simplify
hendrikmakait Dec 1, 2022
c6b1d30
Remove chaining
hendrikmakait Dec 2, 2022
4865cbe
Fix cleanup
hendrikmakait Dec 2, 2022
588306f
Add tests
hendrikmakait Dec 2, 2022
b44ebe2
Improve tests
hendrikmakait Dec 2, 2022
550dada
Drop _closed_events on scheduler extension
hendrikmakait Dec 2, 2022
7a186c9
Fix cleanup and its testing
hendrikmakait Dec 2, 2022
91516ac
Ignore stale heartbeats
hendrikmakait Dec 2, 2022
9a28675
Fix bug in state machine
hendrikmakait Dec 2, 2022
3c09d8f
Fix race
hendrikmakait Dec 2, 2022
7d7ec2f
Simplify
hendrikmakait Dec 5, 2022
35ca74b
Single-source of barrier_key
hendrikmakait Dec 5, 2022
deaa9a4
Optimize barrier
hendrikmakait Dec 5, 2022
06c0cdc
Fix typo
hendrikmakait Dec 5, 2022
5b9ea61
Add test for early forgetting
hendrikmakait Dec 6, 2022
fd7451b
Clean worker state once forgotten
hendrikmakait Dec 6, 2022
cec76b9
Add tombstone
hendrikmakait Dec 6, 2022
8a447d5
More explicit variable naming
hendrikmakait Dec 6, 2022
d3f8fe8
Fix overwritten worker
hendrikmakait Dec 6, 2022
6e4273a
XFAIL tests
hendrikmakait Dec 6, 2022
e4de791
Increase test size
hendrikmakait Dec 6, 2022
a33ce0e
Ignore leaked subprocess
hendrikmakait Dec 6, 2022
3163b3b
Fix test
hendrikmakait Dec 6, 2022
f8c4adb
Merge branch 'main' into close-shuffle
hendrikmakait Dec 6, 2022
7d54aac
Add test for removing bystander
hendrikmakait Dec 8, 2022
5f1a41f
Rename
hendrikmakait Dec 8, 2022
3c90f50
Remove indirection
hendrikmakait Dec 9, 2022
9040137
Unskip test
hendrikmakait Dec 9, 2022
bc317e2
Skip again
hendrikmakait Dec 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions distributed/shuffle/_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class ShardsBuffer(Generic[ShardType]):
bytes_written: int
bytes_read: int

_closed: bool
_done: bool
_accepts_input: bool
_inputs_done: bool
_exception: None | Exception
_tasks: list[asyncio.Task]
_shards_available: asyncio.Condition
Expand All @@ -64,12 +64,12 @@ def __init__(
concurrency_limit: int = 2,
max_message_size: int = -1,
) -> None:
self._closed = False
self._accepts_input = True
self.shards = defaultdict(list)
self.sizes = defaultdict(int)
self._exception = None
self.concurrency_limit = concurrency_limit
self._done = False
self._inputs_done = False
self.memory_limiter = memory_limiter
self.diagnostics: dict[str, float] = defaultdict(float)
self._tasks = [
Expand Down Expand Up @@ -105,7 +105,7 @@ async def process(self, id: str, shards: list[pa.Table], size: int) -> None:

except Exception as e:
self._exception = e
self._done = True
self._inputs_done = True
stop = time()

self.diagnostics["avg_size"] = (
Expand All @@ -131,12 +131,12 @@ def empty(self) -> bool:

async def _background_task(self) -> None:
def _continue() -> bool:
return bool(self.shards or self._done)
return bool(self.shards or self._inputs_done)

while True:
async with self._shards_available:
await self._shards_available.wait_for(_continue)
if self._done and not self.shards:
if self._inputs_done and not self.shards:
break
part_id = max(self.sizes, key=self.sizes.__getitem__)
if self.max_message_size > 0:
Expand Down Expand Up @@ -175,7 +175,7 @@ async def write(self, data: dict[str, list[ShardType]]) -> None:

if self._exception:
raise self._exception
if self._closed or self._done:
if not self._accepts_input or self._inputs_done:
raise RuntimeError(f"Trying to put data in closed {self}.")

if not data:
Expand Down Expand Up @@ -215,13 +215,13 @@ async def flush(self) -> None:
This closes the buffer such that no new writes are allowed
"""
async with self._flush_lock:
self._closed = True
self._accepts_input = False
async with self._shards_available:
self._shards_available.notify_all()
await self._shards_available.wait_for(
lambda: not self.shards or self._exception or self._done
lambda: not self.shards or self._exception or self._inputs_done
)
self._done = True
self._inputs_done = True
self._shards_available.notify_all()

await asyncio.gather(*self._tasks)
Expand All @@ -238,8 +238,8 @@ async def close(self) -> None:
assert not self.bytes_memory, (type(self), self.bytes_memory)
for t in self._tasks:
t.cancel()
self._closed = True
self._done = True
self._accepts_input = False
self._inputs_done = True
self.shards.clear()
self.bytes_memory = 0
async with self._shards_available:
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def _process(self, id: str, shards: list[pa.Buffer]) -> None:
def read(self, id: int | str) -> pa.Table:
"""Read a complete file back into memory"""
self.raise_on_exception()
if not self._done:
if not self._inputs_done:
raise RuntimeError("Tried to read from file before done.")
parts = []

Expand Down
19 changes: 14 additions & 5 deletions distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,28 @@ def shuffle_transfer(
npartitions: int,
column: str,
) -> None:
_get_worker_extension().add_partition(
input, id, npartitions=npartitions, column=column
)
try:
_get_worker_extension().add_partition(
input, id, npartitions=npartitions, column=column
)
except Exception:
raise RuntimeError(f"shuffle_transfer failed during shuffle {id}")


def shuffle_unpack(
id: ShuffleId, output_partition: int, barrier: object
) -> pd.DataFrame:
return _get_worker_extension().get_output_partition(id, output_partition)
try:
return _get_worker_extension().get_output_partition(id, output_partition)
except Exception:
raise RuntimeError(f"shuffle_unpack failed during shuffle {id}")


def shuffle_barrier(id: ShuffleId, transfers: list[None]) -> None:
return _get_worker_extension().barrier(id)
try:
return _get_worker_extension().barrier(id)
except Exception:
raise RuntimeError(f"shuffle_barrier failed during shuffle {id}")


def rearrange_by_column_p2p(
Expand Down
Loading