diff --git a/jupyter_client/__init__.py b/jupyter_client/__init__.py index f680b3d6a..f72c516d3 100644 --- a/jupyter_client/__init__.py +++ b/jupyter_client/__init__.py @@ -4,7 +4,7 @@ from .connect import * from .launcher import * from .client import KernelClient -from .manager import KernelManager, run_kernel +from .manager import KernelManager, AsyncKernelManager, run_kernel from .blocking import BlockingKernelClient from .asynchronous import AsyncKernelClient -from .multikernelmanager import MultiKernelManager +from .multikernelmanager import MultiKernelManager, AsyncMultiKernelManager diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index e471640da..3cf49ed4b 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -135,7 +135,7 @@ async def wait_for_ready(self, timeout=None): self._handle_kernel_info_reply(msg) break - if not self.is_alive(): + if not await self.is_alive(): raise RuntimeError('Kernel died before replying to kernel_info') # Check if current time is ready check time plus timeout @@ -234,6 +234,24 @@ def _output_hook_kernel(self, session, socket, parent_header, msg): else: self._output_hook_default(msg) + async def is_alive(self): + """Is the kernel process still running?""" + from ..manager import KernelManager, AsyncKernelManager + if isinstance(self.parent, KernelManager): + # This KernelClient was created by a KernelManager, + # we can ask the parent KernelManager: + if isinstance(self.parent, AsyncKernelManager): + return await self.parent.is_alive() + return self.parent.is_alive() + if self._hb_channel is not None: + # We don't have access to the KernelManager, + # so we use the heartbeat. + return self._hb_channel.is_beating() + else: + # no heartbeat and not local, we can't tell if it's running, + # so naively return True + return True + async def execute_interactive(self, code, silent=False, store_history=True, user_expressions=None, allow_stdin=None, stop_on_error=True, timeout=None, output_hook=None, stdin_hook=None, diff --git a/jupyter_client/ioloop/__init__.py b/jupyter_client/ioloop/__init__.py index d64f06d18..4203d2f82 100644 --- a/jupyter_client/ioloop/__init__.py +++ b/jupyter_client/ioloop/__init__.py @@ -1,2 +1,2 @@ -from .manager import IOLoopKernelManager -from .restarter import IOLoopKernelRestarter +from .manager import IOLoopKernelManager, AsyncIOLoopKernelManager +from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter diff --git a/jupyter_client/ioloop/manager.py b/jupyter_client/ioloop/manager.py index dacb822e8..bd64c92e8 100644 --- a/jupyter_client/ioloop/manager.py +++ b/jupyter_client/ioloop/manager.py @@ -11,8 +11,8 @@ Type, ) -from jupyter_client.manager import KernelManager -from .restarter import IOLoopKernelRestarter +from jupyter_client.manager import KernelManager, AsyncKernelManager +from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter def as_zmqstream(f): @@ -21,9 +21,11 @@ def wrapped(self, *args, **kwargs): return ZMQStream(socket, self.loop) return wrapped + class IOLoopKernelManager(KernelManager): loop = Instance('tornado.ioloop.IOLoop') + def _loop_default(self): return ioloop.IOLoop.current() @@ -59,3 +61,43 @@ def stop_restarter(self): connect_iopub = as_zmqstream(KernelManager.connect_iopub) connect_stdin = as_zmqstream(KernelManager.connect_stdin) connect_hb = as_zmqstream(KernelManager.connect_hb) + + +class AsyncIOLoopKernelManager(AsyncKernelManager): + + loop = Instance('tornado.ioloop.IOLoop') + 
+    def _loop_default(self):
+        return ioloop.IOLoop.current()
+
+    restarter_class = Type(
+        default_value=AsyncIOLoopKernelRestarter,
+        klass=AsyncIOLoopKernelRestarter,
+        help=(
+            'Type of KernelRestarter to use. '
+            'Must be a subclass of AsyncIOLoopKernelRestarter.\n'
+            'Override this to customize how kernel restarts are managed.'
+        ),
+        config=True,
+    )
+    _restarter = Instance('jupyter_client.ioloop.AsyncIOLoopKernelRestarter', allow_none=True)
+
+    def start_restarter(self):
+        if self.autorestart and self.has_kernel:
+            if self._restarter is None:
+                self._restarter = self.restarter_class(
+                    kernel_manager=self, loop=self.loop,
+                    parent=self, log=self.log
+                )
+            self._restarter.start()
+
+    def stop_restarter(self):
+        if self.autorestart:
+            if self._restarter is not None:
+                self._restarter.stop()
+                self._restarter = None
+
+    connect_shell = as_zmqstream(AsyncKernelManager.connect_shell)
+    connect_iopub = as_zmqstream(AsyncKernelManager.connect_iopub)
+    connect_stdin = as_zmqstream(AsyncKernelManager.connect_stdin)
+    connect_hb = as_zmqstream(AsyncKernelManager.connect_hb)
diff --git a/jupyter_client/ioloop/restarter.py b/jupyter_client/ioloop/restarter.py
index 7c235603a..d5a5628e1 100644
--- a/jupyter_client/ioloop/restarter.py
+++ b/jupyter_client/ioloop/restarter.py
@@ -16,10 +16,12 @@
     Instance,
 )
 
+
 class IOLoopKernelRestarter(KernelRestarter):
     """Monitor and autorestart a kernel."""
 
     loop = Instance('tornado.ioloop.IOLoop')
+
     def _loop_default(self):
         warnings.warn("IOLoopKernelRestarter.loop is deprecated in jupyter-client 5.2",
                       DeprecationWarning, stacklevel=4,
@@ -41,3 +43,39 @@ def stop(self):
         if self._pcallback is not None:
             self._pcallback.stop()
             self._pcallback = None
+
+
+class AsyncIOLoopKernelRestarter(IOLoopKernelRestarter):
+
+    async def poll(self):
+        if self.debug:
+            self.log.debug('Polling kernel...')
+        is_alive = await self.kernel_manager.is_alive()
+        if not is_alive:
+            if self._restarting:
+                self._restart_count += 1
+            else:
+                self._restart_count = 1
+
+            if self._restart_count >= self.restart_limit:
+                self.log.warning("AsyncIOLoopKernelRestarter: restart failed")
+                self._fire_callbacks('dead')
+                self._restarting = False
+                self._restart_count = 0
+                self.stop()
+            else:
+                newports = self.random_ports_until_alive and self._initial_startup
+                self.log.info('AsyncIOLoopKernelRestarter: restarting kernel (%i/%i), %s random ports',
+                    self._restart_count,
+                    self.restart_limit,
+                    'new' if newports else 'keep'
+                )
+                self._fire_callbacks('restart')
+                await self.kernel_manager.restart_kernel(now=True, newports=newports)
+                self._restarting = True
+        else:
+            if self._initial_startup:
+                self._initial_startup = False
+            if self._restarting:
+                self.log.debug("AsyncIOLoopKernelRestarter: restart apparently succeeded")
+                self._restarting = False
diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py
index 36ae9382a..ea5530c58 100644
--- a/jupyter_client/manager.py
+++ b/jupyter_client/manager.py
@@ -4,6 +4,7 @@
 # Distributed under the terms of the Modified BSD License.
 
 from contextlib import contextmanager
+import asyncio
 import os
 import re
 import signal
@@ -223,8 +224,8 @@ def _close_control_socket(self):
         self._control_socket.close()
         self._control_socket = None
 
-    def start_kernel(self, **kw):
-        """Starts a kernel on this host in a separate process.
+    def pre_start_kernel(self, **kw):
+        """Prepares a kernel for startup in a separate process.
 
         If random ports (port=0) are being used, this method must be called
         before the channels are created.
@@ -261,12 +262,9 @@ def start_kernel(self, **kw):
             env.update(self._get_env_substitutions(self.kernel_spec.env, env))
         elif self.extra_env:
             env.update(self._get_env_substitutions(self.extra_env, env))
+        kw['env'] = env
 
-        # launch the kernel subprocess
-        self.log.debug("Starting kernel: %s", kernel_cmd)
-        self.kernel = self._launch_kernel(kernel_cmd, env=env, **kw)
-        self.start_restarter()
-        self._connect_control_socket()
+        return kernel_cmd, kw
 
     def _get_env_substitutions(self, templated_env, substitution_values):
         """ Walks env entries in templated_env and applies possible substitutions from current env
@@ -284,6 +282,29 @@ def _get_env_substitutions(self, templated_env, substitution_values):
             substituted_env.update({k: Template(v).safe_substitute(substitution_values)})
         return substituted_env
 
+    def post_start_kernel(self, **kw):
+        self.start_restarter()
+        self._connect_control_socket()
+
+    def start_kernel(self, **kw):
+        """Starts a kernel on this host in a separate process.
+
+        If random ports (port=0) are being used, this method must be called
+        before the channels are created.
+
+        Parameters
+        ----------
+        `**kw` : optional
+             keyword arguments that are passed down to build the kernel_cmd
+             and launch the kernel (e.g. Popen kwargs).
+        """
+        kernel_cmd, kw = self.pre_start_kernel(**kw)
+
+        # launch the kernel subprocess
+        self.log.debug("Starting kernel: %s", kernel_cmd)
+        self.kernel = self._launch_kernel(kernel_cmd, **kw)
+        self.post_start_kernel(**kw)
+
     def request_shutdown(self, restart=False):
         """Send a shutdown request via control channel
         """
@@ -488,6 +509,232 @@ def is_alive(self):
             return False
 
 
+class AsyncKernelManager(KernelManager):
+    """Manages kernels in an asynchronous manner."""
+
+    client_factory = Type(klass='jupyter_client.asynchronous.AsyncKernelClient')
+
+    async def _launch_kernel(self, kernel_cmd, **kw):
+        """actually launch the kernel
+
+        override in a subclass to launch kernel subprocesses differently
+        """
+        res = launch_kernel(kernel_cmd, **kw)
+        return res
+
+    async def start_kernel(self, **kw):
+        """Starts a kernel in a separate process in an asynchronous manner.
+
+        If random ports (port=0) are being used, this method must be called
+        before the channels are created.
+
+        Parameters
+        ----------
+        `**kw` : optional
+             keyword arguments that are passed down to build the kernel_cmd
+             and launch the kernel (e.g. Popen kwargs).
+        """
+        kernel_cmd, kw = self.pre_start_kernel(**kw)
+
+        # launch the kernel subprocess
+        self.log.debug("Starting kernel (async): %s", kernel_cmd)
+        self.kernel = await self._launch_kernel(kernel_cmd, **kw)
+        self.post_start_kernel(**kw)
+
+    async def finish_shutdown(self, waittime=None, pollinterval=0.1):
+        """Wait for kernel shutdown, then kill process if it doesn't shut down.
+
+        This does not send shutdown requests - use :meth:`request_shutdown`
+        first.
+        """
+        if waittime is None:
+            waittime = max(self.shutdown_wait_time, 0)
+        try:
+            await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
+        except asyncio.TimeoutError:
+            self.log.debug("Kernel is taking too long to finish, killing")
+            await self._kill_kernel()
+        else:
+            # Process is no longer alive, wait and clear
+            if self.kernel is not None:
+                self.kernel.wait()
+            self.kernel = None
+
+    async def shutdown_kernel(self, now=False, restart=False):
+        """Attempts to stop the kernel process cleanly.
+
+        This attempts to shut down the kernel cleanly by:
+
+        1. Sending it a shutdown message over the shell channel.
+        2. If that fails, the kernel is shut down forcibly by sending it
+           a signal.
+
+        Parameters
+        ----------
+        now : bool
+            Should the kernel be forcibly killed *now*. This skips the
+            first, nice shutdown attempt.
+        restart : bool
+            Will this kernel be restarted after it is shut down. When this
+            is True, connection files will not be cleaned up.
+        """
+        # Stop monitoring for restarting while we shutdown.
+        self.stop_restarter()
+
+        if now:
+            await self._kill_kernel()
+        else:
+            self.request_shutdown(restart=restart)
+            # Don't send any additional kernel kill messages immediately, to give
+            # the kernel a chance to properly execute shutdown actions. Wait for at
+            # most 1s, checking every 0.1s.
+            await self.finish_shutdown()
+
+        self.cleanup(connection_file=not restart)
+
+    async def restart_kernel(self, now=False, newports=False, **kw):
+        """Restarts a kernel with the arguments that were used to launch it.
+
+        Parameters
+        ----------
+        now : bool, optional
+            If True, the kernel is forcefully restarted *immediately*, without
+            having a chance to do any cleanup action. Otherwise the kernel is
+            given 1s to clean up before a forceful restart is issued.
+
+            In all cases the kernel is restarted, the only difference is whether
+            it is given a chance to perform a clean shutdown or not.
+
+        newports : bool, optional
+            If the old kernel was launched with random ports, this flag decides
+            whether the same ports and connection file will be used again.
+            If False, the same ports and connection file are used. This is
+            the default. If True, new random port numbers are chosen and a
+            new connection file is written. It is still possible that the newly
+            chosen random port numbers happen to be the same as the old ones.
+
+        `**kw` : optional
+            Any options specified here will overwrite those used to launch the
+            kernel.
+        """
+        if self._launch_args is None:
+            raise RuntimeError("Cannot restart the kernel. "
+                               "No previous call to 'start_kernel'.")
+        else:
+            # Stop currently running kernel.
+            await self.shutdown_kernel(now=now, restart=True)
+
+            if newports:
+                self.cleanup_random_ports()
+
+            # Start new kernel.
+            self._launch_args.update(kw)
+            await self.start_kernel(**self._launch_args)
+        return None
+
+    async def _kill_kernel(self):
+        """Kill the running kernel.
+
+        This is a private method, callers should use shutdown_kernel(now=True).
+        """
+        if self.has_kernel:
+            # Signal the kernel to terminate (sends SIGKILL on Unix and calls
+            # TerminateProcess() on Win32).
+            try:
+                if hasattr(signal, 'SIGKILL'):
+                    await self.signal_kernel(signal.SIGKILL)
+                else:
+                    self.kernel.kill()
+            except OSError as e:
+                # In Windows, we will get an Access Denied error if the process
+                # has already terminated. Ignore it.
+                if sys.platform == 'win32':
+                    if e.winerror != 5:
+                        raise
+                # On Unix, we may get an ESRCH error if the process has already
+                # terminated. Ignore it.
+                else:
+                    from errno import ESRCH
+                    if e.errno != ESRCH:
+                        raise
+
+            # Wait until the kernel terminates.
+            try:
+                await asyncio.wait_for(self._async_wait(), timeout=5.0)
+            except asyncio.TimeoutError:
+                # Wait timed out, just log warning but continue - not much more we can do.
+                self.log.warning("Wait for final termination of kernel timed out - continuing...")
+                pass
+            else:
+                # Process is no longer alive, wait and clear
+                if self.kernel is not None:
+                    self.kernel.wait()
+            self.kernel = None
+        else:
+            raise RuntimeError("Cannot kill kernel. No kernel is running!")
+
+    async def interrupt_kernel(self):
+        """Interrupts the kernel by sending it a signal.
+ + Unlike ``signal_kernel``, this operation is well supported on all + platforms. + """ + if self.has_kernel: + interrupt_mode = self.kernel_spec.interrupt_mode + if interrupt_mode == 'signal': + if sys.platform == 'win32': + from .win_interrupt import send_interrupt + send_interrupt(self.kernel.win32_interrupt_event) + else: + await self.signal_kernel(signal.SIGINT) + + elif interrupt_mode == 'message': + msg = self.session.msg("interrupt_request", content={}) + self._connect_control_socket() + self.session.send(self._control_socket, msg) + else: + raise RuntimeError("Cannot interrupt kernel. No kernel is running!") + + async def signal_kernel(self, signum): + """Sends a signal to the process group of the kernel (this + usually includes the kernel and any subprocesses spawned by + the kernel). + + Note that since only SIGTERM is supported on Windows, this function is + only useful on Unix systems. + """ + if self.has_kernel: + if hasattr(os, "getpgid") and hasattr(os, "killpg"): + try: + pgid = os.getpgid(self.kernel.pid) + os.killpg(pgid, signum) + return + except OSError: + pass + self.kernel.send_signal(signum) + else: + raise RuntimeError("Cannot signal kernel. No kernel is running!") + + async def is_alive(self): + """Is the kernel process still running?""" + if self.has_kernel: + if self.kernel.poll() is None: + return True + else: + return False + else: + # we don't have a kernel + return False + + async def _async_wait(self, pollinterval=0.1): + # Use busy loop at 100ms intervals, polling until the process is + # not alive. If we find the process is no longer alive, complete + # its cleanup via the blocking wait(). Callers are responsible for + # issuing calls to wait() using a timeout (see _kill_kernel()). + while await self.is_alive(): + await asyncio.sleep(pollinterval) + + KernelManagerABC.register(KernelManager) @@ -506,6 +753,23 @@ def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs): return km, kc + +async def start_new_async_kernel(startup_timeout=60, kernel_name='python', **kwargs): + """Start a new kernel, and return its Manager and Client""" + km = AsyncKernelManager(kernel_name=kernel_name) + await km.start_kernel(**kwargs) + kc = km.client() + kc.start_channels() + try: + await kc.wait_for_ready(timeout=startup_timeout) + except RuntimeError: + kc.stop_channels() + await km.shutdown_kernel() + raise + + return (km, kc) + + @contextmanager def run_kernel(**kwargs): """Context manager to create a kernel in a subprocess. diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 8a0ce31d7..d5ba72037 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -17,6 +17,8 @@ from ipython_genutils.py3compat import unicode_type from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager +from .manager import AsyncKernelManager + class DuplicateKernelError(Exception): pass @@ -123,14 +125,8 @@ def __len__(self): def __contains__(self, kernel_id): return kernel_id in self._kernels - def start_kernel(self, kernel_name=None, **kwargs): - """Start a new kernel. + def pre_start_kernel(self, kernel_name, **kwargs): - The caller can pick a kernel_id by passing one in as a keyword arg, - otherwise one will be generated using new_kernel_id(). - - The kernel ID for the newly started kernel is returned. 
- """ kernel_id = kwargs.pop('kernel_id', self.new_kernel_id(**kwargs)) if kernel_id in self: raise DuplicateKernelError('Kernel already exists: %s' % kernel_id) @@ -148,6 +144,17 @@ def start_kernel(self, kernel_name=None, **kwargs): parent=self, log=self.log, kernel_name=kernel_name, **constructor_kwargs ) + return km, kernel_name, kernel_id + + def start_kernel(self, kernel_name=None, **kwargs): + """Start a new kernel. + + The caller can pick a kernel_id by passing one in as a keyword arg, + otherwise one will be generated using new_kernel_id(). + + The kernel ID for the newly started kernel is returned. + """ + km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, **kwargs) km.start_kernel(**kwargs) self._kernels[kernel_id] = km return kernel_id @@ -393,3 +400,113 @@ def new_kernel_id(self, **kwargs): :return: string-ized version 4 uuid """ return unicode_type(uuid.uuid4()) + + +class AsyncMultiKernelManager(MultiKernelManager): + + kernel_manager_class = DottedObjectName( + "jupyter_client.ioloop.AsyncIOLoopKernelManager", config=True, + help="""The kernel manager class. This is configurable to allow + subclassing of the AsyncKernelManager for customized behavior. + """ + ) + + async def start_kernel(self, kernel_name=None, **kwargs): + """Start a new kernel. + + The caller can pick a kernel_id by passing one in as a keyword arg, + otherwise one will be generated using new_kernel_id(). + + The kernel ID for the newly started kernel is returned. + """ + km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, **kwargs) + if not isinstance(km, AsyncKernelManager): + self.log.warning("Kernel manager class ({km_class}) is not an instance of 'AsyncKernelManager'!". + format(km_class=self.kernel_manager_class.__class__)) + await km.start_kernel(**kwargs) + self._kernels[kernel_id] = km + return kernel_id + + async def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by its kernel uuid. + + Parameters + ========== + kernel_id : uuid + The id of the kernel to shutdown. + now : bool + Should the kernel be shutdown forcibly using a signal. + restart : bool + Will the kernel be restarted? + """ + self.log.info("Kernel shutdown: %s" % kernel_id) + + km = self.get_kernel(kernel_id) + + ports = ( + km.shell_port, km.iopub_port, km.stdin_port, + km.hb_port, km.control_port + ) + + await km.shutdown_kernel(now, restart) + self.remove_kernel(kernel_id) + + if km.cache_ports and not restart: + for port in ports: + self.currently_used_ports.remove(port) + + async def finish_shutdown(self, kernel_id, waittime=None, pollinterval=0.1): + """Wait for a kernel to finish shutting down, and kill it if it doesn't + """ + km = self.get_kernel(kernel_id) + await km.finish_shutdown(waittime, pollinterval) + self.log.info("Kernel shutdown: %s" % kernel_id) + + async def interrupt_kernel(self, kernel_id): + """Interrupt (SIGINT) the kernel by its uuid. + + Parameters + ========== + kernel_id : uuid + The id of the kernel to interrupt. + """ + km = self.get_kernel(kernel_id) + await km.interrupt_kernel() + self.log.info("Kernel interrupted: %s" % kernel_id) + + async def signal_kernel(self, kernel_id, signum): + """Sends a signal to the kernel by its uuid. + + Note that since only SIGTERM is supported on Windows, this function + is only useful on Unix systems. + + Parameters + ========== + kernel_id : uuid + The id of the kernel to signal. 
+ """ + km = self.get_kernel(kernel_id) + await km.signal_kernel(signum) + self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) + + async def restart_kernel(self, kernel_id, now=False): + """Restart a kernel by its uuid, keeping the same ports. + + Parameters + ========== + kernel_id : uuid + The id of the kernel to interrupt. + """ + km = self.get_kernel(kernel_id) + await km.restart_kernel(now) + self.log.info("Kernel restarted: %s" % kernel_id) + + async def shutdown_all(self, now=False): + """Shutdown all kernels.""" + kids = self.list_kernel_ids() + for kid in kids: + self.request_shutdown(kid) + for kid in kids: + await self.finish_shutdown(kid) + self.cleanup(kid) + self.remove_kernel(kid) diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 58010bcfb..b3661b1d9 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. +import asyncio import json import os pjoin = os.path.join @@ -15,15 +16,17 @@ import multiprocessing as mp import pytest from unittest import TestCase +from tornado.testing import AsyncTestCase, gen_test, gen from traitlets.config.loader import Config from jupyter_core import paths -from jupyter_client import KernelManager -from ..manager import start_new_kernel +from jupyter_client import KernelManager, AsyncKernelManager +from ..manager import start_new_kernel, start_new_async_kernel from .utils import test_env, skip_win32 TIMEOUT = 30 + class TestKernelManager(TestCase): def setUp(self): self.env_patch = test_env() @@ -89,6 +92,7 @@ def test_get_connect_info(self): def test_signal_kernel_subprocesses(self): self._install_test_kernel() km, kc = start_new_kernel(kernel_name='signaltest') + def execute(cmd): kc.execute(cmd) reply = kc.get_shell_msg(TIMEOUT) @@ -143,7 +147,7 @@ def execute(cmd): reply = execute('env') self.assertIsNotNone(reply) - self.assertEquals(reply['user_expressions']['env'], 'test_var_1:test_var_2') + self.assertEqual(reply['user_expressions']['env'], 'test_var_1:test_var_2') def test_templated_kspec_env(self): self._install_test_kernel() @@ -268,7 +272,7 @@ def test_start_sequence_process_kernels(self, config): proc.join() assert proc.exitcode == 0 - + def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs): km.start_kernel(**kwargs) kc = km.client() @@ -303,3 +307,127 @@ def execute(cmd): execute('check') km.shutdown_kernel() + + +class TestAsyncKernelManager(AsyncTestCase): + def setUp(self): + super(TestAsyncKernelManager, self).setUp() + self.env_patch = test_env() + self.env_patch.start() + + def tearDown(self): + super(TestAsyncKernelManager, self).tearDown() + self.env_patch.stop() + + def _install_test_kernel(self): + kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest') + os.makedirs(kernel_dir) + with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f: + f.write(json.dumps({ + 'argv': [sys.executable, + '-m', 'jupyter_client.tests.signalkernel', + '-f', '{connection_file}'], + 'display_name': "Signal Test Kernel", + })) + + def _get_tcp_km(self): + c = Config() + km = AsyncKernelManager(config=c) + return km + + def _get_ipc_km(self): + c = Config() + c.KernelManager.transport = 'ipc' + c.KernelManager.ip = 'test' + km = AsyncKernelManager(config=c) + return km + + async def _run_lifecycle(self, km): + await km.start_kernel(stdout=PIPE, stderr=PIPE) + self.assertTrue(await km.is_alive()) + await km.restart_kernel(now=True) + 
self.assertTrue(await km.is_alive()) + await km.interrupt_kernel() + self.assertTrue(isinstance(km, AsyncKernelManager)) + await km.shutdown_kernel(now=True) + self.assertFalse(await km.is_alive()) + + @gen_test + async def test_tcp_lifecycle(self): + km = self._get_tcp_km() + await self._run_lifecycle(km) + + @skip_win32 + @gen_test + async def test_ipc_lifecycle(self): + km = self._get_ipc_km() + await self._run_lifecycle(km) + + def test_get_connect_info(self): + km = self._get_tcp_km() + cinfo = km.get_connection_info() + keys = sorted(cinfo.keys()) + expected = sorted([ + 'ip', 'transport', + 'hb_port', 'shell_port', 'stdin_port', 'iopub_port', 'control_port', + 'key', 'signature_scheme', + ]) + self.assertEqual(keys, expected) + + @skip_win32 + @gen_test(timeout=10.0) + async def test_signal_kernel_subprocesses(self): + self._install_test_kernel() + km, kc = await start_new_async_kernel(kernel_name='signaltest') + + async def execute(cmd): + kc.execute(cmd) + reply = await kc.get_shell_msg(TIMEOUT) + content = reply['content'] + self.assertEqual(content['status'], 'ok') + return content + # Ensure that shutdown_kernel and stop_channels are called at the end of the test. + # Note: we cannot use addCleanup() for these since it doesn't prpperly handle + # coroutines - which km.shutdown_kernel now is. + try: + N = 5 + for i in range(N): + await execute("start") + await asyncio.sleep(1) # make sure subprocs stay up + reply = await execute('check') + self.assertEqual(reply['user_expressions']['poll'], [None] * N) + + # start a job on the kernel to be interrupted + kc.execute('sleep') + await asyncio.sleep(1) # ensure sleep message has been handled before we interrupt + await km.interrupt_kernel() + reply = await kc.get_shell_msg(TIMEOUT) + content = reply['content'] + self.assertEqual(content['status'], 'ok') + self.assertEqual(content['user_expressions']['interrupted'], True) + # wait up to 5s for subprocesses to handle signal + for i in range(50): + reply = await execute('check') + if reply['user_expressions']['poll'] != [-signal.SIGINT] * N: + await asyncio.sleep(0.1) + else: + break + # verify that subprocesses were interrupted + self.assertEqual(reply['user_expressions']['poll'], [-signal.SIGINT] * N) + finally: + await km.shutdown_kernel(now=True) + kc.stop_channels() + + @gen_test(timeout=10.0) + async def test_start_new_async_kernel(self): + self._install_test_kernel() + km, kc = await start_new_async_kernel(kernel_name='signaltest') + # Ensure that shutdown_kernel and stop_channels are called at the end of the test. + # Note: we cannot use addCleanup() for these since it doesn't properly handle + # coroutines - which km.shutdown_kernel now is. 
+ try: + self.assertTrue(await km.is_alive()) + self.assertTrue(await kc.is_alive()) + finally: + await km.shutdown_kernel(now=True) + kc.stop_channels() diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index 08002fef7..601879f80 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -1,16 +1,16 @@ """Tests for the notebook kernel and session manager.""" -import os -import time +import asyncio import threading import multiprocessing as mp from subprocess import PIPE from unittest import TestCase -from traitlets.config.loader import Config -from jupyter_client import KernelManager -from jupyter_client.multikernelmanager import MultiKernelManager +from tornado.testing import AsyncTestCase, gen_test +from traitlets.config.loader import Config +from jupyter_client import KernelManager, AsyncKernelManager +from jupyter_client.multikernelmanager import MultiKernelManager, AsyncMultiKernelManager from .utils import skip_win32 from ..localinterfaces import localhost @@ -36,7 +36,7 @@ def _run_lifecycle(self, km): self.assertTrue(km.is_alive(kid)) self.assertTrue(kid in km) self.assertTrue(kid in km.list_kernel_ids()) - self.assertEqual(len(km),1) + self.assertEqual(len(km), 1) km.restart_kernel(kid, now=True) self.assertTrue(km.is_alive(kid)) self.assertTrue(kid in km.list_kernel_ids()) @@ -44,7 +44,7 @@ def _run_lifecycle(self, km): k = km.get_kernel(kid) self.assertTrue(isinstance(k, KernelManager)) km.shutdown_kernel(kid, now=True) - self.assertTrue(not kid in km) + self.assertNotIn(kid, km) def _run_cinfo(self, km, transport, ip): kid = km.start_kernel(stdout=PIPE, stderr=PIPE) @@ -97,18 +97,22 @@ def test_start_sequence_tcp_kernels(self): self._run_lifecycle(self._get_tcp_km()) self._run_lifecycle(self._get_tcp_km()) - - def test_start_sequence_tcp_kernels(self): + def test_start_sequence_ipc_kernels(self): """Ensure that a sequence of kernel startups doesn't break anything.""" self._run_lifecycle(self._get_ipc_km()) self._run_lifecycle(self._get_ipc_km()) self._run_lifecycle(self._get_ipc_km()) + def tcp_lifecycle_with_loop(self): + # Ensure each thread has an event loop + asyncio.set_event_loop(asyncio.new_event_loop()) + self.test_tcp_lifecycle() + def test_start_parallel_thread_kernels(self): self.test_tcp_lifecycle() - thread = threading.Thread(target=self.test_tcp_lifecycle) - thread2 = threading.Thread(target=self.test_tcp_lifecycle) + thread = threading.Thread(target=self.tcp_lifecycle_with_loop) + thread2 = threading.Thread(target=self.tcp_lifecycle_with_loop) try: thread.start() thread2.start() @@ -119,7 +123,7 @@ def test_start_parallel_thread_kernels(self): def test_start_parallel_process_kernels(self): self.test_tcp_lifecycle() - thread = threading.Thread(target=self.test_tcp_lifecycle) + thread = threading.Thread(target=self.tcp_lifecycle_with_loop) proc = mp.Process(target=self.test_tcp_lifecycle) try: @@ -130,3 +134,137 @@ def test_start_parallel_process_kernels(self): proc.join() assert proc.exitcode == 0 + + +class TestAsyncKernelManager(AsyncTestCase): + + def _get_tcp_km(self): + c = Config() + km = AsyncMultiKernelManager(config=c) + return km + + def _get_ipc_km(self): + c = Config() + c.KernelManager.transport = 'ipc' + c.KernelManager.ip = 'test' + km = AsyncMultiKernelManager(config=c) + return km + + async def _run_lifecycle(self, km): + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + self.assertTrue(await km.is_alive(kid)) + 
self.assertTrue(kid in km) + self.assertTrue(kid in km.list_kernel_ids()) + self.assertEqual(len(km), 1) + await km.restart_kernel(kid, now=True) + self.assertTrue(await km.is_alive(kid)) + self.assertTrue(kid in km.list_kernel_ids()) + await km.interrupt_kernel(kid) + k = km.get_kernel(kid) + self.assertTrue(isinstance(k, AsyncKernelManager)) + await km.shutdown_kernel(kid, now=True) + self.assertNotIn(kid, km) + + async def _run_cinfo(self, km, transport, ip): + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + k = km.get_kernel(kid) + cinfo = km.get_connection_info(kid) + self.assertEqual(transport, cinfo['transport']) + self.assertEqual(ip, cinfo['ip']) + self.assertTrue('stdin_port' in cinfo) + self.assertTrue('iopub_port' in cinfo) + stream = km.connect_iopub(kid) + stream.close() + self.assertTrue('shell_port' in cinfo) + stream = km.connect_shell(kid) + stream.close() + self.assertTrue('hb_port' in cinfo) + stream = km.connect_hb(kid) + stream.close() + await km.shutdown_kernel(kid, now=True) + self.assertNotIn(kid, km) + + @gen_test + async def test_tcp_lifecycle(self): + await self.raw_tcp_lifecycle() + + @gen_test + async def test_shutdown_all(self): + km = self._get_tcp_km() + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + self.assertIn(kid, km) + await km.shutdown_all() + self.assertNotIn(kid, km) + # shutdown again is okay, because we have no kernels + await km.shutdown_all() + + @gen_test + async def test_tcp_cinfo(self): + km = self._get_tcp_km() + await self._run_cinfo(km, 'tcp', localhost()) + + @skip_win32 + @gen_test + async def test_ipc_lifecycle(self): + km = self._get_ipc_km() + await self._run_lifecycle(km) + + @skip_win32 + @gen_test + async def test_ipc_cinfo(self): + km = self._get_ipc_km() + await self._run_cinfo(km, 'ipc', 'test') + + @gen_test + async def test_start_sequence_tcp_kernels(self): + """Ensure that a sequence of kernel startups doesn't break anything.""" + await self._run_lifecycle(self._get_tcp_km()) + await self._run_lifecycle(self._get_tcp_km()) + await self._run_lifecycle(self._get_tcp_km()) + + @gen_test + async def test_start_sequence_ipc_kernels(self): + """Ensure that a sequence of kernel startups doesn't break anything.""" + await self._run_lifecycle(self._get_ipc_km()) + await self._run_lifecycle(self._get_ipc_km()) + await self._run_lifecycle(self._get_ipc_km()) + + def tcp_lifecycle_with_loop(self): + # Ensure each thread has an event loop + asyncio.set_event_loop(asyncio.new_event_loop()) + asyncio.get_event_loop().run_until_complete(self.raw_tcp_lifecycle()) + + async def raw_tcp_lifecycle(self): + # Since @gen_test creates an event loop, we need a raw form of + # test_tcp_lifecycle that assumes the loop already exists. + km = self._get_tcp_km() + await self._run_lifecycle(km) + + @gen_test + async def test_start_parallel_thread_kernels(self): + await self.raw_tcp_lifecycle() + + thread = threading.Thread(target=self.tcp_lifecycle_with_loop) + thread2 = threading.Thread(target=self.tcp_lifecycle_with_loop) + try: + thread.start() + thread2.start() + finally: + thread.join() + thread2.join() + + @gen_test + async def test_start_parallel_process_kernels(self): + await self.raw_tcp_lifecycle() + + thread = threading.Thread(target=self.tcp_lifecycle_with_loop) + proc = mp.Process(target=self.raw_tcp_lifecycle) + + try: + thread.start() + proc.start() + finally: + proc.join() + thread.join() + + assert proc.exitcode == 0
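
A minimal usage sketch of the asynchronous API added above (AsyncKernelManager, start_new_async_kernel, AsyncMultiKernelManager). This is illustrative only and not part of the patch; it assumes a 'python3' kernelspec is installed and exercises only methods introduced or tested in this diff.

import asyncio
from subprocess import PIPE

from jupyter_client import AsyncKernelManager, AsyncMultiKernelManager
from jupyter_client.manager import start_new_async_kernel


async def single_kernel():
    # Full lifecycle through AsyncKernelManager: start, restart, interrupt, shutdown.
    km = AsyncKernelManager(kernel_name='python3')
    await km.start_kernel(stdout=PIPE, stderr=PIPE)
    assert await km.is_alive()
    await km.restart_kernel(now=True)
    await km.interrupt_kernel()
    await km.shutdown_kernel(now=True)


async def kernel_with_client():
    # start_new_async_kernel returns a (manager, client) pair whose channels
    # are already started and which has replied to kernel_info.
    km, kc = await start_new_async_kernel(kernel_name='python3')
    try:
        kc.execute('1 + 1')
        reply = await kc.get_shell_msg(30)
        print(reply['content']['status'])
    finally:
        await km.shutdown_kernel(now=True)
        kc.stop_channels()


async def many_kernels():
    # AsyncMultiKernelManager mirrors the synchronous MultiKernelManager API,
    # tracking kernels by id.
    mkm = AsyncMultiKernelManager()
    kid = await mkm.start_kernel(kernel_name='python3', stdout=PIPE, stderr=PIPE)
    assert await mkm.is_alive(kid)
    await mkm.restart_kernel(kid, now=True)
    await mkm.shutdown_all()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(single_kernel())
    loop.run_until_complete(kernel_with_client())
    loop.run_until_complete(many_kernels())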