Notify worker plugins when a task is released #3817
Force-pushed from e5493a3 to 65316c2.
I have used |
Basically, I think the change is nice, but I worry about the API breakage for the notification, and also had a couple of style thoughts.
@quasiben or @mrocklin should probably give a moment to consider whether the name change for the _notify_transition (which ought to be an internal method) could have ramifications.
@@ -147,3 +148,34 @@ def transition(self, key, start, finish, **kwargs):
            Final state of the transition.
        kwargs: More options passed when transitioning
        """

    def release_key(self, key, state, cause, reason, report):
Some of the variables are typed in the docstring example, but not here. I know this is true of the existing methods, but it seems a little odd. Also, `cause` does appear to have a type below. `reason` should have a description.
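For illustration, a docstring that gives every parameter of the new hook both a type and a description might look like the sketch below. The class name `ExamplePlugin`, the types, and the descriptions (in particular the one for `reason`) are assumptions made for this example, not text taken from the PR.

```python
import inspect


class ExamplePlugin:
    """Hypothetical plugin used only to illustrate the docstring layout."""

    def release_key(self, key, state, cause, reason, report):
        """
        Called when the worker releases a task.

        Parameters
        ----------
        key : str
            Key of the released task (type assumed).
        state : str
            State the task was in when it was released (type assumed).
        cause : str or None
            What triggered the release, if known (type assumed).
        reason : str or None
            Free-form explanation for the release (description assumed).
        report : bool
            Whether the release is reported back to the scheduler (assumed).
        """


# Check that every parameter in the signature is mentioned in the docstring.
signature = inspect.signature(ExamplePlugin.release_key)
doc = inspect.getdoc(ExamplePlugin.release_key)
undocumented = [
    name
    for name in signature.parameters
    if name != "self" and f"{name} :" not in doc
]
print(undocumented)
```

A check like the last few lines could live in a test if we wanted to keep hook docstrings complete, though that is a suggestion rather than something this PR needs.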
        self.observed_notifications.append(ReleasedDep(dep, state))


Transition = namedtuple("Transition", "key, start, finish")
Do you think these definitions might be useful outside of tests?
I don't think that we should generalize these further. In fact, I wouldn't mind them being scoped to just this one test if that's alright. I'd like to reduce the number of abstractions we create if possible.
Personally, I'd prefer just having a dictionary here rather than three new namedtuples, but that's purely subjective, and not something that should hold up this PR.
    await c.register_worker_plugin(plugin)
    await c.submit(lambda x: x, 1, key="task")
    await asyncio.sleep(DELAY_BEFORE_TEARDOWN)
Perhaps use `distributed.utils_test.async_wait_for`?
+1. In general we avoid writing tests that depend on sleeping a certain amount of time.
Also, if a global value like this is only being used once then I would greatly prefer to put the value directly in the test. Otherwise reviewers and future devs will have to dive through more code to see what is happening. See http://matthewrocklin.com/blog/work/2019/06/23/avoid-indirection for more thoughts.
I understand the desire to avoid flaky tests; however, if the test depends on a sleep, then it may be sensitive to delays. For example, on small CI machines we frequently experience GC cleanups on the order of seconds. So if this test isn't robust to random multi-second-long pauses with no work, then it is likely to fail eventually.
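To make the alternative concrete, here is a minimal sketch of waiting on a condition instead of sleeping for a fixed time. The helper `wait_until` and the `demo` coroutine are hypothetical names written for this example; this is not the implementation of `distributed.utils_test.async_wait_for`, only the same general idea.

```python
import asyncio
import time


async def wait_until(predicate, timeout=5.0, period=0.01):
    """Poll ``predicate`` until it returns True or ``timeout`` seconds pass."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError("condition not met within timeout")
        await asyncio.sleep(period)


async def demo():
    observed = []

    async def background_work():
        # Stand-in for a worker that records a notification after some delay.
        await asyncio.sleep(0.05)
        observed.append("released")

    task = asyncio.create_task(background_work())
    # Wait for the condition itself rather than for a fixed amount of time.
    await wait_until(lambda: "released" in observed)
    await task
    return observed


result = asyncio.run(demo())
print(result)
```

The generous timeout only bounds the failure case, so a multi-second pause on a slow CI machine delays the test instead of breaking it.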
):
-    assert expected == real
+    assert type(expected) is type(real) and expected == real
It seems to me a little weird to have the test as part of the teardown; could you not make an `assert_ok` method (or better name) to be called in the body of the test?
To be clear, this is the way that it was already done (this is my fault, not @nre's)
I agree that it's atypical, but it does isolate the error effectively. This is also one of the benefits of running everything in the same thread.
@@ -2825,15 +2829,16 @@ def get_call_stack(self, comm=None, keys=None):
        result = {k: profile.call_stack(frame) for k, frame in frames.items()}
        return result

-    def _notify_transition(self, key, start, finish, **kwargs):
+    def _notify_plugins(self, method_name, *args, **kwargs):
This is the potential breaking change, due to renaming the method. I'm not certain the rename is required, since the release modes could be seen as transitions too. I understand you also want to pass different arguments here. Should there be a deprecation cycle for the old name/signature?
I don't think that user-created WorkerPlugins were aware of the use of the `_notify_transition` method. This was a purely internal detail. The user API of having a `def transition(...)` method on the plugin itself remains the same. I think that that is what we care about for API stability.
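A rough sketch of why the rename stays invisible to plugin authors: the worker-side helper changes, but plugins still only define public hook methods. The `Worker` class here is a hypothetical, stripped-down stand-in (it is not `distributed.Worker`), and both plugin classes are invented for the example.

```python
class Worker:
    """Hypothetical stand-in holding only what the dispatch needs."""

    def __init__(self):
        self.plugins = {}

    def _notify_plugins(self, method_name, *args, **kwargs):
        # Internal helper: call ``method_name`` on each plugin that defines it.
        for plugin in self.plugins.values():
            if hasattr(plugin, method_name):
                getattr(plugin, method_name)(*args, **kwargs)


class TransitionOnlyPlugin:
    """A plugin written before the new hook existed; it needs no changes."""

    def __init__(self):
        self.calls = []

    def transition(self, key, start, finish, **kwargs):
        self.calls.append(("transition", key, start, finish))


class ReleaseAwarePlugin(TransitionOnlyPlugin):
    """A plugin that also opts in to the new ``release_key`` hook."""

    def release_key(self, key, state, cause, reason, report):
        self.calls.append(("release_key", key, state))


worker = Worker()
old_plugin = TransitionOnlyPlugin()
new_plugin = ReleaseAwarePlugin()
worker.plugins = {"old": old_plugin, "new": new_plugin}

worker._notify_plugins("transition", "task", "waiting", "ready")
worker._notify_plugins("release_key", "task", "memory", None, None, True)

print(old_plugin.calls)
print(new_plugin.calls)
```

The older plugin simply never sees the release notification, which is the behaviour a plugin author would presumably expect.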
Thank you for the PR @nre. I apologize for the delay in review.
I've left several comments here. If you have time to look through them and share your thoughts that would be welcome.
    expected_notifications = [
        Transition("task", "waiting", "ready"),
        Transition("task", "ready", "executing"),
        Transition("task", "executing", "error"),
    ]
I'd prefer these to not be Transition types if that's ok. I'd prefer us to stick to core data structures like dicts if possible. As a reviewer I don't know what this `Transition` type is or what I should expect of it, but I will immediately understand how to manage a dict.
distributed/worker.py (outdated)
            try:
                plugin_method = getattr(plugin, method_name)
            except AttributeError:
                continue
I have a slight preference for `if hasattr(plugin, method_name)` over a try-except. It seems more clear, and it's also marginally faster when we expect many misses (try-except is faster than if-else if the try block succeeds, but about 5x slower if the except path is taken).
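For anyone who wants to check the timing on their own machine, a small sketch follows. The `Plugin` class and the two lookup helpers are hypothetical, and the actual ratio will vary with the Python version and hardware, so no numbers are claimed here.

```python
import timeit


class Plugin:
    """Hypothetical plugin that defines ``transition`` but not ``release_key``."""

    def transition(self, key, start, finish):
        return (key, start, finish)


def lookup_hasattr(plugin, name):
    # Look-before-you-leap variant.
    if hasattr(plugin, name):
        return getattr(plugin, name)
    return None


def lookup_try(plugin, name):
    # Try-except variant, as in the outdated diff above.
    try:
        return getattr(plugin, name)
    except AttributeError:
        return None


plugin = Plugin()
for name in ("transition", "release_key"):  # a hit, then a miss
    for lookup in (lookup_hasattr, lookup_try):
        seconds = timeit.timeit(lambda: lookup(plugin, name), number=100_000)
        print(f"{lookup.__name__:>15} {name:>12}: {seconds:.4f}s")
```

Both variants return the same result for hits and misses, so the choice is about readability and the expected miss rate.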
Force-pushed from a45ef8b to 61592de.
@mrocklin Thank you for the Bokeh fix. Sorry for this force-push but the tests need the Bokeh fix. The test failure is unrelated to this PR:
Changes made:
Changes not made but I am happy to do if required:
Not a problem. We squash-and-merge anyway.
This is in. Thanks @nre!
Fixes #3815