Use pickle for graph submissions from client to scheduler #7564
distributed/tests/test_client.py
@pytest.mark.xfail(reason="Is this a sane thing to do?")
@gen_cluster(client=True)
async def test_compute_persisted_retries(c, s, a, b):
This is a behavioral change. This test covers a condition where a persisted result is annotated by a subsequent compute call.
I don't believe something like this should be allowed, since annotations have non-trivial effects on scheduling and we're very likely not handling all of these cases. For instance, this test is annotating a retry. The only reason this works is that persist and compute are submitted in essentially the same event loop tick, so the retry annotation is set before the task even has a chance to be computed.
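For illustration, a minimal sketch of the pattern this test exercises, assuming the standard Client API (the cluster setup and values are made up):

import dask
from distributed import Client

client = Client()  # assumes a local cluster, for the sake of the sketch

x = dask.delayed(sum)([1, 2, 3])
persisted = client.persist(x)  # tasks are submitted without a retry annotation
# The later compute call attaches retries to already-submitted tasks; it only
# takes effect if it reaches the scheduler before the task runs, hence the
# race described above.
fut = client.compute(persisted, retries=2)
print(fut.result())  # 6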
async def test_mixing_clients_same_scheduler(s, a, b):
    async with Client(s.address, asynchronous=True) as c1, Client(
        s.address, asynchronous=True
    ) as c2:
        future = c1.submit(inc, 1)
-       with pytest.raises(ValueError):
-           c2.submit(inc, future)
+       assert await c2.submit(inc, future) == 3
    assert not s.tasks
This is new. I simplified the serialization of the future, and if the key is known to the scheduler, a future may even be shared by clients.
This is not necessary for this change; it's just something I noticed when playing with the futures reducer.
A different scheduler still raises, see below (but only on await).
Cool. Starting to review this now. Thank you for pushing this up. cc @rjzamora
distributed/protocol/pickle.py
class _DaskPickler(pickle.Pickler):
    def reducer_override(self, obj):
        try:
            serialize = dask_serialize.dispatch(type(obj))
            deserialize = dask_deserialize.dispatch(type(obj))
            return deserialize, serialize(obj)
        except TypeError:
            return NotImplemented
This is the new pickler class. It is required to allow arguments like H5Py objects to be passed; otherwise, we'd need to walk and iterate the HLGs/Layers ourselves (effectively an HLG version of our serialize protocol). I figured this is much simpler.
I guess there are more performant ways to do this. For instance, I'm not storing the frames in the buffer callback, but I'm happy to defer that to a later PR unless I'm missing something important.
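As an aside, a minimal standalone illustration of the protocol-5 buffer callback mentioned here (toy data, not this PR's code): with buffer_callback, large buffers travel out-of-band instead of being copied into the pickle stream.

import pickle

frames = []
data = bytearray(b"x" * 1024)
# buffer_callback collects PickleBuffer views instead of embedding the bytes
payload = pickle.dumps(
    pickle.PickleBuffer(data), protocol=5, buffer_callback=frames.append
)
# the same buffers must be handed back at load time
roundtripped = pickle.loads(payload, buffers=frames)
assert roundtripped == data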
For those not deeply familiar with this API, see https://docs.python.org/3/library/pickle.html#pickle.Pickler.reducer_override
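A minimal self-contained sketch of the mechanism (toy class, not dask's dispatch machinery): the pickler consults reducer_override first for every object, and returning NotImplemented falls back to normal pickling.

import io
import pickle

class NoPickle:
    # stands in for objects like h5py handles that refuse to pickle
    def __init__(self, payload):
        self.payload = payload

    def __reduce__(self):
        raise TypeError("not picklable")

def rebuild(payload):
    return NoPickle(payload)

class CustomPickler(pickle.Pickler):
    def reducer_override(self, obj):
        if isinstance(obj, NoPickle):
            return rebuild, (obj.payload,)  # custom reduction wins
        return NotImplemented  # everything else pickles as usual

buf = io.BytesIO()
CustomPickler(buf).dump(NoPickle(42))
assert pickle.loads(buf.getvalue()).payload == 42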
distributed/scheduler.py
plugin.update_graph(
    self,
    client=client,
    tasks=[ts.key for ts in touched_tasks],
    keys=requested_keys,
    dependencies=dependencies,
    annotations=final_annotations,
    priority=priority,
)
The plugin API is something I'm not entirely sure what to do with. I wrote it now such that our tests pass, but the API did change: I'm not passing all arguments as before, and unless we specify more explicitly what the arguments are, I believe we shouldn't pass them at all.
The docs don't say anything: https://distributed.dask.org/en/stable/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin.update_graph
For instance, I was surprised to see that tasks are actually keys...
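For reference, a hedged sketch of a plugin consuming this hook with the keyword arguments as they appear in the call above (the plugin name and body are made up, and the signature reflects this PR rather than any released API):

from distributed.diagnostics.plugin import SchedulerPlugin

class GraphLogger(SchedulerPlugin):
    def update_graph(self, scheduler, *, client=None, keys=None, tasks=None, **kwargs):
        # `tasks` is a list of task keys, not TaskState objects, per the
        # surprise noted above; **kwargs absorbs arguments that may change
        print(f"client {client}: {len(tasks or [])} tasks touched, keys={keys}")

Such a plugin would be registered with, e.g., scheduler.add_plugin(GraphLogger()).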
I don't object to changing the API. However, we should check with downstream users. One user in particular I care about is @ntabris, who might use this for analytics.
> One user in particular I care about is @ntabris, who might use this for analytics.

I checked. From what I can tell, the plugin uses only the transition hook, not the update_graph hook.
No concerns from me; as @fjetter says, we aren't using graph updates.
First pass of review. I've briefly scanned everything except for scheduler.py. I'll handle that separately.
distributed/shuffle/_shuffle.py
"parts_out", | ||
"annotations", | ||
] | ||
return (P2PShuffleLayer, tuple(getattr(self, attr) for attr in attrs)) |
Can I ask why this was needed?
I subclassed the SimpleShuffleLayer mostly to avoid code duplication back then. I believe with this change we could avoid the subclassing entirely.
The reason why this is necessary is that SimpleShuffleLayer implements the same reduction, but with the SimpleShuffleLayer type. We could generalize this to use type(self), but I believe avoiding the subclassing is the overall better approach.
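To illustrate the type(self) point with a toy layer (names made up, not the actual shuffle code):

import pickle

class BaseLayer:
    attrs = ("name", "parts_out", "annotations")

    def __init__(self, name, parts_out=None, annotations=None):
        self.name, self.parts_out, self.annotations = name, parts_out, annotations

    def __reduce__(self):
        # Hard-coding BaseLayer here would make every subclass round-trip
        # as a BaseLayer; type(self) reconstructs the actual subclass.
        return (type(self), tuple(getattr(self, a) for a in self.attrs))

class SubLayer(BaseLayer):
    pass

assert type(pickle.loads(pickle.dumps(SubLayer("x")))) is SubLayer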
distributed/tests/test_client.py
assert "Scheduler cancels key" in slogs.getvalue()
I don't really care here, but this seems like a significant relaxation of the test
Absolutely, this was an accident.
I don't entirely understand why I need to wait here, since I'd expect the logging module to go through the handlers synchronously.
FWIW the usage of caplog is unrelated. The same problem appears with the captured_logger ctx.
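For context, the captured_logger context manager referenced here comes from distributed.utils_test and collects a logger's output into a StringIO-like object; a minimal usage sketch (the logger name and the triggering code are placeholders):

from distributed.utils_test import captured_logger

with captured_logger("distributed.scheduler") as slogs:
    ...  # trigger the scheduler action that should log the cancellation
assert "Scheduler cancels key" in slogs.getvalue()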
"client": "f", | ||
"keys": ["z"], | ||
} | ||
) |
This is why I dislike white-box tests. There was a lot of this in the early days.
FWIW I encountered the same test over in #7502 (comment) already. I believe the condition this is constructing is actually not possible when using the Client API.

> This is why I dislike white-box tests

I actually don't consider this a "white box" test. We're not asserting on the internal state of the object, but are rather using a very low-level API where a high-level API would've been more useful. A different way to look at this is that we actually don't care about the low-level API that much.
@fjetter is the primary change to scheduler.py the refactoring into multiple smaller methods, plus the changes already in the original PR? Are there other sections I should be sure to review? It's hard to tell what has changed based only on the lines changed.
distributed/scheduler.py
self._pop_known_tasks(dsk, dependencies)

# Remove aliases
for k in list(tasks):
    if tasks[k] is k:
        del tasks[k]

if lost_keys := self._pop_lost_tasks(dsk, dependencies, requested_keys):
FWIW I dislike pulling non-repeated functionality into methods like this. I find that the non-linearity it produces makes it harder for me to understand code. This is subjective, I acknowledge.
More thoughts here: https://matthewrocklin.com/avoid-indirection.html
Also, bonus points if you remove the walrus operator. My guess is that most devs today don't intuitively know what it does. It would be better, I think, to use another line and more basic syntax.
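For readers unfamiliar with it, the two equivalent spellings side by side on a toy example (not the PR's code):

data = {"a": 1}

# walrus form (Python 3.8+): bind and test in one expression
if value := data.get("a"):
    print(value)

# expanded form: one extra line, more basic syntax
value = data.get("a")
if value:
    print(value)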
These are just preferences, not constraints
> FWIW I dislike pulling non-repeated functionality into methods like this. I find that the non-linearity it produces makes it harder for me to understand code. This is subjective, I acknowledge.

I tend to agree here and was torn myself, but the update_graph method is otherwise 400 lines of highly nontrivial code, and I believe that hiding a couple of deeply nested loops and conditionals is not hurting readability. This also helps with namespace pollution. This way I can describe what update_graph does in fewer than ten lines:

- deserialize the graph
- materialize the graph
- pop known and lost keys
- generate the internal state
- apply annotations
- compute priorities
- generate recommendations

I agree there is some nuance in what "popping known and lost keys" means (or other parts), but I believe this makes the code much more accessible.

> My guess is that most devs today don't intuitively know what it does.

I think I do not agree with the statement that "most devs don't have an intuition about this". At the very least, I don't think this is making accessibility to the code base any worse. FWIW, without my usage of walrus in this PR, there are already 16 occurrences of the operator in our code base.
Ouch, that doesn't look good. That might be a segfault 😱
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files ±0, 26 suites ±0, 12h 6m 17s ⏱️ -38m 2s

For more details on these failures, see this check. Results for commit e003ec6. ± Comparison against base commit 9bd90c0. This pull request removes 8 and adds 5 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
The most interesting change, I believe, is the new Pickler class.
Indeed, I am seeing a segmentation fault...
This appears to happen only on ubuntu + python3.8 (so far).
I factored out the pickler change; see #7567.
I will follow up with another pass and try to reduce the diff tomorrow. If applicable, I'll break out smaller changes. I believe the most controversial change may be #7567. This is something I only stumbled over today and would appreciate some input from somebody with more historic exposure to our serialization protocol.
@fjetter anything else I should be doing with this PR today?
Factoring out changes to future serialization: #7580
CI is looking good on both the dask/dask and dask/distributed sides. I am still struggling a bit with benchmark results. There was another priority mixup in the shuffle layer. Will update and rerun benchmarks with new results ASAP.
From what I can tell, I found and fixed the last regression. Currently running another set of benchmarks to confirm. If they don't flag anything suspicious, I will move forward with merging this Monday morning unless there are any objections before then.
def get_output_keys(self) -> set[_T_Key]:
    return {(self.name, part) for part in self.parts_out}

def is_materialized(self) -> bool:
    return hasattr(self, "_cached_dict")

@property
def _dict(self) -> _T_LowLevelGraph:
    """Materialize full dict representation"""
    self._cached_dict: _T_LowLevelGraph
    dsk: _T_LowLevelGraph
    if hasattr(self, "_cached_dict"):
        return self._cached_dict
    else:
        dsk = self._construct_graph()
        self._cached_dict = dsk
        return self._cached_dict

def __getitem__(self, key: _T_Key) -> tuple:
    return self._dict[key]

def __iter__(self) -> Iterator[_T_Key]:
    return iter(self._dict)

def __len__(self) -> int:
    return len(self._dict)
This subclassing caused another problem, so I just copied the relevant code. IMHO most of these methods should be included in some kind of base class and not necessarily in the SimpleShuffleLayer. I see this being repeated often, but fixing it right now is out of scope.
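A hedged sketch of the kind of base class suggested here (the class name is hypothetical): the lazy-materialization boilerplate lives in one place, and concrete layers only implement _construct_graph().

from collections.abc import Iterator, Mapping

class MaterializingLayerBase(Mapping):  # hypothetical name
    def is_materialized(self) -> bool:
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self) -> dict:
        # materialize lazily and cache the full dict representation
        if not hasattr(self, "_cached_dict"):
            self._cached_dict = self._construct_graph()
        return self._cached_dict

    def _construct_graph(self) -> dict:
        raise NotImplementedError  # concrete layers provide this

    def __getitem__(self, key) -> tuple:
        return self._dict[key]

    def __iter__(self) -> Iterator:
        return iter(self._dict)

    def __len__(self) -> int:
        return len(self._dict)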
Ok, all test jobs passed. Note: before merging I will need to revert the environment.yaml files. This will cause the builds to fail again, since this needs to be merged together with the dask change in dask/dask#9988.
Ok, benchmark results are finally in as well, this time without any regressions.

[Figure: wall time comparison. Right side is the null hypothesis, i.e. main vs. main, to measure noise; left side is this PR.]

What we can see is that there is not much to see. This change is not intended to change scheduling behavior or speed anything up; these benchmarks mostly confirm that we're dispatching the proper computations. There is one sizable performance improvement in the test case. Memory comparisons do not show any differences beyond noise. Benchmark results available at
Alright, all tests passed, and I reverted the environment files. I think we're good to go.
🎉

> Merged #7564 into main.
* Remove redundant setuptools constraint
* Limit Dask Distributed version to <2023.4: avoid problems with serialising h5py objects in task graphs, introduced in 2023.4.0. See https://distributed.dask.org/en/stable/changelog.html#v2023-4-0 and dask/distributed#7564 for details.
This is unfortunately much larger than I was hoping. There are a couple of notable changes in here:

- update_graph itself is now much easier to read. Functionally speaking, there are only a few intentional behavioral changes. I'll point them out as dedicated comments.
- There is a new Pickler class, since otherwise user arguments to graphs may end up being not serializable. One example is H5Py objects, which intentionally disallow pickling. All of these cases are already handled by our dask_(de)serialize dispatch, so the new pickler simply uses the dask serializer if one is available and falls back to ordinary pickle instead.

Minor things

Supersedes

Requires dask branch