Skip to content

Commit

Permalink
Nudge kernel with info request until we receive IOPub messages
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainCorlay committed Dec 12, 2020
1 parent 6fef2a8 commit aca54f7
Showing 1 changed file with 81 additions and 9 deletions.
90 changes: 81 additions & 9 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,65 @@ def create_stream(self):
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel

def nudge(self):
shell_channel = self.channels['shell']
iopub_channel = self.channels['iopub']

future = Future()
info_future = Future()
iopub_future = Future()

def finish():
"""Common cleanup"""
loop.remove_timeout(timeout)
loop.remove_timeout(nudge_handle)
iopub_channel.stop_on_recv()
shell_channel.stop_on_recv()

def on_shell_reply(msg):
if not info_future.done():
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
shell_channel.stop_on_recv()
self.log.debug("Nudge: resolving shell future")
info_future.set_result(msg)
if iopub_future.done():
finish()
self.log.debug("Nudge: resolving main future in shell handler")
future.set_result(info_future.result())

def on_iopub(msg):
if not iopub_future.done():
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
iopub_channel.stop_on_recv()
self.log.debug("Nudge: resolving iopub future")
iopub_future.set_result(None)
if info_future.done():
finish()
self.log.debug("Nudge: resolving main future in iopub handler")
future.set_result(info_future.result())

def on_timeout():
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
finish()
if not future.done():
future.set_exception(TimeoutError("Timeout waiting for nudge"))

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
loop = IOLoop.current()

# Nudge the kernel with kernel info requests until we get an IOPub message
def nudge():
self.log.debug("Nudge")
if not future.done():
self.log.debug("nudging")
self.session.send(shell_channel, "kernel_info_request")
nudge_handle = loop.call_later(0.5, nudge)
nudge_handle = loop.call_later(0, nudge)

timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
return future

def request_kernel_info(self):
"""send a request for kernel_info"""
km = self.kernel_manager
Expand Down Expand Up @@ -249,7 +308,7 @@ async def _register_session(self):
await stale_handler.close()
self._open_sessions[self.session_key] = self

def open(self, kernel_id):
async def open(self, kernel_id):
super(ZMQChannelsHandler, self).open()
km = self.kernel_manager
km.notify_connect(kernel_id)
Expand All @@ -259,15 +318,22 @@ def open(self, kernel_id):
if buffer_info and buffer_info['session_key'] == self.session_key:
self.log.info("Restoring connection for %s", self.session_key)
self.channels = buffer_info['channels']
replay_buffer = buffer_info['buffer']
if replay_buffer:
self.log.info("Replaying %s buffered messages", len(replay_buffer))
for channel, msg_list in replay_buffer:
stream = self.channels[channel]
self._on_zmq_reply(stream, msg_list)

connected = self.nudge()

def replay(value):
replay_buffer = buffer_info['buffer']
if replay_buffer:
self.log.info("Replaying %s buffered messages", len(replay_buffer))
for channel, msg_list in replay_buffer:
stream = self.channels[channel]
self._on_zmq_reply(stream, msg_list)

connected.add_done_callback(replay)
else:
try:
self.create_stream()
connected = self.nudge()
except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we
Expand All @@ -281,8 +347,14 @@ def open(self, kernel_id):
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)
def subscribe(value):
for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)

connected.add_done_callback(subscribe)

return connected


def on_message(self, msg):
if not self.channels:
Expand Down

0 comments on commit aca54f7

Please sign in to comment.