Skip to content

Commit

Permalink
Merge pull request #717 from vidartf/fix-async-restarter
Browse files Browse the repository at this point in the history
Improve restarter logic
  • Loading branch information
blink1073 authored Nov 22, 2021
2 parents 318e1c1 + 89fee5e commit 670ee79
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 10 deletions.
31 changes: 26 additions & 5 deletions jupyter_client/ioloop/restarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import time
import warnings

from traitlets import Instance
from zmq.eventloop import ioloop

from jupyter_client.restarter import KernelRestarter
from jupyter_client.utils import run_sync


class IOLoopKernelRestarter(KernelRestarter):
Expand All @@ -31,8 +34,12 @@ def _loop_default(self):
def start(self):
    """Start the polling of the kernel.

    The periodic callback is created and started only when none exists
    yet (``self._pcallback is None``); otherwise this call does nothing.
    """
    if self._pcallback is None:
        # ``poll`` may be a coroutine function (async restarter subclass).
        # In that case it is wrapped with ``run_sync`` before being handed
        # to the periodic callback; a plain function is used as-is.
        if asyncio.iscoroutinefunction(self.poll):
            cb = run_sync(self.poll)
        else:
            cb = self.poll
        # Exactly two positional arguments: the callable to invoke and the
        # interval. ``time_to_dead`` is configured in seconds, hence the
        # conversion factor of 1000 (callback time is in milliseconds).
        self._pcallback = ioloop.PeriodicCallback(
            cb,
            1000 * self.time_to_dead,
        )
        self._pcallback.start()
Expand All @@ -49,13 +56,15 @@ async def poll(self):
if self.debug:
self.log.debug("Polling kernel...")
is_alive = await self.kernel_manager.is_alive()
now = time.time()
if not is_alive:
self._last_dead = now
if self._restarting:
self._restart_count += 1
else:
self._restart_count = 1

if self._restart_count >= self.restart_limit:
if self._restart_count > self.restart_limit:
self.log.warning("AsyncIOLoopKernelRestarter: restart failed")
self._fire_callbacks("dead")
self._restarting = False
Expand All @@ -73,8 +82,20 @@ async def poll(self):
await self.kernel_manager.restart_kernel(now=True, newports=newports)
self._restarting = True
else:
if self._initial_startup:
# Since `is_alive` only tests that the kernel process is alive, it does not
# indicate that the kernel has successfully completed startup. To solve this
# correctly, we would need to wait for a kernel info reply, but it is not
# necessarily appropriate to start a kernel client + channels in the
# restarter. Therefore, we use "has been alive continuously for X time" as a
# heuristic for a stable start up.
# See https://github.com/jupyter/jupyter_client/pull/717 for details.
stable_start_time = self.stable_start_time
if self.kernel_manager.provisioner:
stable_start_time = self.kernel_manager.provisioner.get_stable_start_time(
recommended=stable_start_time
)
if self._initial_startup and now - self._last_dead >= stable_start_time:
self._initial_startup = False
if self._restarting:
if self._restarting and now - self._last_dead >= stable_start_time:
self.log.debug("AsyncIOLoopKernelRestarter: restart apparently succeeded")
self._restarting = False
self._restarting = False
11 changes: 10 additions & 1 deletion jupyter_client/provisioning/provisioner_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async def load_provisioner_info(self, provisioner_info: Dict) -> None:

def get_shutdown_wait_time(self, recommended: float = 5.0) -> float:
"""
Returns the time allowed for a complete shutdown. This may vary by provisioner.
Returns the time allowed for a complete shutdown. This may vary by provisioner.
This method is called from `KernelManager.finish_shutdown()` during the graceful
phase of its kernel shutdown sequence.
Expand All @@ -215,6 +215,15 @@ def get_shutdown_wait_time(self, recommended: float = 5.0) -> float:
"""
return recommended

def get_stable_start_time(self, recommended: float = 10.0) -> float:
    """
    Return the expected upper bound, in seconds, for a kernel (re-)start to complete.

    The base implementation simply hands back ``recommended`` unchanged;
    the bound may vary by provisioner, so subclasses can override this.

    :param recommended: suggested value, typically whatever is configured
        in the kernel restarter.
    :return: the time after which a start is considered stable.
    """
    return recommended

