Skip to content

Commit

Permalink
Rename MappingKernelManage to AsyncMappingKernelManage, convert gen.c…
Browse files Browse the repository at this point in the history
…oroutine/yield to async/await, remove run_blocking
  • Loading branch information
davidbrochart committed Mar 16, 2020
1 parent 272ebab commit 93180d3
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 143 deletions.
13 changes: 6 additions & 7 deletions docs/source/extending/bundler_extensions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,19 @@ respond in any manner. For example, it may read additional query parameters
from the request, issue a redirect to another site, run a local process (e.g.,
`nbconvert`), make a HTTP request to another service, etc.

The caller of the `bundle` function is `@tornado.gen.coroutine` decorated and
wraps its call with `torando.gen.maybe_future`. This behavior means you may
The caller of the `bundle` function is `async` and wraps its call with
`jupyter_server.utils.ensure_async`. This behavior means you may
handle the web request synchronously, as in the example above, or
asynchronously using `@tornado.gen.coroutine` and `yield`, as in the example
asynchronously using `async` and `await`, as in the example
below.

.. code:: python
from tornado import gen
import asyncio
@gen.coroutine
def bundle(handler, model):
async def bundle(handler, model):
# simulate a long running IO op (e.g., deploying to a remote host)
yield gen.sleep(10)
await asyncio.sleep(10)
# now respond
handler.finish('I spent 10 seconds bundling {}!'.format(model['path']))
Expand Down
20 changes: 8 additions & 12 deletions jupyter_server/gateway/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..base.handlers import APIHandler, JupyterHandler
from ..utils import url_path_join

from tornado import gen, web
from tornado import web
from tornado.concurrent import Future
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.websocket import WebSocketHandler, websocket_connect
Expand Down Expand Up @@ -61,11 +61,10 @@ def initialize(self):
self.session = Session(config=self.config)
self.gateway = GatewayWebSocketClient(gateway_url=GatewayClient.instance().url)

@gen.coroutine
def get(self, kernel_id, *args, **kwargs):
async def get(self, kernel_id, *args, **kwargs):
self.authenticate()
self.kernel_id = cast_unicode(kernel_id, 'ascii')
yield super(WebSocketChannelsHandler, self).get(kernel_id=kernel_id, *args, **kwargs)
await super(WebSocketChannelsHandler, self).get(kernel_id=kernel_id, *args, **kwargs)

def send_ping(self):
if self.ws_connection is None and self.ping_callback is not None:
Expand Down Expand Up @@ -132,8 +131,7 @@ def __init__(self, **kwargs):
self.ws_future = Future()
self.disconnected = False

@gen.coroutine
def _connect(self, kernel_id):
async def _connect(self, kernel_id):
# websocket is initialized before connection
self.ws = None
self.kernel_id = kernel_id
Expand Down Expand Up @@ -168,14 +166,13 @@ def _disconnect(self):
self.ws_future.cancel()
self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))

@gen.coroutine
def _read_messages(self, callback):
async def _read_messages(self, callback):
"""Read messages from gateway server."""
while self.ws is not None:
message = None
if not self.disconnected:
try:
message = yield self.ws.read_message()
message = await self.ws.read_message()
except Exception as e:
self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True)
if message is None:
Expand Down Expand Up @@ -229,10 +226,9 @@ class GatewayResourceHandler(APIHandler):
"""Retrieves resources for specific kernelspec definitions from kernel/enterprise gateway."""

@web.authenticated
@gen.coroutine
def get(self, kernel_name, path, include_body=True):
async def get(self, kernel_name, path, include_body=True):
ksm = self.kernel_spec_manager
kernel_spec_res = yield ksm.get_kernel_spec_resource(kernel_name, path)
kernel_spec_res = await ksm.get_kernel_spec_resource(kernel_name, path)
if kernel_spec_res is None:
self.log.warning("Kernelspec resource '{}' for '{}' not found. Gateway may not support"
" resource serving.".format(path, kernel_name))
Expand Down
95 changes: 41 additions & 54 deletions jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import json

from socket import gaierror
from tornado import gen, web
from tornado import web
from tornado.escape import json_encode, json_decode, url_escape
from tornado.httpclient import HTTPClient, AsyncHTTPClient, HTTPError

from ..services.kernels.kernelmanager import MappingKernelManager
from ..services.kernels.kernelmanager import AsyncMappingKernelManager
from ..services.sessions.sessionmanager import SessionManager

from jupyter_client.kernelspec import KernelSpecManager
Expand Down Expand Up @@ -269,19 +269,18 @@ def load_connection_args(self, **kwargs):
return kwargs


