-
Notifications
You must be signed in to change notification settings - Fork 285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle warnings in tests #751
Closed
Closed
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
d7b50a9
Handle warnings in tests
blink1073 b1484ed
set strict asyncio mode
blink1073 eaf2195
switch to auto mode
blink1073 c068a82
fix multikernelmanager tests
blink1073 7ccc0cf
fix provisioning
blink1073 4db6e4e
cleanup
blink1073 8de3e86
fix for min version
blink1073 b8935af
fix toml
blink1073 c729b23
try pyzmq 18
blink1073 40d3890
use bugfix version
blink1073 afdacfb
do not fail on warnings for min deps
blink1073 87832a2
fix event loop warning
blink1073 c3b432a
py10 fixups
blink1073 841c6bc
remove problematic test
blink1073 602f361
handle loop warning
blink1073 8945cc2
fix windows tempdir handling
blink1073 1edb75f
more cleanup
blink1073 2db6359
more cleanup
blink1073 77f5e4b
use latest nest-asyncio
blink1073 d217f59
try updated tornado
blink1073 2c506ec
try newer asyncio-test
blink1073 7a2f6d1
try newer pyzmq
blink1073 9868fa8
try newer ipykernel
blink1073 9251cc1
try removing zmq warning guard
blink1073 457e344
restore future warning
blink1073 0a75462
address review
blink1073 b0a321d
bump tornado
blink1073 ff8e482
bump zmq version
blink1073 d670d8c
fix version
blink1073 9ec08a8
relax a couple versions
blink1073 968b9c7
clean up ci
blink1073 35645f8
try a newer ipykernel
blink1073 8c4c472
fix shutdown all behavior
blink1073 d108c05
wip refactor
blink1073 66c6dab
more cleanup
blink1073 40bacf0
fix test
blink1073 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,33 +52,38 @@ class _ShutdownStatus(Enum): | |
SigkillRequest = "SigkillRequest" | ||
|
||
|
||
def in_pending_state(method): | ||
def in_pending_state(attr='_ready'): | ||
"""Sets the kernel to a pending state by | ||
creating a fresh Future for the KernelManager's `ready` | ||
attribute. Once the method is finished, set the Future's results. | ||
""" | ||
|
||
@functools.wraps(method) | ||
async def wrapper(self, *args, **kwargs): | ||
# Create a future for the decorated method | ||
try: | ||
self._ready = Future() | ||
except RuntimeError: | ||
# No event loop running, use concurrent future | ||
self._ready = CFuture() | ||
try: | ||
# call wrapped method, await, and set the result or exception. | ||
out = await method(self, *args, **kwargs) | ||
# Add a small sleep to ensure tests can capture the state before done | ||
await asyncio.sleep(0.01) | ||
self._ready.set_result(None) | ||
return out | ||
except Exception as e: | ||
self._ready.set_exception(e) | ||
self.log.exception(self._ready.exception()) | ||
raise e | ||
def inner(method): | ||
@functools.wraps(method) | ||
async def wrapper(self, *args, **kwargs): | ||
# Create a future for the decorated method | ||
try: | ||
asyncio.get_running_loop() | ||
fut = Future() | ||
except RuntimeError: | ||
# No event loop running, use concurrent future | ||
fut = CFuture() | ||
setattr(self, attr, fut) | ||
try: | ||
# call wrapped method, await, and set the result or exception. | ||
out = await method(self, *args, **kwargs) | ||
# Add a small sleep to ensure tests can capture the state before done | ||
await asyncio.sleep(0.01) | ||
fut.set_result(None) | ||
return out | ||
except Exception as e: | ||
fut.set_exception(e) | ||
self.log.exception(fut.exception()) | ||
raise e | ||
|
||
return wrapper | ||
|
||
return wrapper | ||
return inner | ||
|
||
|
||
class KernelManager(ConnectionFileMixin): | ||
|
@@ -90,8 +95,10 @@ class KernelManager(ConnectionFileMixin): | |
def __init__(self, *args, **kwargs): | ||
super().__init__(**kwargs) | ||
self._shutdown_status = _ShutdownStatus.Unset | ||
self._shutdown_ready = None | ||
# Create a place holder future. | ||
try: | ||
asyncio.get_running_loop() | ||
self._ready = Future() | ||
except RuntimeError: | ||
# No event loop running, use concurrent future | ||
|
@@ -182,6 +189,11 @@ def ready(self) -> Future: | |
"""A future that resolves when the kernel process has started for the first time""" | ||
return self._ready | ||
|
||
@property | ||
def shutdown_ready(self) -> Future: | ||
"""A future that resolves when a shutdown has completed""" | ||
return self._shutdown_ready | ||
|
||
@property | ||
def ipykernel(self) -> bool: | ||
return self.kernel_name in {"python", "python2", "python3"} | ||
|
@@ -360,7 +372,7 @@ async def _async_post_start_kernel(self, **kw) -> None: | |
|
||
post_start_kernel = run_sync(_async_post_start_kernel) | ||
|
||
@in_pending_state | ||
@in_pending_state() | ||
async def _async_start_kernel(self, **kw): | ||
"""Starts a kernel on this host in a separate process. | ||
|
||
|
@@ -387,8 +399,9 @@ async def _async_request_shutdown(self, restart: bool = False) -> None: | |
content = dict(restart=restart) | ||
msg = self.session.msg("shutdown_request", content=content) | ||
# ensure control socket is connected | ||
self._connect_control_socket() | ||
self.session.send(self._control_socket, msg) | ||
if self._control_socket and not self._control_socket.closed: | ||
self._connect_control_socket() | ||
self.session.send(self._control_socket, msg) | ||
assert self.provisioner is not None | ||
await self.provisioner.shutdown_requested(restart=restart) | ||
self._shutdown_status = _ShutdownStatus.ShutdownRequest | ||
|
@@ -453,7 +466,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: | |
|
||
cleanup_resources = run_sync(_async_cleanup_resources) | ||
|
||
@in_pending_state | ||
@in_pending_state('_shutdown_ready') | ||
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False): | ||
"""Attempts to stop the kernel process cleanly. | ||
|
||
|
@@ -476,7 +489,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) | |
# Stop monitoring for restarting while we shutdown. | ||
self.stop_restarter() | ||
|
||
await ensure_async(self.interrupt_kernel()) | ||
# If the kernel has already started, interrupt it to give it a | ||
# chance to clean up. | ||
if self.has_kernel: | ||
await ensure_async(self.interrupt_kernel()) | ||
Comment on lines
+505
to
+506
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice - make this call to interrupt() conditional on start status. Good catch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍🏼 |
||
|
||
if now: | ||
await ensure_async(self._kill_kernel()) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is inside an
async def
, can this except branch ever occur? There should always be a running loop from inside a coroutine.The previous strategy of setting
_ready
outside a coroutine might not have a running loop, but now that you've moved it into the coroutine (good, I think), there should always be a running loop.Returning two types of Future depending on context would be tricky because they aren't interchangeable (CFuture is not awaitable without calling
asyncio.wrap_future(cfuture)
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An important difference of this strategy, though: the attribute will not be set until an arbitrarily-later point in time due to asyncness. The previous strategy ensured
._ready
was defined immediately before any waiting, whereas putting it in the coroutine means the attribute will not be defined immediately.Two new things to consider (may well both be covered):
self._ready
that may now be called when_ready
is not yet_ready
be set and then re-set? If so, waits for_ready
may get a 'stale' state before the new_ready
future is attached. This can be avoided with a non-asyncdelattr
prior to the async bit.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally
_ready
was only defined in the constructor, which is why we had the fallback option. It seems like these futures are becoming problematic in general. Maybe what we really want is a state and a signal when the state changes, like we have on the JavaScript side.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, if we want to define
ready
andshutdown_ready
as "the most recent" starting and shutting down futures, we could also add anis_ready
andis_shutdown_ready
for when there are no pending of either. We would use this trick that tornado used to work around the new deprecation warnings:We could then make
.ready
and.shutdown
ready futures only available when using theAsyncKernelManager
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha oh my, nevermind, I'm digesting https://bugs.python.org/issue39529
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think part of the problem is that
jupyter_client
is a library, not an application, in the same way thattornado
is. It looks like we also can't rely onasyncio_run
in the future, since it relies onget_event_loop
andset_event_loop
. I think here's a sketch of what we need to do:return asyncio.new_event_loop().run_until_complete(async_method)
..ready
future and add new methods calledasync def wait_for_pending_start()
andasync def wait_for_pending_shutdown()
to handle pending kernels.With this new structure, the synchronous managers could also support pending kernels, breaking it into two synchronous steps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can handle the ready future in this PR and defer the other parts to a separate refactor PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, concurrent.futures are generally more compatible and flexible than asyncio futures - they are threadsafe, reusable across loops, etc. If you always use a cf.Future, you can await it in a coroutine with
asyncio.wrap_future(cf_future)
. That may be the simplest solution here. It's the inconsistency that I saw as a problem (self._ready
may be awaitable). Something like:ought to be more reliable.
To always use new event loops (or
asyncio.run
) for each sync method call, one important consideration is that all your methods completely resolve by the end of the coroutine (i.e. not hold references to the current loop for later calls, which means requiring pyzmq 22.2, among other things). A possibly simpler alternative is to maintain your own event loop reference.asyncio.get_event_loop()
is deprecated because it did too many things, but that doesn't mean nobody should hold onto persistent loop references. It just means that they don't want to track a global not-running loop. If each instance had aself._loop = asyncio.new_event_loop()
and always ran the sync wrappers in that loop (self._loop.run_until_complete(coro())
), it should work fine. That avoids the multiple-loop problem for sync uses because it's still a single persistent loop. Each instance may happen to use a different loop, but that should not be a problem in the sync case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sweet, thanks for the insights Min. That sounds like a good approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More thoughts on ready future handling:
is_start_pending
andis_shutdown_pending
flags.start()
, check for_start_pending
and error if true. Set_start_pending
and set a new_start_ready
future. Clear_start_pending
when ready.shutdown()
, check for_shutdown_pending
and bail if true. Set_shutdown_pending
and set a new_shutdown_ready
future. Clear_shutdown_pending
when ready.wait_for_start_ready()
andwait_for_shutdown_ready()
store current ready, and wait for it. If new current ready is different from the stored one, recurse.