def _finalize_env(self, env: Dict[str, str]) -> None:
"""
Ensures env is appropriate prior to launch.
Expand Down
36 changes: 32 additions & 4 deletions jupyter_client/restarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import time

from traitlets import Bool # type: ignore
from traitlets import default # type: ignore
from traitlets import Dict
from traitlets import Float
from traitlets import Instance
Expand All @@ -31,6 +34,12 @@ class KernelRestarter(LoggingConfigurable):

time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")

stable_start_time = Float(
10.0,
config=True,
help="""The time in seconds to consider the kernel to have completed a stable start up.""",
)

restart_limit = Integer(
5,
config=True,
Expand All @@ -45,6 +54,11 @@ class KernelRestarter(LoggingConfigurable):
_restarting = Bool(False)
_restart_count = Integer(0)
_initial_startup = Bool(True)
_last_dead = Float()

@default("_last_dead")
def _default_last_dead(self):
    """Supply the default for ``_last_dead``: the current ``time.time()`` value."""
    current_time = time.time()
    return current_time

callbacks = Dict()

Expand Down Expand Up @@ -103,13 +117,15 @@ def poll(self):
if self.kernel_manager.shutting_down:
self.log.debug("Kernel shutdown in progress...")
return
now = time.time()
if not self.kernel_manager.is_alive():
self._last_dead = now
if self._restarting:
self._restart_count += 1
else:
self._restart_count = 1

if self._restart_count >= self.restart_limit:
if self._restart_count > self.restart_limit:
self.log.warning("KernelRestarter: restart failed")
self._fire_callbacks("dead")
self._restarting = False
Expand All @@ -127,8 +143,20 @@ def poll(self):
self.kernel_manager.restart_kernel(now=True, newports=newports)
self._restarting = True
else:
if self._initial_startup:
# Since `is_alive` only tests that the kernel process is alive, it does not
# indicate that the kernel has successfully completed startup. To solve this
# correctly, we would need to wait for a kernel info reply, but it is not
# necessarily appropriate to start a kernel client + channels in the
# restarter. Therefore, we use "has been alive continuously for X time" as a
# heuristic for a stable start up.
# See https://github.com/jupyter/jupyter_client/pull/717 for details.
stable_start_time = self.stable_start_time
if self.kernel_manager.provisioner:
stable_start_time = self.kernel_manager.provisioner.get_stable_start_time(
recommended=stable_start_time
)
if self._initial_startup and now - self._last_dead >= stable_start_time:
self._initial_startup = False
if self._restarting:
if self._restarting and now - self._last_dead >= stable_start_time:
self.log.debug("KernelRestarter: restart apparently succeeded")
self._restarting = False
self._restarting = False
39 changes: 39 additions & 0 deletions jupyter_client/tests/problemkernel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Test kernel for signalling subprocesses"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import time

from ipykernel.displayhook import ZMQDisplayHook
from ipykernel.kernelapp import IPKernelApp
from ipykernel.kernelbase import Kernel


class ProblemTestKernel(Kernel):
    """Minimal kernel used for testing kernel problems."""

    # Identification attributes of this test kernel.
    banner = ""
    implementation = "problemtest"
    implementation_version = "0.0"


class ProblemTestApp(IPKernelApp):
    """Kernel application that runs ``ProblemTestKernel``.

    Socket initialisation can be made to fail on purpose by setting the
    environment variable ``FAIL_ON_START`` to ``"1"``.
    """

    kernel_class = ProblemTestKernel

    def init_io(self):
        """Install only the ZMQ display hook.

        Overridden to disable stdout/stderr capture.
        """
        self.displayhook = ZMQDisplayHook(self.session, self.iopub_socket)

    def init_sockets(self):
        """Initialise the sockets unless a start-up failure was requested.

        Raises ``RuntimeError`` when ``FAIL_ON_START`` equals ``"1"``;
        otherwise defers to the parent implementation.
        """
        fail_requested = os.environ.get("FAIL_ON_START") == "1"
        if not fail_requested:
            return super().init_sockets()
        # Simulates e.g. a port binding issue (Address already in use)
        raise RuntimeError("Failed for testing purposes")


if __name__ == "__main__":
    # Delay the launch artificially (STARTUP_DELAY seconds, 2 by default)
    # so that client logic for slow-starting kernels gets exercised.
    delay_seconds = int(os.environ.get("STARTUP_DELAY", "2"))
    time.sleep(delay_seconds)
    ProblemTestApp.launch_instance()
Loading

0 comments on commit 670ee79

Please sign in to comment.