@gen.coroutine
def gateway_request(endpoint, **kwargs):
async def gateway_request(endpoint, **kwargs):
"""Make an async request to kernel gateway endpoint, returns a response """
client = AsyncHTTPClient()
kwargs = GatewayClient.instance().load_connection_args(**kwargs)
try:
response = yield client.fetch(endpoint, **kwargs)
response = await client.fetch(endpoint, **kwargs)
# Trap a set of common exceptions so that we can inform the user that their Gateway url is incorrect
# or the server is not running.
# NOTE: We do this here since this handler is called during the Notebook's startup and subsequent refreshes
# of the tree view.
except ConnectionRefusedError:
raise web.HTTPError(503, "Connection refused from Gateway server url '{}'. "
raise web.HTTPError(503, "Connection refused from Gateway server url '{}'. "
"Check to be sure the Gateway instance is running.".format(GatewayClient.instance().url))
except HTTPError as e:
# This can occur if the host is valid (e.g., foo.com) but there's nothing there.
Expand All @@ -293,10 +292,10 @@ def gateway_request(endpoint, **kwargs):
"Ensure gateway url is valid and the Gateway instance is running.".
format(GatewayClient.instance().url))

raise gen.Return(response)
return response


class GatewayKernelManager(MappingKernelManager):
class GatewayKernelManager(AsyncMappingKernelManager):
"""Kernel manager that supports remote kernels hosted by Jupyter Kernel or Enterprise Gateway."""

# We'll maintain our own set of kernel ids
Expand Down Expand Up @@ -328,8 +327,7 @@ def _get_kernel_endpoint_url(self, kernel_id=None):

return self.base_endpoint

@gen.coroutine
def start_kernel(self, kernel_id=None, path=None, **kwargs):
async def start_kernel(self, kernel_id=None, path=None, **kwargs):
"""Start a kernel for a session and return its kernel_id.
Parameters
Expand Down Expand Up @@ -364,21 +362,20 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs):

json_body = json_encode({'name': kernel_name, 'env': kernel_env})

response = yield gateway_request(kernel_url, method='POST', body=json_body)
response = await gateway_request(kernel_url, method='POST', body=json_body)
kernel = json_decode(response.body)
kernel_id = kernel['id']
self.log.info("Kernel started: %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
else:
kernel = yield self.get_kernel(kernel_id)
kernel = await self.get_kernel(kernel_id)
kernel_id = kernel['id']
self.log.info("Using existing kernel: %s" % kernel_id)

self._kernels[kernel_id] = kernel
raise gen.Return(kernel_id)
return kernel_id

@gen.coroutine
def get_kernel(self, kernel_id=None, **kwargs):
async def get_kernel(self, kernel_id=None, **kwargs):
"""Get kernel for kernel_id.
Parameters
Expand All @@ -389,7 +386,7 @@ def get_kernel(self, kernel_id=None, **kwargs):
kernel_url = self._get_kernel_endpoint_url(kernel_id)
self.log.debug("Request kernel at: %s" % kernel_url)
try:
response = yield gateway_request(kernel_url, method='GET')
response = await gateway_request(kernel_url, method='GET')
except web.HTTPError as error:
if error.status_code == 404:
self.log.warn("Kernel not found at: %s" % kernel_url)
Expand All @@ -401,10 +398,9 @@ def get_kernel(self, kernel_id=None, **kwargs):
kernel = json_decode(response.body)
self._kernels[kernel_id] = kernel
self.log.debug("Kernel retrieved: %s" % kernel)
raise gen.Return(kernel)
return kernel

@gen.coroutine
def kernel_model(self, kernel_id):
async def kernel_model(self, kernel_id):
"""Return a dictionary of kernel information described in the
JSON standard model.
Expand All @@ -414,21 +410,19 @@ def kernel_model(self, kernel_id):
The uuid of the kernel.
"""
self.log.debug("RemoteKernelManager.kernel_model: %s", kernel_id)
model = yield self.get_kernel(kernel_id)
raise gen.Return(model)
model = await self.get_kernel(kernel_id)
return model

@gen.coroutine
def list_kernels(self, **kwargs):
async def list_kernels(self, **kwargs):
"""Get a list of kernels."""
kernel_url = self._get_kernel_endpoint_url()
self.log.debug("Request list kernels: %s", kernel_url)
response = yield gateway_request(kernel_url, method='GET')
response = await gateway_request(kernel_url, method='GET')
kernels = json_decode(response.body)
self._kernels = {x['id']: x for x in kernels}
raise gen.Return(kernels)
return kernels

@gen.coroutine
def shutdown_kernel(self, kernel_id, now=False, restart=False):
async def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by its kernel uuid.
Parameters
Expand All @@ -442,12 +436,11 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id)
self.log.debug("Request shutdown kernel at: %s", kernel_url)
response = yield gateway_request(kernel_url, method='DELETE')
response = await gateway_request(kernel_url, method='DELETE')
self.log.debug("Shutdown kernel response: %d %s", response.code, response.reason)
self.remove_kernel(kernel_id)

