Notify worker plugins when a task is released #3817

Merged: 5 commits merged into dask:master on Jun 18, 2020

Conversation

nre
Contributor

@nre nre commented May 20, 2020

Fixes #3815

@nre nre force-pushed the worker_plugin_release_key branch 3 times, most recently from e5493a3 to 65316c2 Compare June 1, 2020 12:57
@nre nre marked this pull request as ready for review June 1, 2020 14:32
@nre
Contributor Author

nre commented Jun 1, 2020

I used asyncio.sleep to keep the tests from being flaky, because I see it used in the other tests, but please let me know if there is a better way.

Member

@martindurant martindurant left a comment

Basically, I think the change is nice, but I worry about the API breakage for the notification, and I also had a couple of style thoughts.
@quasiben or @mrocklin should probably take a moment to consider whether the name change for _notify_transition (which ought to be an internal method) could have ramifications.

@@ -147,3 +148,34 @@ def transition(self, key, start, finish, **kwargs):
Final state of the transition.
kwargs: More options passed when transitioning
"""

def release_key(self, key, state, cause, reason, report):
Member

Some of the variables are typed in the docstring example, but not here. I know this is true of the existing methods, but it seems a little odd. Also, cause does appear to have a type below.
reason should have a description.
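
As a rough illustration of what a fully documented hook could look like, here is a minimal sketch of a plugin implementing release_key with the signature shown in the diff. The class name ReleaseRecorder is hypothetical, and the parameter types and descriptions in the docstring are assumptions, not the wording that was merged. A plain class is used instead of a WorkerPlugin subclass so the sketch runs without a cluster.

```python
class ReleaseRecorder:
    """Hypothetical plugin that records every key the worker releases.

    A real plugin would subclass distributed's WorkerPlugin; a plain class
    is used here so the sketch runs without a cluster.
    """

    def __init__(self):
        self.released = []

    def release_key(self, key, state, cause, reason, report):
        """Called when the worker releases a task.

        Parameters
        ----------
        key : str
            Key of the released task.
        state : str
            State the task was in when released (assumed, e.g. "memory").
        cause : str or None
            What triggered the release (assumed type).
        reason : object or None
            Extra context for the release (assumed; the review asks for
            this description to be filled in).
        report : bool
            Whether the release is reported to the scheduler (assumed).
        """
        self.released.append({"key": key, "state": state})


plugin = ReleaseRecorder()
# Stand-in for the worker invoking the hook.
plugin.release_key("task", "memory", cause=None, reason=None, report=False)
print(plugin.released)
```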

self.observed_notifications.append(ReleasedDep(dep, state))


Transition = namedtuple("Transition", "key, start, finish")
Member

Do you think these definitions might be useful outside of tests?

Member

I don't think that we should generalize these further. In fact, I wouldn't mind them being scoped to just this one test if that's alright. I'd like to reduce the number of abstractions we create if possible.

Personally, I'd prefer just having a dictionary here rather than three new namedtuples, but that's purely subjective, and not something that should hold up this PR.


await c.register_worker_plugin(plugin)
await c.submit(lambda x: x, 1, key="task")
await asyncio.sleep(DELAY_BEFORE_TEARDOWN)
Member

Perhaps use distributed.utils_test.async_wait_for?

Member

+1. In general we avoid writing tests that depend on sleeping a certain amount of time.

Also, if a global value like this is only being used once then I would greatly prefer to put the value directly in the test. Otherwise reviewers and future devs will have to dive through more code to see what is happening. See http://matthewrocklin.com/blog/work/2019/06/23/avoid-indirection for more thoughts.

Member

I understand the desire to avoid flaky tests. However, if the test depends on a sleep, then it may be sensitive to delays. For example, on small CI machines we frequently experience GC cleanups on the order of seconds. So if this test isn't robust to random multi-second-long pauses with no work, then it is likely to fail eventually.
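
To make the alternative to a fixed sleep concrete, here is a minimal sketch of polling for a condition with a generous timeout. The helper wait_until is hypothetical and written for illustration only; it is not the implementation of distributed.utils_test.async_wait_for, though it follows the same general idea of re-checking a predicate until it holds.

```python
import asyncio
import time


async def wait_until(predicate, timeout, period=0.01):
    """Poll ``predicate`` until it returns True or ``timeout`` seconds pass."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError("condition not met within %s seconds" % timeout)
        await asyncio.sleep(period)


async def demo():
    observed = []

    async def background_work():
        # Stand-in for the worker notifying a plugin at some later point.
        await asyncio.sleep(0.05)
        observed.append("released")

    task = asyncio.ensure_future(background_work())
    # Returns as soon as the notification arrives, and tolerates long
    # pauses up to the timeout instead of relying on a fixed delay.
    await wait_until(lambda: len(observed) == 1, timeout=5)
    await task
    return observed


result = asyncio.run(demo())
```

The test finishes as soon as the condition is met, so a large timeout costs nothing in the common case while still surviving a multi-second stall.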

):
assert expected == real
assert type(expected) is type(real) and expected == real
Member

It seems to me a little weird to have the test as part of the teardown; could you not make an assert_ok method (or better name) to be called in the body of the test?

Member

To be clear, this is the way that it was already done (this is my fault, not @nre's)

I agree that it's atypical, but it does isolate the error effectively. This is also one of the benefits of running everything in the same thread.

@@ -2825,15 +2829,16 @@ def get_call_stack(self, comm=None, keys=None):
result = {k: profile.call_stack(frame) for k, frame in frames.items()}
return result

- def _notify_transition(self, key, start, finish, **kwargs):
+ def _notify_plugins(self, method_name, *args, **kwargs):
Member

This is the potential breaking change, due to renaming the method. I'm not certain the rename is required, since the release modes could be seen as transitions too. I understand you also want to pass different arguments here. Should there be a deprecation cycle for the old name/signature?

Member

I don't think that user-created WorkerPlugins were aware of the use of the _notify_transition method. This was a purely internal detail. The user API of having a def transition(...) method on the plugin itself remains the same. I think that that is what we care about for API stability.

Member

@mrocklin mrocklin left a comment

Thank you for the PR @nre. I apologize for the delay in review.

I've left several comments here. If you have time to look through them and share your thoughts that would be welcome.

expected_notifications = [
Transition("task", "waiting", "ready"),
Transition("task", "ready", "executing"),
Transition("task", "executing", "error"),
]
Member

I'd prefer these to not be Transition types if that's ok. I'd prefer us to stick to core data structures like dicts if possible. As a reviewer I don't know what this Transition type is or what I should expect of it, but I will immediately understand how to manage a dict.
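
A minimal sketch of the dict-based variant suggested here, assuming each notification is stored with the keys key, start, and finish. The class RecordingPlugin and the exact dict layout are hypothetical; the thread does not show the layout that was finally used.

```python
class RecordingPlugin:
    """Hypothetical test plugin that stores each notification as a dict."""

    def __init__(self):
        self.observed_notifications = []

    def transition(self, key, start, finish, **kwargs):
        self.observed_notifications.append(
            {"key": key, "start": start, "finish": finish}
        )


expected_notifications = [
    {"key": "task", "start": "waiting", "finish": "ready"},
    {"key": "task", "start": "ready", "finish": "executing"},
    {"key": "task", "start": "executing", "finish": "error"},
]

plugin = RecordingPlugin()
# Stand-in for the worker driving the task through its states.
for start, finish in [
    ("waiting", "ready"),
    ("ready", "executing"),
    ("executing", "error"),
]:
    plugin.transition("task", start, finish)

assert plugin.observed_notifications == expected_notifications
```

Plain dicts compare by value, so the expected and observed lists can be checked with a single equality assertion and no extra type definitions.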

Comment on lines 2834 to 2837
try:
plugin_method = getattr(plugin, method_name)
except AttributeError:
continue
Member

I have a slight preference for if hasattr(plugin, method_name) over a try-except. It seems clearer, and it's also marginally faster when we expect many misses (try-except is faster than if-else if the try block succeeds, but about 5x slower if the except path is taken).
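
The hasattr-based dispatch can be sketched as follows. The free function notify_plugins and the two plugin classes are hypothetical stand-ins for illustration; the worker's actual _notify_plugins method may differ, for example in how it handles exceptions raised by a plugin.

```python
def notify_plugins(plugins, method_name, *args, **kwargs):
    """Call ``method_name`` on every plugin that defines it; skip the rest."""
    for plugin in plugins:
        if hasattr(plugin, method_name):
            getattr(plugin, method_name)(*args, **kwargs)


class TransitionOnly:
    """Plugin written against the older API: only ``transition`` is defined."""

    def __init__(self):
        self.calls = []

    def transition(self, key, start, finish, **kwargs):
        self.calls.append(("transition", key, start, finish))


class TransitionAndRelease(TransitionOnly):
    """Plugin that also listens for released keys."""

    def release_key(self, key, state, cause, reason, report):
        self.calls.append(("release_key", key, state))


a, b = TransitionOnly(), TransitionAndRelease()
notify_plugins([a, b], "transition", "task", "waiting", "ready")
# ``a`` has no release_key method, so it is skipped without raising.
notify_plugins([a, b], "release_key", "task", "memory", None, None, True)
```

Plugins that only define transition keep working unchanged, which matches the point made in the thread that the user-facing plugin API is what matters for stability.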

@nre nre force-pushed the worker_plugin_release_key branch from a45ef8b to 61592de Compare June 17, 2020 09:08
@nre
Contributor Author

nre commented Jun 17, 2020

@mrocklin Thank you for the Bokeh fix.

Sorry for this force-push but the tests need the Bokeh fix.

The test failure is unrelated to this PR: test_tcp_comm_closed_implicit

Changes made:

  • Add missing type annotations and parameter description to docstrings.
  • Use async_wait_for instead of sleep.
  • Use dict instead of namedtuple.
  • Use hasattr instead of try.

Changes not made but I am happy to do if required:

  • Add type annotations to worker plugin methods.
  • Restore the _notify_transition method.

@mrocklin
Member

> Sorry for this force-push but the tests need the Bokeh fix.

Not a problem. We squash-and-merge anyway.

@mrocklin mrocklin merged commit 5172678 into dask:master Jun 18, 2020
@mrocklin
Member

This is in. Thanks @nre !

Successfully merging this pull request may close these issues.

Worker plugin is not notified when worker releases task