
Fail P2PShuffle gracefully upon worker failure #7326

Merged: 95 commits into dask:main on Dec 9, 2022

Conversation

@hendrikmakait (Member) commented Nov 17, 2022

  • Tests added / passed
  • Passes pre-commit run --all-files

github-actions bot (Contributor) commented Nov 17, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

  18 files ±0 · 18 suites ±0 · 8h 19m 55s ⏱️ +42m 39s
  3,254 tests +16: 3,165 ✔️ passed +12, 85 💤 skipped +2, 4 ❌ failed +2
  29,295 runs +144: 28,064 ✔️ passed +125, 1,227 💤 skipped +18, 4 ❌ failed +1

For more details on these failures, see this check.

Results for commit bc317e2. Comparison against base commit 8c81d03.

♻️ This comment has been updated with latest results.

@fjetter (Member) left a comment:

Reviewed the code this morning. Apparently VSCode didn't push the review, so the comments might be stale :(

[3 resolved review threads on distributed/shuffle/_shuffle_extension.py (outdated)]
@@ -219,11 +227,20 @@ async def _receive(self, data: list[bytes]) -> None:
                for k, v in groups.items()
            }
        )
        self.raise_if_closed()
@fjetter (Member):

This is an interesting place. Why would we need to raise here but not between any of the other awaits?

@fjetter (Member):

FWIW, I think we should combine the above calls into a single offload anyway, which would render this comment moot.

@hendrikmakait (Member, Author):

offload itself is protected with raise_if_closed(). I've been wondering whether I should wrap any async functionality that needs raise_if_closed() protection into individual functions; that would probably make reasoning about them easier.
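
For context, such a guard can be quite small. A minimal sketch of what raise_if_closed() might look like (attribute names such as _exception are assumptions for illustration, not necessarily this PR's actual implementation):

    def raise_if_closed(self) -> None:
        # Called after an await that may have yielded control while the
        # shuffle was being torn down (e.g. because a participating worker
        # died): re-raise the stored failure instead of continuing to
        # operate on closed buffers and executors.
        if self.closed:
            if self._exception is not None:
                raise self._exception
            raise RuntimeError(f"Shuffle {self.id} is already closed")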

@hendrikmakait (Member, Author):

> FWIW, I think we should combine the above calls into a single offload anyway, which would render this comment moot.

Good point, done.
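
For illustration, combining the grouping and conversion into one offload might look roughly like this. A sketch only: _group_and_convert and the helpers inside it are hypothetical stand-ins for whatever _receive actually does; only offload (from distributed.utils) and raise_if_closed come from the surrounding discussion:

    from distributed.utils import offload

    async def _receive(self, data: list[bytes]) -> None:
        self.raise_if_closed()

        def _group_and_convert(data: list[bytes]) -> dict:
            # All heavy, blocking work happens off the event loop in one
            # piece: deserialize, group rows by output partition, convert.
            groups = deserialize_and_group(data)  # hypothetical helper
            return {k: convert(v) for k, v in groups.items()}  # hypothetical

        # A single offload leaves only one suspension point to guard,
        # instead of needing raise_if_closed() between several awaits.
        grouped = await offload(_group_and_convert, data)
        self.raise_if_closed()
        await self._write(grouped)  # hypothetical next step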

    except Exception:
        self.executor.shutdown()
    self.closed = True
    async with self._close_lock:
@fjetter (Member):

Shuffle.close is called in register_complete and set_exception.

How come this is overlapping? Why do we need a lock? Are we sure that everything we do below is idempotent?

If we do not want to guarantee idempotency, the pattern we take in the server classes might be better suited than a lock, i.e.

async def close(self) -> None:
    if self.closed:
        # Someone else is already closing; just wait for them to finish.
        await self._event_close.wait()
        return
    self.closed = True
    await close_all_stuff()
    self._event_close.set()

This locks and makes close idempotent even without relying on the buffers, executors, or whatever else to be idempotent themselves.

The only important thing is that nobody ever resets the closed attribute. This is a one-way street; otherwise the pattern breaks.

@hendrikmakait (Member, Author):

Shuffle.close is called in ShuffleWorkerExtension._register_complete, ShuffleWorkerExtension.close, and Shuffle.fail, so we could see scenarios where an extension is closing while a shuffle is either completing or failing.

Fair point about idempotency. While everything should be idempotent at the moment, I'll adjust this to use the more cautious pattern of waiting on an event. Since shuffles should never reopen once closed, this should be fine.

[3 resolved review threads on distributed/shuffle/tests/test_shuffle.py (outdated)]
Comment on lines +170 to +172

    with mock.patch(
        "distributed.shuffle._shuffle_extension.get_worker_for", mock_get_worker_for
    ):
@fjetter (Member):

I don't love this mock. Right now, there are actually only two ways to trigger a situation where we have input workers that do not participate as output workers:

  1. What you are currently emulating. That would be an unbelievable hash collision. We could possibly emulate this by using constant input data or something like that.
  2. A worker joins after the shuffle has started. Stealing assigns it a transfer task (which is very unlikely), this new worker executes at least one key, and then it dies such that this key is rescheduled.

I actually tried to manipulate stealing to do this, and even doing it manually is non-trivial.

I also tried with shuffle(npartitions=1) such that we have more input than output partitions. However, dask automatically triggers a repartition before the shuffle in this case, so we fall back to the stealing case.
Incidentally, this is an interesting performance hint: with p2p we should no longer require the repartition!

No action needed here. Just wanted to share some info.
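
For reference, the mocked mapping can be as simple as pinning every output partition to a single worker so that all remaining workers only ever act as input workers. A sketch under the assumption that get_worker_for maps an output partition to a worker address; the real signature in _shuffle_extension may differ:

    from unittest import mock

    def mock_get_worker_for(output_partition, workers, npartitions):
        # Hypothetical stand-in: route every output partition to one
        # worker so the other workers participate only on the input side.
        return min(workers)

    with mock.patch(
        "distributed.shuffle._shuffle_extension.get_worker_for", mock_get_worker_for
    ):
        ...  # run the shuffle under this skewed partition-to-worker mapping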

[Resolved review thread on distributed/shuffle/tests/test_shuffle.py (outdated)]
@@ -536,12 +609,17 @@ class ShuffleSchedulerExtension:
    columns: dict[ShuffleId, str]
    output_workers: dict[ShuffleId, set[str]]
    completed_workers: dict[ShuffleId, set[str]]
@hendrikmakait (Member, Author):

We should be able to remove self.completed_workers and register_complete, but that's work for another PR.

@fjetter (Member) commented Dec 6, 2022

There is a related test failure: test_crashed_worker_after_shuffle_persisted

@hendrikmakait (Member, Author):

> There is a related test failure: test_crashed_worker_after_shuffle_persisted

Should be solved.

@hendrikmakait (Member, Author):

No related failures on CI.

Comment on lines 1014 to 1016

    # await clean_worker(a)
    # await clean_worker(b)
    # await clean_scheduler(s)
@fjetter (Member):

Is this intentional? Shouldn't this test work already?

@hendrikmakait (Member, Author):

I lost track of that; I've adjusted the test and unskipped it.

@hendrikmakait (Member, Author):

Turns out CI isn't happy with this one. Unfortunately, I have not been able to reproduce the issue locally so far.

@fjetter (Member) left a comment:

This PR is not exactly where we would like it to be, but I still believe it is worth merging so that we can focus on follow-up tasks.

Great job, @hendrikmakait. This was much tougher than I initially suspected.

@fjetter merged commit 653b006 into dask:main on Dec 9, 2022
@mrocklin (Member) commented Dec 9, 2022

> This was much tougher than I initially suspected

[screenshot]

Ha!

Great job @hendrikmakait

👍 🎉