@gen.coroutine
def restart_kernel(self, kernel_id, now=False, **kwargs):
async def restart_kernel(self, kernel_id, now=False, **kwargs):
"""Restart a kernel by its kernel uuid.
Parameters
Expand All @@ -457,11 +450,10 @@ def restart_kernel(self, kernel_id, now=False, **kwargs):
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/restart'
self.log.debug("Request restart kernel at: %s", kernel_url)
response = yield gateway_request(kernel_url, method='POST', body=json_encode({}))
response = await gateway_request(kernel_url, method='POST', body=json_encode({}))
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)

@gen.coroutine
def interrupt_kernel(self, kernel_id, **kwargs):
async def interrupt_kernel(self, kernel_id, **kwargs):
"""Interrupt a kernel by its kernel uuid.
Parameters
Expand All @@ -471,7 +463,7 @@ def interrupt_kernel(self, kernel_id, **kwargs):
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/interrupt'
self.log.debug("Request interrupt kernel at: %s", kernel_url)
response = yield gateway_request(kernel_url, method='POST', body=json_encode({}))
response = await gateway_request(kernel_url, method='POST', body=json_encode({}))
self.log.debug("Interrupt kernel response: %d %s", response.code, response.reason)

def shutdown_all(self, now=False):
Expand Down Expand Up @@ -517,9 +509,8 @@ def _get_kernelspecs_endpoint_url(self, kernel_name=None):

return self.base_endpoint

@gen.coroutine
def get_all_specs(self):
fetched_kspecs = yield self.list_kernel_specs()
async def get_all_specs(self):
fetched_kspecs = await self.list_kernel_specs()

# get the default kernel name and compare to that of this server.
# If different log a warning and reset the default. However, the
Expand All @@ -535,19 +526,17 @@ def get_all_specs(self):
km.default_kernel_name = remote_default_kernel_name

remote_kspecs = fetched_kspecs.get('kernelspecs')
raise gen.Return(remote_kspecs)
return remote_kspecs

@gen.coroutine
def list_kernel_specs(self):
async def list_kernel_specs(self):
"""Get a list of kernel specs."""
kernel_spec_url = self._get_kernelspecs_endpoint_url()
self.log.debug("Request list kernel specs at: %s", kernel_spec_url)
response = yield gateway_request(kernel_spec_url, method='GET')
response = await gateway_request(kernel_spec_url, method='GET')
kernel_specs = json_decode(response.body)
raise gen.Return(kernel_specs)
return kernel_specs

@gen.coroutine
def get_kernel_spec(self, kernel_name, **kwargs):
async def get_kernel_spec(self, kernel_name, **kwargs):
"""Get kernel spec for kernel_name.
Parameters
Expand All @@ -558,7 +547,7 @@ def get_kernel_spec(self, kernel_name, **kwargs):
kernel_spec_url = self._get_kernelspecs_endpoint_url(kernel_name=str(kernel_name))
self.log.debug("Request kernel spec at: %s" % kernel_spec_url)
try:
response = yield gateway_request(kernel_spec_url, method='GET')
response = await gateway_request(kernel_spec_url, method='GET')
except web.HTTPError as error:
if error.status_code == 404:
# Convert not found to KeyError since that's what the Notebook handler expects
Expand All @@ -570,10 +559,9 @@ def get_kernel_spec(self, kernel_name, **kwargs):
else:
kernel_spec = json_decode(response.body)

raise gen.Return(kernel_spec)
return kernel_spec

@gen.coroutine
def get_kernel_spec_resource(self, kernel_name, path):
async def get_kernel_spec_resource(self, kernel_name, path):
"""Get kernel spec for kernel_name.
Parameters
Expand All @@ -586,22 +574,21 @@ def get_kernel_spec_resource(self, kernel_name, path):
kernel_spec_resource_url = url_path_join(self.base_resource_endpoint, str(kernel_name), str(path))
self.log.debug("Request kernel spec resource '{}' at: {}".format(path, kernel_spec_resource_url))
try:
response = yield gateway_request(kernel_spec_resource_url, method='GET')
response = await gateway_request(kernel_spec_resource_url, method='GET')
except web.HTTPError as error:
if error.status_code == 404:
kernel_spec_resource = None
else:
raise
else:
kernel_spec_resource = response.body
raise gen.Return(kernel_spec_resource)
return kernel_spec_resource


class GatewaySessionManager(SessionManager):
kernel_manager = Instance('jupyter_server.gateway.managers.GatewayKernelManager')

@gen.coroutine
def kernel_culled(self, kernel_id):
async def kernel_culled(self, kernel_id):
"""Checks if the kernel is still considered alive and returns true if its not found. """
kernel = yield self.kernel_manager.get_kernel(kernel_id)
raise gen.Return(kernel is None)
kernel = await self.kernel_manager.get_kernel(kernel_id)
return kernel is None
4 changes: 2 additions & 2 deletions jupyter_server/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ def serverapp(


@pytest.fixture
def serverapp(configurable_serverapp, config, argv):
def serverapp(configurable_serverapp, config, argv, io_loop):
app = configurable_serverapp(config=config, argv=argv)
yield app
app.remove_server_info_file()
app.remove_browser_open_file()
app.cleanup_kernels()
io_loop.add_callback(app.cleanup_kernels)


@pytest.fixture
Expand Down
Loading

0 comments on commit 93180d3

Please sign in to comment.