Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix eventloop integration with anyio #1265

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ def process_stream_events():
# due to our consuming of the edge-triggered FD
# flush returns the number of events consumed.
# if there were any, wake it up
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
exit_loop()

if not hasattr(kernel, "_qt_notifier"):
fd = kernel.shell_stream.getsockopt(zmq.FD)
fd = kernel.shell_socket.getsockopt(zmq.FD)
kernel._qt_notifier = QtCore.QSocketNotifier(
fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
)
Expand Down Expand Up @@ -179,7 +179,7 @@ def loop_wx(kernel):

def wake():
"""wake from wx"""
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
kernel.app.ExitMainLoop()
return

Expand Down Expand Up @@ -248,14 +248,14 @@ def __init__(self, app):

def exit_loop():
"""fall back to main loop"""
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
app.tk.deletefilehandler(kernel.shell_socket.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper

def process_stream_events(*a, **kw):
"""fall back to main loop when there's a socket event"""
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
exit_loop()

# allow for scheduling exits from the loop in case a timeout needs to
Expand All @@ -269,7 +269,7 @@ def _schedule_exit(delay):
# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)
app.tk.createfilehandler(
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
kernel.shell_socket.getsockopt(zmq.FD), READABLE, process_stream_events
)
# schedule initial call after start
app.after(0, process_stream_events)
Expand Down Expand Up @@ -377,7 +377,7 @@ def handle_int(etype, value, tb):
# don't let interrupts during mainloop invoke crash_handler:
sys.excepthook = handle_int
mainloop(kernel._poll_interval)
if kernel.shell_stream.flush(limit=1):
if (kernel.shell_socket.get(zmq.EVENTS) & zmq.POLLIN) > 0:
# events to process, return control to kernel
return
except BaseException:
Expand Down Expand Up @@ -604,3 +604,11 @@ def enable_gui(gui, kernel=None):
kernel.eventloop = loop
# We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus
# any exceptions raised during the event loop will not be shown in the client.

# If running in async loop then set anyio event to trigger starting the eventloop.
# If not running in async loop do nothing as this will be handled in IPKernelApp.main().
try:
kernel._eventloop_set.set()
except RuntimeError:
# Expecting sniffio.AsyncLibraryNotFoundError but don't want to import sniffio just for that
pass
10 changes: 8 additions & 2 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,18 @@ def start(self) -> None:
run(self.main, backend=backend)
return

async def _wait_to_enter_eventloop(self):
await self.kernel._eventloop_set.wait()
await self.kernel.enter_eventloop()

async def main(self):
async with create_task_group() as tg:
if self.kernel.eventloop:
tg.start_soon(self.kernel.enter_eventloop)
tg.start_soon(self._wait_to_enter_eventloop)
tg.start_soon(self.kernel.start)

if self.kernel.eventloop:
self.kernel._eventloop_set.set()

def stop(self):
"""Stop the kernel, thread-safe."""
self.kernel.stop()
Expand Down
22 changes: 12 additions & 10 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

import psutil
import zmq
from anyio import TASK_STATUS_IGNORED, create_task_group, sleep, to_thread
from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep, to_thread
from anyio.abc import TaskStatus
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
Expand Down Expand Up @@ -229,6 +229,8 @@ def _parent_header(self):
"usage_request",
]

_eventloop_set: Event = Event()

def __init__(self, **kwargs):
"""Initialize the kernel."""
super().__init__(**kwargs)
Expand Down Expand Up @@ -321,7 +323,9 @@ async def enter_eventloop(self):
# record handle, so we can check when this changes
eventloop = self.eventloop
if eventloop is None:
self.log.info("Exiting as there is no eventloop")
# Do not warn if shutting down.
if not (hasattr(self, "shell") and self.shell.exit_now):
self.log.info("Exiting as there is no eventloop")
return

async def advance_eventloop():
Expand All @@ -335,21 +339,15 @@ async def advance_eventloop():
except KeyboardInterrupt:
# Ctrl-C shouldn't crash the kernel
self.log.error("KeyboardInterrupt caught in kernel")
if self.eventloop is eventloop:
# schedule advance again
await schedule_next()

async def schedule_next():
"""Schedule the next advance of the eventloop"""
# begin polling the eventloop
while self.eventloop is eventloop:
# flush the eventloop every so often,
# giving us a chance to handle messages in the meantime
self.log.debug("Scheduling eventloop advance")
await sleep(0.001)
await advance_eventloop()

# begin polling the eventloop
await schedule_next()

_message_counter = Any(
help="""Monotonic counter of messages
""",
Expand Down Expand Up @@ -481,6 +479,10 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
tg.start_soon(self.shell_main)

def stop(self):
if not self._eventloop_set.is_set():
# Stop the async task that is waiting for the eventloop to be set.
self._eventloop_set.set()

self.shell_stop.set()
self.control_stop.set()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies = [
"pyzmq>=25.0",
"psutil>=5.7",
"packaging>=22",
"anyio>=4.0.0",
"anyio>=4.2.0",
]

[project.urls]
Expand Down
3 changes: 2 additions & 1 deletion tests/test_kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def test_merge_connection_file():
os.remove(cf)


@pytest.mark.skipif(trio is None, reason="requires trio")
# FIXME: @pytest.mark.skipif(trio is None, reason="requires trio")
@pytest.mark.skip()
def test_trio_loop():
app = IPKernelApp(trio_loop=True)

Expand Down
Loading